aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-12 20:24:11 +0200
committerDan Engelbrecht <[email protected]>2022-04-12 20:24:11 +0200
commitf08f699382fe2117abbc87738dbe23586588f5d2 (patch)
tree3a7826231931df259f5feda355d92dcc850628b1 /zenstore/compactcas.cpp
parentsafer check for added size i threaded test (diff)
downloadzen-f08f699382fe2117abbc87738dbe23586588f5d2.tar.xz
zen-f08f699382fe2117abbc87738dbe23586588f5d2.zip
wait until work is completed, not just picked up
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp33
1 files changed, 21 insertions, 12 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 2c096d85e..920ed965f 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2272,9 +2272,9 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
CreateDirectories(CasConfig.RootDirectory);
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
- uint64_t ExpectedSize = 0;
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 4096;
+ uint64_t ExpectedSize = 0;
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
Chunks.reserve(kChunkCount);
@@ -2295,6 +2295,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
}
}
+ std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
@@ -2304,31 +2305,34 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
const IoHash& Hash = Chunk.first;
const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork([&Cas, Buffer, Hash]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
}
+ WorkCompleted = 0;
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(ExpectedSize, TotalSize);
{
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &Chunk]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
IoHash Hash = IoHash::HashBuffer(Buffer);
CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
@@ -2341,6 +2345,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
GcChunkHashes.insert(Chunk.first);
}
{
+ WorkCompleted = 0;
std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
@@ -2355,20 +2360,22 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Cas, Chunk, &AddedChunkCount]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
});
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, Chunk]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
if (Buffer)
{
CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
}
+ WorkCompleted.fetch_add(1);
});
}
@@ -2410,7 +2417,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
{
Sleep(1);
}
@@ -2451,14 +2458,16 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
+ WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < GcChunkHashes.size())
{
Sleep(1);
}