diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-10 16:38:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-10 16:38:33 +0200 |
| commit | 339668ac935f781c06225d2d685642e27348772b (patch) | |
| tree | a5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/blockstore.cpp | |
| parent | faster oplog entries with referenceset (#488) (diff) | |
| download | zen-339668ac935f781c06225d2d685642e27348772b.tar.xz zen-339668ac935f781c06225d2d685642e27348772b.zip | |
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 212 |
1 files changed, 112 insertions, 100 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 7b56c64bd..c50f2bb13 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks") Latch WorkLatch(1); Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - bool Continue = Store.IterateBlock( - Locations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - CHECK(Data); - CHECK(Size == FirstChunkData.size()); - CHECK(std::string((const char*)Data, Size) == FirstChunkData); - break; - case 1: - CHECK(Data); - CHECK(Size == SecondChunkData.size()); - CHECK(std::string((const char*)Data, Size) == SecondChunkData); - break; - case 2: - CHECK(false); - break; - case 3: - CHECK(!Data); - break; - case 4: - CHECK(!Data); - break; - case 5: - CHECK(!Data); - break; - default: - CHECK(false); - break; - } - return true; - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { - switch (ChunkIndex) - { - case 0: - case 1: - CHECK(false); - break; - case 2: - { - CHECK(Size == VeryLargeChunk.size()); - char* Buffer = new char[Size]; - size_t HashOffset = 0; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { - memcpy(&Buffer[HashOffset], Data, Size); - HashOffset += Size; - }); - CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); - delete[] Buffer; - } - break; - case 3: - CHECK(false); - break; - case 4: - CHECK(false); - break; - case 5: - CHECK(false); - break; - default: - CHECK(false); - break; - } - return true; - }, - 0); - CHECK(Continue); - }); + WorkerPool.ScheduleWork( + [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + bool Continue = Store.IterateBlock( + Locations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; + } + return true; + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + return true; + }, + 0); + CHECK(Continue); + }, + WorkerThreadPool::EMode::EnableBacklog); return true; }); WorkLatch.CountDown(); @@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write") std::atomic<size_t> WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + ChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size()) { @@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write") WorkCompleted = 0; for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { - WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }); - WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); } while (WorkCompleted < Chunks.size() * 2) { |