aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/blockstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/blockstore.cpp
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerTheadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work begin queued up. UE-305498
Diffstat (limited to 'src/zenstore/blockstore.cpp')
-rw-r--r--src/zenstore/blockstore.cpp212
1 files changed, 112 insertions, 100 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 7b56c64bd..c50f2bb13 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks")
Latch WorkLatch(1);
Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- bool Continue = Store.IterateBlock(
- Locations,
- ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
- {
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
- }
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- 0);
- CHECK(Continue);
- });
+ WorkerPool.ScheduleWork(
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ bool Continue = Store.IterateBlock(
+ Locations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ CHECK(Data);
+ CHECK(Size == FirstChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+ break;
+ case 1:
+ CHECK(Data);
+ CHECK(Size == SecondChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+ break;
+ case 2:
+ CHECK(false);
+ break;
+ case 3:
+ CHECK(!Data);
+ break;
+ case 4:
+ CHECK(!Data);
+ break;
+ case 5:
+ CHECK(!Data);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ case 1:
+ CHECK(false);
+ break;
+ case 2:
+ {
+ CHECK(Size == VeryLargeChunk.size());
+ char* Buffer = new char[Size];
+ size_t HashOffset = 0;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
+ memcpy(&Buffer[HashOffset], Data, Size);
+ HashOffset += Size;
+ });
+ CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
+ delete[] Buffer;
+ }
+ break;
+ case 3:
+ CHECK(false);
+ break;
+ case 4:
+ CHECK(false);
+ break;
+ case 5:
+ CHECK(false);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ 0);
+ CHECK(Continue);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return true;
});
WorkLatch.CountDown();
@@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write")
std::atomic<size_t> WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ ChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- SecondChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- });
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size() * 2)
{