diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-12 22:33:35 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-12 22:33:35 +0200 |
| commit | 1cc2c8b9547e5244134299707ade3eb5afbf6c55 (patch) | |
| tree | de0e4bca6d4bcfc89c671a95a899768131f6c0a2 /zenserver/cache/structuredcachestore.cpp | |
| parent | remove unneeded lock in threaded test (diff) | |
| download | zen-1cc2c8b9547e5244134299707ade3eb5afbf6c55.tar.xz zen-1cc2c8b9547e5244134299707ade3eb5afbf6c55.zip | |
Wait for work to complete rather than being picked up
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2746dc673..3ba4e6b05 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -3127,8 +3127,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - - while(true) + while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); @@ -3136,10 +3135,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { continue; } - Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; break; } - while(true) + while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); @@ -3147,23 +3146,27 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { continue; } - Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; break; } } CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(4); - CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path()); + WorkerThreadPool ThreadPool(4); + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path()); { + std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); }); + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + WorkCompleted.fetch_add(1); + }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } @@ -3172,10 +3175,11 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) const uint64_t TotalSize = Zcs.StorageSize().DiskSize; CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); - { + { + std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, &Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { std::string Bucket = Chunk.second.Bucket; IoHash ChunkHash = Chunk.first; ZenCacheValue CacheValue; @@ -3183,9 +3187,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); IoHash Hash = IoHash::HashBuffer(CacheValue.Value); CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } @@ -3213,24 +3218,26 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } - std::atomic_uint32_t AddedChunkCount; - + std::atomic<size_t> WorkCompleted = 0; + std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { - ThreadPool.ScheduleWork([&Zcs, Chunk, &AddedChunkCount]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); }); } for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); } + WorkCompleted.fetch_add(1); }); } while (AddedChunkCount.load() < NewChunks.size()) @@ -3277,7 +3284,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } @@ -3326,15 +3333,17 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) } } { + std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { - ThreadPool.ScheduleWork([&Zcs, Chunk]() { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } |