diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/compactcas.cpp | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerThreadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work being queued up. UE-305498
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 218 |
1 files changed, 116 insertions, 102 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b00abb2cb..b7bfbd188 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas std::atomic<bool> AbortFlag; { std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); try { const bool Continue = m_BlockStore.IterateChunks( @@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert") { const IoHash& Hash = Chunk.first; const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert") { for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, 
&AddedChunkCount]() { - Cas.InsertChunk(Chunk.second, Chunk.first); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { - IoHash ChunkHash = Chunk.first; - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - if (Buffer) - { - CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); - } - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) + { + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); + } + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete; @@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert") WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - WorkCompleted.fetch_add(1); - }); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < GcChunkHashes.size()) { @@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart") RwLock::ExclusiveLockScope __(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); @@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks") RwLock::ExclusiveLockScope 
__(InsertLock); Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); @@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks") for (size_t I = 0; I < 2; I++) { WorkLatch.AddCount(1); - ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - std::vector<IoHash> PartialHashes; - PartialHashes.reserve(Hashes.size() / 4); - for (size_t Index = 0; Index < Hashes.size(); Index++) - { - size_t TestIndex = Index + I; - if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + ThreadPool.ScheduleWork( + [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + std::vector<IoHash> PartialHashes; + PartialHashes.reserve(Hashes.size() / 4); + for (size_t Index = 0; Index < Hashes.size(); Index++) { - PartialHashes.push_back(Hashes[Index]); + size_t TestIndex = Index + I; + if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1)) + { + PartialHashes.push_back(Hashes[Index]); + } } - } - std::reverse(PartialHashes.begin(), PartialHashes.end()); + std::reverse(PartialHashes.begin(), PartialHashes.end()); - std::vector<IoHash> NoFoundHashes; - std::vector<size_t> NoFindIndexes; + std::vector<IoHash> NoFoundHashes; + std::vector<size_t> NoFindIndexes; - NoFoundHashes.reserve(9); - for (size_t J = 0; J < 9; J++) - { - std::string Data = fmt::format("oh no, we don't exist {}", J + 1); - NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); - } - - NoFindIndexes.reserve(9); - - // Sprinkle in chunks that are not found! 
- auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); - NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); - - std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); - std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); - - CHECK(Cas.IterateChunks( - PartialHashes, - [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { - CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); - uint32_t PreviousCount = 
FetchedCounts[Index].fetch_add(1); - CHECK(PreviousCount == 0); - FoundFlags[Index] = !!Payload; - const IoHash& Hash = PartialHashes[Index]; - CHECK(Hash == IoHash::HashBuffer(Payload)); - return true; - }, - &BatchWorkerPool, - 2048u)); - - for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) - { - CHECK(FetchedCounts[FoundIndex].load() <= 1); - if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + NoFoundHashes.reserve(9); + for (size_t J = 0; J < 9; J++) { - CHECK(FoundFlags[FoundIndex]); + std::string Data = fmt::format("oh no, we don't exist {}", J + 1); + NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length())); } - else + + NoFindIndexes.reserve(9); + + // Sprinkle in chunks that are not found! + auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]); + 
NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]); + NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It)); + + std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size()); + std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size()); + + CHECK(Cas.IterateChunks( + PartialHashes, + [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) { + CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index)); + uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1); + CHECK(PreviousCount == 0); + FoundFlags[Index] = !!Payload; + const IoHash& Hash = PartialHashes[Index]; + CHECK(Hash == IoHash::HashBuffer(Payload)); + return true; + }, + &BatchWorkerPool, + 2048u)); + + for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++) { - CHECK(!FoundFlags[FoundIndex]); + CHECK(FetchedCounts[FoundIndex].load() <= 1); + if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end()) + { + CHECK(FoundFlags[FoundIndex]); + } + else + { + CHECK(!FoundFlags[FoundIndex]); + } } - } - }); + }, + WorkerThreadPool::EMode::EnableBacklog); } WorkLatch.CountDown(); WorkLatch.Wait(); |