aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-06 10:18:49 +0200
committerGitHub Enterprise <[email protected]>2025-10-06 10:18:49 +0200
commitd5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch)
tree35bfa4c23c838fdbebd07100db2ebf2404977b3f /src/zenstore/cache/structuredcachestore.cpp
parentfix link error with operator new (#553) (diff)
downloadzen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz
zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip
speed up tests (#555)
* faster FileSystemTraversal test
* faster jobqueue test
* faster NamedEvent test
* faster cache tests
* faster basic http tests
* faster blockstore test
* faster cache store tests
* faster compactcas tests
* more responsive zenserver launch
* tweak worker pool sizes in tests
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp88
1 file changed, 56 insertions(+), 32 deletions(-)
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index fd54e6765..da6acbde4 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1332,17 +1332,30 @@ namespace testutils {
std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; }
- std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store,
+ WorkerThreadPool& ThreadPool,
+ const std::span<const size_t>& Sizes)
{
std::vector<std::pair<Oid, CompressedBuffer>> Result;
- Result.reserve(Sizes.size());
- for (size_t Size : Sizes)
+ Result.resize(Sizes.size());
+ Latch L(1);
+ for (size_t Index = 0; Index < Sizes.size(); Index++)
{
- auto Blob = CreateBinaryBlob(Size);
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
- CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
- Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
+ L.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&, Index]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ size_t Size = Sizes[Index];
+ auto Blob = CreateBinaryBlob(Size);
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
+ CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
+ Result[Index] = std::pair<Oid, CompressedBuffer>(Blob.first, Compressed);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
+ L.CountDown();
+ L.Wait();
return Result;
}
@@ -1528,7 +1541,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 8192;
+ const int32_t kChunkCount = 4096;
struct Chunk
{
@@ -1569,26 +1582,28 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
- WorkerThreadPool ThreadPool(4);
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u));
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {});
{
+ Latch L(1);
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, &Chunk]() {
+ [&Zcs, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
auto DoGC = [](GcManager& Gc, ZenCacheNamespace& Zcs, std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes) {
@@ -1615,11 +1630,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
+ Latch L(1);
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, &Chunk]() {
+ [&Zcs, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&]() { L.CountDown(); });
std::string Bucket = Chunk.second.Bucket;
IoHash ChunkHash = Chunk.first;
ZenCacheValue CacheValue;
@@ -1631,10 +1649,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
GcChunkHashes.reserve(Chunks.size());
@@ -1672,10 +1689,13 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
WorkerThreadPool::EMode::EnableBacklog);
}
+ Latch L(1);
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, Chunk]() {
+ [&Zcs, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
ZenCacheValue CacheValue;
if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
@@ -1699,10 +1719,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
DoGC(Gc, Zcs, GcChunkHashes);
}
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == NewChunks.size() + Chunks.size());
{
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
@@ -1718,12 +1737,15 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
}
}
{
+ Latch L(1);
{
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, Chunk]() {
+ [&Zcs, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
ZenCacheValue CacheValue;
CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
@@ -1731,10 +1753,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == GcChunkHashes.size());
}
}
}
@@ -2236,6 +2257,8 @@ TEST_CASE("cachestore.newgc.basics")
std::vector<IoHash> CacheRecords;
std::vector<IoHash> UnstructuredCacheValues;
+ WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u));
+
const auto TearDrinkerBucket = "teardrinker"sv;
{
GcManager Gc;
@@ -2246,11 +2269,12 @@ TEST_CASE("cachestore.newgc.basics")
// Create some basic data
{
// Structured record with attachments
- auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
+ auto Attachments1 =
+ CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
// Structured record with reuse of attachments
- auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
+ auto Attachments2 = CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{971});
Attachments2.push_back(Attachments1[0]);
Attachments2.push_back(Attachments1[1]);
CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
@@ -2664,7 +2688,7 @@ TEST_CASE("cachestore.newgc.basics")
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
auto Attachments =
- CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
+ CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
{
// Do get so it ends up in memcache