about summary refs log tree commit diff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
author	Dan Engelbrecht <[email protected]>	2025-10-06 10:18:49 +0200
committer	GitHub Enterprise <[email protected]>	2025-10-06 10:18:49 +0200
commit	d5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch)
tree	35bfa4c23c838fdbebd07100db2ebf2404977b3f /src/zenstore/compactcas.cpp
parent	fix link error with operator new (#553) (diff)
download	zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz
	zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip
speed up tests (#555)
* faster FileSystemTraversal test
* faster jobqueue test
* faster NamedEvent test
* faster cache tests
* faster basic http tests
* faster blockstore test
* faster cache store tests
* faster compactcas tests
* more responsive zenserver launch
* tweak worker pool sizes in tests
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--	src/zenstore/compactcas.cpp	145
1 file changed, 81 insertions(+), 64 deletions(-)
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 9792f6d0b..14ef2a15d 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -1491,7 +1491,7 @@ TEST_CASE("compactcas.threadedinsert")
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
+ const int32_t kChunkCount = 2048;
uint64_t ExpectedSize = 0;
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
@@ -1513,40 +1513,47 @@ TEST_CASE("compactcas.threadedinsert")
}
}
- std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
GcManager Gc;
CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
- for (const auto& Chunk : Chunks)
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
- const IoHash& Hash = Chunk.first;
- const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
+ for (const auto& Chunk : Chunks)
+ {
+ const IoHash& Hash = Chunk.first;
+ const IoBuffer& Buffer = Chunk.second;
+ L.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Buffer, Hash, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
}
- WorkCompleted = 0;
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_LE(ExpectedSize, TotalSize);
- CHECK_GE(ExpectedSize + 32768, TotalSize);
-
{
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_LE(ExpectedSize, TotalSize);
+ CHECK_GE(ExpectedSize + 32768, TotalSize);
+
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, &Chunk]() {
+ [&Cas, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
IoHash Hash = IoHash::HashBuffer(Buffer);
@@ -1555,10 +1562,9 @@ TEST_CASE("compactcas.threadedinsert")
},
WorkerThreadPool::EMode::DisableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
tsl::robin_set<IoHash, IoHash::Hasher> GcChunkHashes;
@@ -1568,7 +1574,8 @@ TEST_CASE("compactcas.threadedinsert")
GcChunkHashes.insert(Chunk.first);
}
{
- WorkCompleted = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
@@ -1584,8 +1591,10 @@ TEST_CASE("compactcas.threadedinsert")
for (const auto& Chunk : NewChunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ [&Cas, &WorkCompleted, Chunk, &AddedChunkCount, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
@@ -1594,8 +1603,10 @@ TEST_CASE("compactcas.threadedinsert")
}
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Chunk]() {
+ [&Cas, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
if (Buffer)
@@ -1678,29 +1689,31 @@ TEST_CASE("compactcas.threadedinsert")
DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
}
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+
+ CHECK(WorkCompleted == NewChunks.size() + Chunks.size());
DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
}
{
- WorkCompleted = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
for (const IoHash& ChunkHash : GcChunkHashes)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, ChunkHash]() {
+ [&Cas, &WorkCompleted, ChunkHash, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
CHECK(Cas.HaveChunk(ChunkHash));
CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
WorkCompleted.fetch_add(1);
},
WorkerThreadPool::EMode::DisableBacklog);
}
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == GcChunkHashes.size());
}
}
}
@@ -1709,9 +1722,9 @@ TEST_CASE("compactcas.restart")
{
uint64_t ExpectedSize = 0;
- auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
- WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
Latch WorkLatch(1);
tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
ChunkHashesLookup.reserve(ChunkCount);
@@ -1759,6 +1772,7 @@ TEST_CASE("compactcas.restart")
}
WorkLatch.CountDown();
WorkLatch.Wait();
+ CHECK(ChunkHashesLookup.size() == ChunkCount);
};
ScopedTemporaryDirectory TempDir;
@@ -1780,26 +1794,29 @@ TEST_CASE("compactcas.restart")
Hashes.reserve(kChunkCount);
auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
- for (const IoHash& Hash : Hashes)
- {
- if (ShouldExist)
- {
- CHECK(Cas.HaveChunk(Hash));
- IoBuffer Buffer = Cas.FindChunk(Hash);
- CHECK(Buffer);
- IoHash ValidateHash;
- uint64_t ValidateRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
- CHECK(Compressed);
- CHECK(ValidateHash == Hash);
- }
- else
- {
- CHECK(!Cas.HaveChunk(Hash));
- IoBuffer Buffer = Cas.FindChunk(Hash);
- CHECK(!Buffer);
- }
- }
+ std::vector<bool> Exists(Hashes.size(), false);
+ Cas.IterateChunks(
+ Hashes,
+ [&](size_t Index, const IoBuffer& Buffer) -> bool {
+ Exists[Index] = !!Buffer;
+ if (ShouldExist)
+ {
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hashes[Index]);
+ }
+ else
+ {
+ CHECK(!Buffer);
+ }
+ return true;
+ },
+ &ThreadPool,
+ 1u * 1248u);
+ CHECK_EQ(std::find(Exists.begin(), Exists.end(), !ShouldExist), Exists.end());
};
{
@@ -1944,7 +1961,7 @@ TEST_CASE("compactcas.iteratechunks")
const uint64_t kChunkSize = 1048 + 395;
const size_t kChunkCount = 63840;
- for (uint32_t N = 0; N < 4; N++)
+ for (uint32_t N = 0; N < 2; N++)
{
GcManager Gc;
CasContainerStrategy Cas(Gc);
@@ -2008,7 +2025,7 @@ TEST_CASE("compactcas.iteratechunks")
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
},
- WorkerThreadPool::EMode::EnableBacklog);
+ WorkerThreadPool::EMode::DisableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();