aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-10 16:38:33 +0200
committerGitHub Enterprise <[email protected]>2025-09-10 16:38:33 +0200
commit339668ac935f781c06225d2d685642e27348772b (patch)
treea5552d166eef9b5c72a2f9a6903e584dfc8968d7 /src/zenstore/compactcas.cpp
parentfaster oplog entries with referenceset (#488) (diff)
downloadzen-339668ac935f781c06225d2d685642e27348772b.tar.xz
zen-339668ac935f781c06225d2d685642e27348772b.zip
add EMode to WorkerThreadPool to avoid thread starvation (#492)
- Improvement: Add a new mode to worker thread pools to avoid starvation of workers which could cause long stalls due to other work being queued up. UE-305498
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp218
1 file changed, 116 insertions, 102 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b00abb2cb..b7bfbd188 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -412,7 +412,7 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
std::atomic<bool> AbortFlag;
{
std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog);
try
{
const bool Continue = m_BlockStore.IterateChunks(
@@ -1491,11 +1491,13 @@ TEST_CASE("compactcas.threadedinsert")
{
const IoHash& Hash = Chunk.first;
const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Buffer, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1511,13 +1513,15 @@ TEST_CASE("compactcas.threadedinsert")
{
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, &Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < Chunks.size())
{
@@ -1548,23 +1552,27 @@ TEST_CASE("compactcas.threadedinsert")
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Cas.InsertChunk(Chunk.second, Chunk.first);
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Cas.InsertChunk(Chunk.second, Chunk.first);
+ AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
- IoHash ChunkHash = Chunk.first;
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- if (Buffer)
- {
- CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
- }
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ if (Buffer)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
+ }
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
tsl::robin_set<IoHash, IoHash::Hasher> ChunksToDelete;
@@ -1649,11 +1657,13 @@ TEST_CASE("compactcas.threadedinsert")
WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
- CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
- WorkCompleted.fetch_add(1);
- });
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (WorkCompleted < GcChunkHashes.size())
{
@@ -1711,7 +1721,8 @@ TEST_CASE("compactcas.restart")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1964,7 +1975,8 @@ TEST_CASE("compactcas.iteratechunks")
RwLock::ExclusiveLockScope __(InsertLock);
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
@@ -1998,82 +2010,84 @@ TEST_CASE("compactcas.iteratechunks")
for (size_t I = 0; I < 2; I++)
{
WorkLatch.AddCount(1);
- ThreadPool.ScheduleWork([&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- std::vector<IoHash> PartialHashes;
- PartialHashes.reserve(Hashes.size() / 4);
- for (size_t Index = 0; Index < Hashes.size(); Index++)
- {
- size_t TestIndex = Index + I;
- if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ ThreadPool.ScheduleWork(
+ [&Cas, &Hashes, &BatchWorkerPool, &WorkLatch, I]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ std::vector<IoHash> PartialHashes;
+ PartialHashes.reserve(Hashes.size() / 4);
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
{
- PartialHashes.push_back(Hashes[Index]);
+ size_t TestIndex = Index + I;
+ if ((TestIndex % 7 == 1) || (TestIndex % 13 == 1) || (TestIndex % 17 == 1))
+ {
+ PartialHashes.push_back(Hashes[Index]);
+ }
}
- }
- std::reverse(PartialHashes.begin(), PartialHashes.end());
+ std::reverse(PartialHashes.begin(), PartialHashes.end());
- std::vector<IoHash> NoFoundHashes;
- std::vector<size_t> NoFindIndexes;
+ std::vector<IoHash> NoFoundHashes;
+ std::vector<size_t> NoFindIndexes;
- NoFoundHashes.reserve(9);
- for (size_t J = 0; J < 9; J++)
- {
- std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
- NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
- }
-
- NoFindIndexes.reserve(9);
-
- // Sprinkle in chunks that are not found!
- auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
- It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
- NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
-
- std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
- std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
-
- CHECK(Cas.IterateChunks(
- PartialHashes,
- [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
- CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
- uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
- CHECK(PreviousCount == 0);
- FoundFlags[Index] = !!Payload;
- const IoHash& Hash = PartialHashes[Index];
- CHECK(Hash == IoHash::HashBuffer(Payload));
- return true;
- },
- &BatchWorkerPool,
- 2048u));
-
- for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
- {
- CHECK(FetchedCounts[FoundIndex].load() <= 1);
- if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ NoFoundHashes.reserve(9);
+ for (size_t J = 0; J < 9; J++)
{
- CHECK(FoundFlags[FoundIndex]);
+ std::string Data = fmt::format("oh no, we don't exist {}", J + 1);
+ NoFoundHashes.push_back(IoHash::HashBuffer(Data.data(), Data.length()));
}
- else
+
+ NoFindIndexes.reserve(9);
+
+ // Sprinkle in chunks that are not found!
+ auto It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0, NoFoundHashes[0]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 0 + 1, NoFoundHashes[1]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1, NoFoundHashes[2]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 1 + 1, NoFoundHashes[3]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 2, NoFoundHashes[4]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3, NoFoundHashes[5]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 3 + 1, NoFoundHashes[6]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.begin() + (PartialHashes.size() / 4) * 4, NoFoundHashes[7]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+ It = PartialHashes.insert(PartialHashes.end(), NoFoundHashes[8]);
+ NoFindIndexes.push_back(std::distance(PartialHashes.begin(), It));
+
+ std::vector<std::atomic<bool>> FoundFlags(PartialHashes.size() + NoFoundHashes.size());
+ std::vector<std::atomic<uint32_t>> FetchedCounts(PartialHashes.size() + NoFoundHashes.size());
+
+ CHECK(Cas.IterateChunks(
+ PartialHashes,
+ [&PartialHashes, &FoundFlags, &FetchedCounts, &NoFindIndexes](size_t Index, const IoBuffer& Payload) {
+ CHECK_EQ(NoFindIndexes.end(), std::find(NoFindIndexes.begin(), NoFindIndexes.end(), Index));
+ uint32_t PreviousCount = FetchedCounts[Index].fetch_add(1);
+ CHECK(PreviousCount == 0);
+ FoundFlags[Index] = !!Payload;
+ const IoHash& Hash = PartialHashes[Index];
+ CHECK(Hash == IoHash::HashBuffer(Payload));
+ return true;
+ },
+ &BatchWorkerPool,
+ 2048u));
+
+ for (size_t FoundIndex = 0; FoundIndex < PartialHashes.size(); FoundIndex++)
{
- CHECK(!FoundFlags[FoundIndex]);
+ CHECK(FetchedCounts[FoundIndex].load() <= 1);
+ if (std::find(NoFindIndexes.begin(), NoFindIndexes.end(), FoundIndex) == NoFindIndexes.end())
+ {
+ CHECK(FoundFlags[FoundIndex]);
+ }
+ else
+ {
+ CHECK(!FoundFlags[FoundIndex]);
+ }
}
- }
- });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
WorkLatch.CountDown();
WorkLatch.Wait();