aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-12 22:33:35 +0200
committerDan Engelbrecht <[email protected]>2022-04-12 22:33:35 +0200
commit1cc2c8b9547e5244134299707ade3eb5afbf6c55 (patch)
treede0e4bca6d4bcfc89c671a95a899768131f6c0a2 /zenserver/cache/structuredcachestore.cpp
parentremove unneeded lock in threaded test (diff)
downloadzen-1cc2c8b9547e5244134299707ade3eb5afbf6c55.tar.xz
zen-1cc2c8b9547e5244134299707ade3eb5afbf6c55.zip
Wait for work to complete rather than being picked up
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp49
1 files changed, 29 insertions, 20 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 2746dc673..3ba4e6b05 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -3127,8 +3127,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
-
- while(true)
+ while (true)
{
IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
IoHash Hash = HashBuffer(Chunk);
@@ -3136,10 +3135,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
continue;
}
- Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
+ Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
break;
}
- while(true)
+ while (true)
{
IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
IoHash Hash = HashBuffer(Chunk);
@@ -3147,23 +3146,27 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
continue;
}
- Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
+ Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
break;
}
}
CreateDirectories(TempDir.Path());
- WorkerThreadPool ThreadPool(4);
- CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path());
+ WorkerThreadPool ThreadPool(4);
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, TempDir.Path());
{
+ std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); });
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
+ WorkCompleted.fetch_add(1);
+ });
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
@@ -3172,10 +3175,11 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
- {
+ {
+ std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, &Chunk]() {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
std::string Bucket = Chunk.second.Bucket;
IoHash ChunkHash = Chunk.first;
ZenCacheValue CacheValue;
@@ -3183,9 +3187,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
@@ -3213,24 +3218,26 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
}
- std::atomic_uint32_t AddedChunkCount;
-
+ std::atomic<size_t> WorkCompleted = 0;
+ std::atomic_uint32_t AddedChunkCount = 0;
for (const auto& Chunk : NewChunks)
{
- ThreadPool.ScheduleWork([&Zcs, Chunk, &AddedChunkCount]() {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
});
}
for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Zcs, Chunk]() {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
ZenCacheValue CacheValue;
if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
}
+ WorkCompleted.fetch_add(1);
});
}
while (AddedChunkCount.load() < NewChunks.size())
@@ -3277,7 +3284,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
{
Sleep(1);
}
@@ -3326,15 +3333,17 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
}
{
+ std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Zcs, Chunk]() {
+ ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
ZenCacheValue CacheValue;
CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < GcChunkHashes.size())
{
Sleep(1);
}