diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerThreadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work being queued up. UE-305498
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 212 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 104 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 19 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 218 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 188 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 24 |
9 files changed, 416 insertions, 360 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 7b56c64bd..c50f2bb13 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks") Latch WorkLatch(1); Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - bool Continue = Store.IterateBlock( - Locations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - return true; - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: - { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; - } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - return true; - }, - 0); - CHECK(Continue); - }); + WorkerPool.ScheduleWork( + [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), 
ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + bool Continue = Store.IterateBlock( + Locations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + return true; + }, + 0); + CHECK(Continue); + }, + WorkerThreadPool::EMode::EnableBacklog); return true; }); WorkLatch.CountDown(); @@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write") std::atomic<size_t> WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); - WorkCompleted.fetch_add(1); - }); + 
WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + ChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }); - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - 
CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size() * 2) { diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index d65c2bf06..4539746ba 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -376,7 +376,7 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB { std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); for (size_t Index = 0; Index < Metadatas.size(); Index++) { Work.ScheduleWork( diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index cacbbd966..fd52cdab5 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -4215,7 +4215,7 @@ ZenCacheDiskLayer::DiscoverBuckets() WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork 
Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (auto& BucketPath : FoundBucketDirectories) @@ -4387,7 +4387,7 @@ ZenCacheDiskLayer::Flush() WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (auto& Bucket : Buckets) @@ -4434,7 +4434,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { # if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}, + WorkerThreadPool::EMode::EnableBacklog)); # else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 1f2d6c37f..3f27e6d21 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, 
&WorkCompleted, &Chunk]() { - std::string Bucket = Chunk.second.Bucket; - IoHash ChunkHash = Chunk.first; - ZenCacheValue CacheValue; + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; - CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); - IoHash Hash = IoHash::HashBuffer(CacheValue.Value); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } 
while (AddedChunkCount.load() < NewChunks.size()) { @@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::EnableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket") CHECK(Value1.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropBucket(Namespace, Bucket)); @@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace") CHECK(Value4.Value); std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - Value2.Value = IoBuffer{}; - Value3.Value = IoBuffer{}; - Value4.Value = IoBuffer{}; - WorkComplete = true; - }); + Workers.ScheduleWork( + [&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + Value2.Value = IoBuffer{}; + Value3.Value = IoBuffer{}; + Value4.Value = IoBuffer{}; + WorkComplete = true; + }, + WorkerThreadPool::EMode::EnableBacklog); // On 
Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropNamespace(Namespace1)); diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 6b89beb3d..49d24c21e 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -132,13 +132,18 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst); std::vector<std::future<void>> Work; Work.emplace_back( - WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }})); - Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { - m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block - }})); - Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { - m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block - }})); + WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}, + WorkerThreadPool::EMode::DisableBacklog)); + Work.emplace_back(WorkerPool.EnqueueTask( + std::packaged_task<void()>{[&]() { + m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + }}, + WorkerThreadPool::EMode::DisableBacklog)); + Work.emplace_back(WorkerPool.EnqueueTask( + std::packaged_task<void()>{[&]() { + m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block + }}, + WorkerThreadPool::EMode::DisableBacklog)); for (std::future<void>& Result : Work) { if (Result.valid()) diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b00abb2cb..b7bfbd188 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ 
-412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas std::atomic<bool> AbortFlag; { std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { const bool Continue = m_BlockStore.IterateChunks( @@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert") { const IoHash& Hash = Chunk.first; const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert") { for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { - Cas.InsertChunk(Chunk.second, Chunk.first); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, 
Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - if (Buffer) - { - CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) + { + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete; @@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert") WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart") RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); @@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks") RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); 
@@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks") for (size_t I = 0; I < 2; I++) { WorkLatch.AddCount(1); - ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - std::vector<IoHash> PartialHashes; - PartialHashes.reserve(Hashes.size() / 4); - for (size_t Index = 0; Index < Hashes.size(); Index++) - { - size_t TestIndex = Index + I; - if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + ThreadPool.ScheduleWork( + [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::vector<IoHash> PartialHashes; + PartialHashes.reserve(Hashes.size() / 4); + for (size_t Index = 0; Index < Hashes.size(); Index++) { - PartialHashes.push_back(Hashes[Index]); + size_t TestIndex = Index + I; + if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + { + PartialHashes.push_back(Hashes[Index]); + } } - } - std::reverse(PartialHashes.begin(), PartialHashes.end()); + std::reverse(PartialHashes.begin(), PartialHashes.end()); - std::vector<IoHash> NoFoundHashes; - std::vector<size_t> NoFindIndexes; + std::vector<IoHash> NoFoundHashes; + std::vector<size_t> NoFindIndexes; - NoFoundHashes.reserve(9); - for (size_t J = 0; J < 9; J++) - { - std::string Data = fmt::format("oh no, we don't exist {}", J + 1); - NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); - } - - NoFindIndexes.reserve(9); - - // Sprinkle in chunks that are not found! 
- auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - - std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); - std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); - - CHECK(Cas.IterateChunks( - PartialHashes, - [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { - CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); - uint32_t PreviousCount = 
FetchedCounts[Index].fetch_add(1); - CHECK(PreviousCount == 0); - FoundFlags[Index] = !!Payload; - const IoHash& Hash = PartialHashes[Index]; - CHECK(Hash == IoHash::HashBuffer(Payload)); - return true; - }, - &BatchWorkerPool, - 2048u)); - - for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) - { - CHECK(FetchedCounts[FoundIndex].load() <= 1); - if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + NoFoundHashes.reserve(9); + for (size_t J = 0; J < 9; J++) { - CHECK(FoundFlags[FoundIndex]); + std::string Data = fmt::format("oh no, we don't exist {}", J + 1); + NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); } - else + + NoFindIndexes.reserve(9); + + // Sprinkle in chunks that are not found! + auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); + 
NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + + std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); + std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); + + CHECK(Cas.IterateChunks( + PartialHashes, + [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { + CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); + uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); + CHECK(PreviousCount == 0); + FoundFlags[Index] = !!Payload; + const IoHash& Hash = PartialHashes[Index]; + CHECK(Hash == IoHash::HashBuffer(Payload)); + return true; + }, + &BatchWorkerPool, + 2048u)); + + for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) { - CHECK(!FoundFlags[FoundIndex]); + CHECK(FetchedCounts[FoundIndex].load() <= 1); + if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + { + CHECK(FoundFlags[FoundIndex]); + } + else + { + CHECK(!FoundFlags[FoundIndex]); + } } - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLatch.CountDown(); WorkLatch.Wait(); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 68644be2d..365a933c1 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -665,7 +665,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { for (size_t Index = 0; Index < 
FoundChunkIndexes.size(); Index++) diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 5023695b2..b08e6a3ca 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -869,7 +869,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) Ex.what()); SetCancelGC(true); } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -981,7 +982,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) SetCancelGC(true); } } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1021,77 +1023,80 @@ GcManager::CollectGarbage(const GcSettings& Settings) GcReferencer* Referencer = m_GcReferencers[Index]; std::pair<std::string, GcReferencerStats>* ReferemcerStats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - ParallelWorkThreadPool.ScheduleWork([this, - &Ctx, - &WorkLeft, - Referencer, - Index, - Result = &Result, - ReferemcerStats, - &ReferenceValidatorsLock, - &ReferenceValidators]() { - ZEN_MEMSCOPE(GetGcTag()); + ParallelWorkThreadPool.ScheduleWork( + [this, + &Ctx, + &WorkLeft, + Referencer, + Index, + Result = &Result, + ReferemcerStats, + &ReferenceValidatorsLock, + &ReferenceValidators]() { + ZEN_MEMSCOPE(GetGcTag()); - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - std::vector<GcReferenceValidator*> Validators; - auto __ = MakeGuard([&Validators]() { - while (!Validators.empty()) - { - delete Validators.back(); - Validators.pop_back(); - } - }); - try - { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + std::vector<GcReferenceValidator*> Validators; + auto __ = MakeGuard([&Validators]() { + while (!Validators.empty()) + { + delete Validators.back(); + Validators.pop_back(); + } + }); + try { - SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Validators = Referencer->CreateReferenceValidators(Ctx); + { + 
SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Validators = Referencer->CreateReferenceValidators(Ctx); + } + if (!Validators.empty()) + { + RwLock::ExclusiveLockScope __(ReferenceValidatorsLock); + for (auto& ReferenceValidator : Validators) + { + size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size(); + Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}}); + ReferenceValidators.insert_or_assign( + std::unique_ptr<GcReferenceValidator>(ReferenceValidator), + ReferencesStatsIndex); + ReferenceValidator = nullptr; + } + } } - if (!Validators.empty()) + catch (const std::system_error& Ex) { - RwLock::ExclusiveLockScope __(ReferenceValidatorsLock); - for (auto& ReferenceValidator : Validators) + if (IsOOD(Ex) || IsOOM(Ex)) { - size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size(); - Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}}); - ReferenceValidators.insert_or_assign(std::unique_ptr<GcReferenceValidator>(ReferenceValidator), - ReferencesStatsIndex); - ReferenceValidator = nullptr; + ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); + } + else + { + ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); } + SetCancelGC(true); } - } - catch (const std::system_error& Ex) - { - if (IsOOD(Ex) || IsOOM(Ex)) + catch (const std::bad_alloc& Ex) { - ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); + ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); + SetCancelGC(true); } - else + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed creating reference validators for {}. 
Reason: '{}'", Referencer->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - SetCancelGC(true); - } - catch (const std::bad_alloc& Ex) - { - ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'", - Referencer->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1221,47 +1226,49 @@ GcManager::CollectGarbage(const GcSettings& Settings) size_t Index = It.second; std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - LockedPhaseThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { - ZEN_MEMSCOPE(GetGcTag()); + LockedPhaseThreadPool.ScheduleWork( + [this, &Ctx, Checker, Index, Stats, &WorkLeft]() { + ZEN_MEMSCOPE(GetGcTag()); - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - try - { - SCOPED_TIMER(Stats->second.UpdateLockedStateMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->UpdateLockedState(Ctx); - } - catch (const std::system_error& Ex) - { - if (IsOOD(Ex) || IsOOM(Ex)) + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try + { + SCOPED_TIMER(Stats->second.UpdateLockedStateMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->UpdateLockedState(Ctx); + } + catch (const std::system_error& Ex) + { + if (IsOOD(Ex) || IsOOM(Ex)) + { + ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + } + else + { + ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + } + SetCancelGC(true); + } + catch (const std::bad_alloc& Ex) { ZEN_WARN("GCV2: Failed Updating locked state for {}. 
Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - else + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + SetCancelGC(true); } - SetCancelGC(true); - } - catch (const std::bad_alloc& Ex) - { - ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'", - Checker->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", - Checker->GetGcName(Ctx), - Ex.what()); - SetCancelGC(true); - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); @@ -1373,7 +1380,8 @@ GcManager::CollectGarbage(const GcSettings& Settings) Ex.what()); SetCancelGC(true); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLeft.CountDown(); WorkLeft.Wait(); diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index 0ca2adab2..4e7bd79a3 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -622,17 +622,19 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId, for (size_t Index = 0; Index < ChunkRequests.size(); Index++) { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, Index]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - try - { - Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what()); - } - }); + WorkerPool.ScheduleWork( + [&, Index]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what()); + } + }, + 
WorkerThreadPool::EMode::DisableBacklog); } WorkLatch.CountDown(); WorkLatch.Wait(); |