| author | Dan Engelbrecht <[email protected]> | 2022-04-12 20:24:11 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-12 20:24:11 +0200 |
| commit | f08f699382fe2117abbc87738dbe23586588f5d2 (patch) | |
| tree | 3a7826231931df259f5feda355d92dcc850628b1 /zenstore/compactcas.cpp | |
| parent | safer check for added size in threaded test (diff) | |
| download | zen-f08f699382fe2117abbc87738dbe23586588f5d2.tar.xz zen-f08f699382fe2117abbc87738dbe23586588f5d2.zip | |
wait until work is completed, not just picked up
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 33 |
1 file changed, 21 insertions, 12 deletions
```diff
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 2c096d85e..920ed965f 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2272,9 +2272,9 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
     CreateDirectories(CasConfig.RootDirectory);
 
-    const uint64_t kChunkSize = 1048;
-    const int32_t kChunkCount = 4096;
-    uint64_t ExpectedSize = 0;
+    const uint64_t kChunkSize = 1048;
+    const int32_t kChunkCount = 4096;
+    uint64_t ExpectedSize = 0;
 
     std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
     Chunks.reserve(kChunkCount);
@@ -2295,6 +2295,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
         }
     }
 
+    std::atomic<size_t> WorkCompleted = 0;
     WorkerThreadPool ThreadPool(4);
     CasGc Gc;
     CasContainerStrategy Cas(CasConfig, Gc);
@@ -2304,31 +2305,34 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
         {
             const IoHash& Hash = Chunk.first;
             const IoBuffer& Buffer = Chunk.second;
-            ThreadPool.ScheduleWork([&Cas, Buffer, Hash]() {
+            ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
                 CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
                 ZEN_ASSERT(InsertResult.New);
+                WorkCompleted.fetch_add(1);
             });
         }
-        while (ThreadPool.PendingWork() > 0)
+        while (WorkCompleted < Chunks.size())
         {
             Sleep(1);
         }
    }
 
+    WorkCompleted = 0;
     const uint64_t TotalSize = Cas.StorageSize().DiskSize;
     CHECK_EQ(ExpectedSize, TotalSize);
     {
         for (const auto& Chunk : Chunks)
         {
-            ThreadPool.ScheduleWork([&Cas, &Chunk]() {
+            ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
                 IoHash ChunkHash = Chunk.first;
                 IoBuffer Buffer = Cas.FindChunk(ChunkHash);
                 IoHash Hash = IoHash::HashBuffer(Buffer);
                 CHECK(ChunkHash == Hash);
+                WorkCompleted.fetch_add(1);
             });
         }
-        while (ThreadPool.PendingWork() > 0)
+        while (WorkCompleted < Chunks.size())
         {
             Sleep(1);
         }
@@ -2341,6 +2345,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
         GcChunkHashes.insert(Chunk.first);
     }
     {
+        WorkCompleted = 0;
         std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
         NewChunks.reserve(kChunkCount);
@@ -2355,20 +2360,22 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
         for (const auto& Chunk : NewChunks)
         {
-            ThreadPool.ScheduleWork([&Cas, Chunk, &AddedChunkCount]() {
+            ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
                 Cas.InsertChunk(Chunk.second, Chunk.first);
                 AddedChunkCount.fetch_add(1);
+                WorkCompleted.fetch_add(1);
             });
         }
         for (const auto& Chunk : Chunks)
         {
-            ThreadPool.ScheduleWork([&Cas, Chunk]() {
+            ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
                 IoHash ChunkHash = Chunk.first;
                 IoBuffer Buffer = Cas.FindChunk(ChunkHash);
                 if (Buffer)
                 {
                     CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
                 }
+                WorkCompleted.fetch_add(1);
             });
         }
@@ -2410,7 +2417,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
             Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
         }
-        while (ThreadPool.PendingWork() > 0)
+        while (WorkCompleted < NewChunks.size() + Chunks.size())
         {
             Sleep(1);
         }
@@ -2451,14 +2458,16 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
         Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
     }
     {
+        WorkCompleted = 0;
         for (const IoHash& ChunkHash : GcChunkHashes)
        {
-            ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+            ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
                CHECK(Cas.HaveChunk(ChunkHash));
                CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+               WorkCompleted.fetch_add(1);
            });
        }
-        while (ThreadPool.PendingWork() > 0)
+        while (WorkCompleted < GcChunkHashes.size())
        {
            Sleep(1);
        }
```
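The point of the change is that `ThreadPool.PendingWork()` drops to zero as soon as a worker dequeues a job, so the old loops could stop waiting while the last jobs were still executing. The patch instead has every scheduled lambda bump a shared `std::atomic<size_t> WorkCompleted` as its final step, and the test polls that counter against the number of jobs it scheduled. The sketch below illustrates the same waiting pattern in isolation; it does not use the zenstore `WorkerThreadPool` API, and every name in it other than `WorkCompleted` is illustrative.

```cpp
// Minimal sketch of "wait for completion, not pickup": jobs signal an atomic
// counter when they finish, and the main thread waits on that counter rather
// than on the pending queue becoming empty.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

int main()
{
    std::queue<std::function<void()>> Queue;          // pending (not yet picked up) jobs
    std::mutex                        QueueMutex;
    std::atomic<bool>                 Stop{false};
    std::atomic<size_t>               WorkCompleted{0};

    const size_t kJobCount = 64;
    for (size_t i = 0; i < kJobCount; ++i)
    {
        Queue.push([&WorkCompleted]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));  // simulated work
            WorkCompleted.fetch_add(1);  // signal completion as the job's last step
        });
    }

    std::vector<std::thread> Workers;
    for (int i = 0; i < 4; ++i)
    {
        Workers.emplace_back([&]() {
            while (!Stop)
            {
                std::function<void()> Job;
                {
                    std::lock_guard<std::mutex> Lock(QueueMutex);
                    if (!Queue.empty())
                    {
                        Job = std::move(Queue.front());
                        Queue.pop();  // job is now "picked up" but not yet done
                    }
                }
                if (!Job)
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                    continue;
                }
                Job();
            }
        });
    }

    // Waiting on "queue empty" here could return while the last jobs are
    // still running on worker threads; the completion counter cannot.
    while (WorkCompleted < kJobCount)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    Stop = true;
    for (std::thread& Worker : Workers)
    {
        Worker.join();
    }
    std::printf("all %zu jobs completed\n", WorkCompleted.load());
    return 0;
}
```

As in the patch, the waiter simply polls with a short sleep, which keeps the test code simple; resetting the counter to zero between phases only works because each wait loop drains all outstanding jobs before the next batch is scheduled.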