aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerThreadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work being queued up. UE-305498
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp212
-rw-r--r--src/zenstore/buildstore/buildstore.cpp2
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp7
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp104
-rw-r--r--src/zenstore/cas.cpp19
-rw-r--r--src/zenstore/compactcas.cpp218
-rw-r--r--src/zenstore/filecas.cpp2
-rw-r--r--src/zenstore/gc.cpp188
-rw-r--r--src/zenstore/workspaces.cpp24
9 files changed, 416 insertions, 360 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 7b56c64bd..c50f2bb13 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1686,80 +1686,82 @@ TEST_CASE("blockstore.iterate.chunks")
Latch WorkLatch(1);
Store.IterateChunks(Locations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- bool Continue = Store.IterateBlock(
- Locations,
- ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- CHECK(Data);
- CHECK(Size == FirstChunkData.size());
- CHECK(std::string((const char*)Data, Size) == FirstChunkData);
- break;
- case 1:
- CHECK(Data);
- CHECK(Size == SecondChunkData.size());
- CHECK(std::string((const char*)Data, Size) == SecondChunkData);
- break;
- case 2:
- CHECK(false);
- break;
- case 3:
- CHECK(!Data);
- break;
- case 4:
- CHECK(!Data);
- break;
- case 5:
- CHECK(!Data);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
- switch (ChunkIndex)
- {
- case 0:
- case 1:
- CHECK(false);
- break;
- case 2:
- {
- CHECK(Size == VeryLargeChunk.size());
- char* Buffer = new char[Size];
- size_t HashOffset = 0;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
- memcpy(&Buffer[HashOffset], Data, Size);
- HashOffset += Size;
- });
- CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
- delete[] Buffer;
- }
- break;
- case 3:
- CHECK(false);
- break;
- case 4:
- CHECK(false);
- break;
- case 5:
- CHECK(false);
- break;
- default:
- CHECK(false);
- break;
- }
- return true;
- },
- 0);
- CHECK(Continue);
- });
+ WorkerPool.ScheduleWork(
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ bool Continue = Store.IterateBlock(
+ Locations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ CHECK(Data);
+ CHECK(Size == FirstChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == FirstChunkData);
+ break;
+ case 1:
+ CHECK(Data);
+ CHECK(Size == SecondChunkData.size());
+ CHECK(std::string((const char*)Data, Size) == SecondChunkData);
+ break;
+ case 2:
+ CHECK(false);
+ break;
+ case 3:
+ CHECK(!Data);
+ break;
+ case 4:
+ CHECK(!Data);
+ break;
+ case 5:
+ CHECK(!Data);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> bool {
+ switch (ChunkIndex)
+ {
+ case 0:
+ case 1:
+ CHECK(false);
+ break;
+ case 2:
+ {
+ CHECK(Size == VeryLargeChunk.size());
+ char* Buffer = new char[Size];
+ size_t HashOffset = 0;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) {
+ memcpy(&Buffer[HashOffset], Data, Size);
+ HashOffset += Size;
+ });
+ CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0);
+ delete[] Buffer;
+ }
+ break;
+ case 3:
+ CHECK(false);
+ break;
+ case 4:
+ CHECK(false);
+ break;
+ case 5:
+ CHECK(false);
+ break;
+ default:
+ CHECK(false);
+ break;
+ }
+ return true;
+ },
+ 0);
+ CHECK(Continue);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return true;
});
WorkLatch.CountDown();
@@ -1796,11 +1798,15 @@ TEST_CASE("blockstore.thread.read.write")
std::atomic<size_t> WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ ChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1810,13 +1816,15 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1828,20 +1836,24 @@ TEST_CASE("blockstore.thread.read.write")
WorkCompleted = 0;
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- SecondChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- });
- WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- });
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size() * 2)
{
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index d65c2bf06..4539746ba 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -376,7 +376,7 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
{
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
for (size_t Index = 0; Index < Metadatas.size(); Index++)
{
Work.ScheduleWork(
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index cacbbd966..fd52cdab5 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4215,7 +4215,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& BucketPath : FoundBucketDirectories)
@@ -4387,7 +4387,7 @@ ZenCacheDiskLayer::Flush()
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (auto& Bucket : Buckets)
@@ -4434,7 +4434,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
# if 1
Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+ std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
+ WorkerThreadPool::EMode::EnableBacklog));
# else
CacheBucket& Bucket = *Kv.second;
Bucket.ScrubStorage(Ctx);
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 1f2d6c37f..3f27e6d21 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1574,10 +1574,12 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1612,16 +1614,18 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, &Chunk]() {
+ std::string Bucket = Chunk.second.Bucket;
+ IoHash ChunkHash = Chunk.first;
+ ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
+ IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1655,23 +1659,27 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic_uint32_t AddedChunkCount = 0;
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
+ {
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (AddedChunkCount.load() < NewChunks.size())
{
@@ -1710,12 +1718,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Zcs, &WorkCompleted, Chunk]() {
+ ZenCacheValue CacheValue;
+ CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
+ CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1848,11 +1858,13 @@ TEST_CASE("cachestore.drop.bucket")
CHECK(Value1.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropBucket(Namespace, Bucket));
@@ -1931,14 +1943,16 @@ TEST_CASE("cachestore.drop.namespace")
CHECK(Value4.Value);
std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
+ Workers.ScheduleWork(
+ [&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
// On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
// Our DropBucket execution blocks any incoming request from completing until we are done with the drop
CHECK(Zcs.DropNamespace(Namespace1));
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index 6b89beb3d..49d24c21e 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -132,13 +132,18 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Burst);
std::vector<std::future<void>> Work;
Work.emplace_back(
- WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}));
- Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() {
- m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
- }}));
- Work.emplace_back(WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() {
- m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
- }}));
+ WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }},
+ WorkerThreadPool::EMode::DisableBacklog));
+ Work.emplace_back(WorkerPool.EnqueueTask(
+ std::packaged_task<void()>{[&]() {
+ m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
+ }},
+ WorkerThreadPool::EMode::DisableBacklog));
+ Work.emplace_back(WorkerPool.EnqueueTask(
+ std::packaged_task<void()>{[&]() {
+ m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
+ }},
+ WorkerThreadPool::EMode::DisableBacklog));
for (std::future<void>& Result : Work)
{
if (Result.valid())
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b00abb2cb..b7bfbd188 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
std::atomic<bool> AbortFlag;
{
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
const bool Continue = m_BlockStore.IterateChunks(
@@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert")
{
const IoHash& Hash = Chunk.first;
const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Buffer, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert")
{
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, &Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert")
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Cas.InsertChunk(Chunk.second, Chunk.first);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Cas.InsertChunk(Chunk.second, Chunk.first);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- if (Buffer)
- {
- CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ if (Buffer)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete;
@@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert")
WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks")
for (size_t I = 0; I < 2; I++)
{
WorkLatch.AddCount(1);
- ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- std::vector<IoHash> PartialHashes;
- PartialHashes.reserve(Hashes.size() / 4);
- for (size_t Index = 0; Index < Hashes.size(); Index++)
- {
- size_t TestIndex = Index + I;
- if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ ThreadPool.ScheduleWork(
+ [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::vector<IoHash> PartialHashes;
+ PartialHashes.reserve(Hashes.size() / 4);
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
{
- PartialHashes.push_back(Hashes[Index]);
+ size_t TestIndex = Index + I;
+ if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ {
+ PartialHashes.push_back(Hashes[Index]);
+ }
}
- }
- std::reverse(PartialHashes.begin(), PartialHashes.end());
+ std::reverse(PartialHashes.begin(), PartialHashes.end());
- std::vector<IoHash> NoFoundHashes;
- std::vector<size_t> NoFindIndexes;
+ std::vector<IoHash> NoFoundHashes;
+ std::vector<size_t> NoFindIndexes;
- NoFoundHashes.reserve(9);
- for (size_t J = 0; J < 9; J++)
- {
- std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
- NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
- }
-
- NoFindIndexes.reserve(9);
-
- // Sprinkle in chunks that are not found!
- auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
-
- std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
- std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
-
- CHECK(Cas.IterateChunks(
- PartialHashes,
- [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
- CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
- uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
- CHECK(PreviousCount == 0);
- FoundFlags[Index] = !!Payload;
- const IoHash& Hash = PartialHashes[Index];
- CHECK(Hash == IoHash::HashBuffer(Payload));
- return true;
- },
- &BatchWorkerPool,
- 2048u));
-
- for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
- {
- CHECK(FetchedCounts[FoundIndex].load() <= 1);
- if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ NoFoundHashes.reserve(9);
+ for (size_t J = 0; J < 9; J++)
{
- CHECK(FoundFlags[FoundIndex]);
+ std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
+ NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
}
- else
+
+ NoFindIndexes.reserve(9);
+
+ // Sprinkle in chunks that are not found!
+ auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+
+ std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
+ std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
+
+ CHECK(Cas.IterateChunks(
+ PartialHashes,
+ [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
+ CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
+ uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
+ CHECK(PreviousCount == 0);
+ FoundFlags[Index] = !!Payload;
+ const IoHash& Hash = PartialHashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+
+ for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
{
- CHECK(!FoundFlags[FoundIndex]);
+ CHECK(FetchedCounts[FoundIndex].load() <= 1);
+ if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ {
+ CHECK(FoundFlags[FoundIndex]);
+ }
+ else
+ {
+ CHECK(!FoundFlags[FoundIndex]);
+ }
}
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLatch.CountDown();
WorkLatch.Wait();
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 68644be2d..365a933c1 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -665,7 +665,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 5023695b2..b08e6a3ca 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -869,7 +869,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
Ex.what());
SetCancelGC(true);
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -981,7 +982,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
SetCancelGC(true);
}
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1021,77 +1023,80 @@ GcManager::CollectGarbage(const GcSettings& Settings)
GcReferencer* Referencer = m_GcReferencers[Index];
std::pair<std::string, GcReferencerStats>* ReferemcerStats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- ParallelWorkThreadPool.ScheduleWork([this,
- &Ctx,
- &WorkLeft,
- Referencer,
- Index,
- Result = &Result,
- ReferemcerStats,
- &ReferenceValidatorsLock,
- &ReferenceValidators]() {
- ZEN_MEMSCOPE(GetGcTag());
+ ParallelWorkThreadPool.ScheduleWork(
+ [this,
+ &Ctx,
+ &WorkLeft,
+ Referencer,
+ Index,
+ Result = &Result,
+ ReferemcerStats,
+ &ReferenceValidatorsLock,
+ &ReferenceValidators]() {
+ ZEN_MEMSCOPE(GetGcTag());
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- std::vector<GcReferenceValidator*> Validators;
- auto __ = MakeGuard([&Validators]() {
- while (!Validators.empty())
- {
- delete Validators.back();
- Validators.pop_back();
- }
- });
- try
- {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ std::vector<GcReferenceValidator*> Validators;
+ auto __ = MakeGuard([&Validators]() {
+ while (!Validators.empty())
+ {
+ delete Validators.back();
+ Validators.pop_back();
+ }
+ });
+ try
{
- SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Validators = Referencer->CreateReferenceValidators(Ctx);
+ {
+ SCOPED_TIMER(ReferemcerStats->second.CreateReferenceValidatorsMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Validators = Referencer->CreateReferenceValidators(Ctx);
+ }
+ if (!Validators.empty())
+ {
+ RwLock::ExclusiveLockScope __(ReferenceValidatorsLock);
+ for (auto& ReferenceValidator : Validators)
+ {
+ size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size();
+ Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}});
+ ReferenceValidators.insert_or_assign(
+ std::unique_ptr<GcReferenceValidator>(ReferenceValidator),
+ ReferencesStatsIndex);
+ ReferenceValidator = nullptr;
+ }
+ }
}
- if (!Validators.empty())
+ catch (const std::system_error& Ex)
{
- RwLock::ExclusiveLockScope __(ReferenceValidatorsLock);
- for (auto& ReferenceValidator : Validators)
+ if (IsOOD(Ex) || IsOOM(Ex))
{
- size_t ReferencesStatsIndex = Result->ReferenceValidatorStats.size();
- Result->ReferenceValidatorStats.push_back({ReferenceValidator->GetGcName(Ctx), {}});
- ReferenceValidators.insert_or_assign(std::unique_ptr<GcReferenceValidator>(ReferenceValidator),
- ReferencesStatsIndex);
- ReferenceValidator = nullptr;
+ ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
+ }
+ else
+ {
+ ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
}
+ SetCancelGC(true);
}
- }
- catch (const std::system_error& Ex)
- {
- if (IsOOD(Ex) || IsOOM(Ex))
+ catch (const std::bad_alloc& Ex)
{
- ZEN_WARN("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
+ ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
+ SetCancelGC(true);
}
- else
+ catch (const std::exception& Ex)
{
ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
Referencer->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- SetCancelGC(true);
- }
- catch (const std::bad_alloc& Ex)
- {
- ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("GCV2: Failed creating reference validators for {}. Reason: '{}'",
- Referencer->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1221,47 +1226,49 @@ GcManager::CollectGarbage(const GcSettings& Settings)
size_t Index = It.second;
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- LockedPhaseThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
- ZEN_MEMSCOPE(GetGcTag());
+ LockedPhaseThreadPool.ScheduleWork(
+ [this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
+ ZEN_MEMSCOPE(GetGcTag());
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- try
- {
- SCOPED_TIMER(Stats->second.UpdateLockedStateMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->UpdateLockedState(Ctx);
- }
- catch (const std::system_error& Ex)
- {
- if (IsOOD(Ex) || IsOOM(Ex))
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ try
+ {
+ SCOPED_TIMER(Stats->second.UpdateLockedStateMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->UpdateLockedState(Ctx);
+ }
+ catch (const std::system_error& Ex)
+ {
+ if (IsOOD(Ex) || IsOOM(Ex))
+ {
+ ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
+ Checker->GetGcName(Ctx),
+ Ex.what());
+ }
+ else
+ {
+ ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
+ Checker->GetGcName(Ctx),
+ Ex.what());
+ }
+ SetCancelGC(true);
+ }
+ catch (const std::bad_alloc& Ex)
{
ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
Checker->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- else
+ catch (const std::exception& Ex)
{
ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
Checker->GetGcName(Ctx),
Ex.what());
+ SetCancelGC(true);
}
- SetCancelGC(true);
- }
- catch (const std::bad_alloc& Ex)
- {
- ZEN_WARN("GCV2: Failed Updating locked state for {}. Reason: '{}'",
- Checker->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
- Checker->GetGcName(Ctx),
- Ex.what());
- SetCancelGC(true);
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
@@ -1373,7 +1380,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
Ex.what());
SetCancelGC(true);
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLeft.CountDown();
WorkLeft.Wait();
diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp
index 0ca2adab2..4e7bd79a3 100644
--- a/src/zenstore/workspaces.cpp
+++ b/src/zenstore/workspaces.cpp
@@ -622,17 +622,19 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId,
for (size_t Index = 0; Index < ChunkRequests.size(); Index++)
{
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&, Index]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- try
- {
- Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what());
- }
- });
+ WorkerPool.ScheduleWork(
+ [&, Index]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ try
+ {
+ Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
WorkLatch.CountDown();
WorkLatch.Wait();