diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-06 10:18:49 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-06 10:18:49 +0200 |
| commit | d5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch) | |
| tree | 35bfa4c23c838fdbebd07100db2ebf2404977b3f /src/zenstore/compactcas.cpp | |
| parent | fix link error with operator new (#553) (diff) | |
| download | zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip | |
speed up tests (#555)
* faster FileSystemTraversal test
* faster jobqueue test
* faster NamedEvent test
* faster cache tests
* faster basic http tests
* faster blockstore test
* faster cache store tests
* faster compactcas tests
* more responsive zenserver launch
* tweak worker pool sizes in tests
Diffstat (limited to 'src/zenstore/compactcas.cpp')
| -rw-r--r-- | src/zenstore/compactcas.cpp | 145 |
1 files changed, 81 insertions, 64 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 9792f6d0b..14ef2a15d 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -1491,7 +1491,7 @@ TEST_CASE("compactcas.threadedinsert") ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 4096; + const int32_t kChunkCount = 2048; uint64_t ExpectedSize = 0; tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; @@ -1513,40 +1513,47 @@ TEST_CASE("compactcas.threadedinsert") } } - std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); GcManager Gc; CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - for (const auto& Chunk : Chunks) + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - const IoHash& Hash = Chunk.first; - const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); + for (const auto& Chunk : Chunks) + { + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + L.AddCount(1); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } } - WorkCompleted = 0; - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_LE(ExpectedSize, TotalSize); - CHECK_GE(ExpectedSize + 32768, TotalSize); - { + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); + for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, &Chunk]() { + [&Cas, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Buffer); @@ -1555,10 +1562,9 @@ TEST_CASE("compactcas.threadedinsert") }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } tsl::robin_set<IoHash, IoHash::Hasher> GcChunkHashes; @@ -1568,7 +1574,8 @@ TEST_CASE("compactcas.threadedinsert") GcChunkHashes.insert(Chunk.first); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; NewChunks.reserve(kChunkCount); @@ -1584,8 +1591,10 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + [&Cas, &WorkCompleted, Chunk, &AddedChunkCount, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); @@ -1594,8 +1603,10 @@ TEST_CASE("compactcas.threadedinsert") } for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk]() { + [&Cas, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); if (Buffer) @@ -1678,29 +1689,31 @@ TEST_CASE("compactcas.threadedinsert") DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + + CHECK(WorkCompleted == NewChunks.size() + Chunks.size()); DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); for (const IoHash& ChunkHash : GcChunkHashes) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, ChunkHash]() { + [&Cas, &WorkCompleted, ChunkHash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == GcChunkHashes.size()); } } } @@ -1709,9 +1722,9 @@ TEST_CASE("compactcas.restart") { uint64_t ExpectedSize = 0; - auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { Latch WorkLatch(1); tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; ChunkHashesLookup.reserve(ChunkCount); @@ -1759,6 +1772,7 @@ TEST_CASE("compactcas.restart") } WorkLatch.CountDown(); WorkLatch.Wait(); + CHECK(ChunkHashesLookup.size() == ChunkCount); }; ScopedTemporaryDirectory TempDir; @@ -1780,26 +1794,29 @@ TEST_CASE("compactcas.restart") Hashes.reserve(kChunkCount); auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { - for (const IoHash& Hash : Hashes) - { - if (ShouldExist) - { - CHECK(Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(Buffer); - IoHash ValidateHash; - uint64_t ValidateRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); - CHECK(Compressed); - CHECK(ValidateHash == Hash); - } - else - { - CHECK(!Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(!Buffer); - } - } + std::vector<bool> Exists(Hashes.size(), false); + Cas.IterateChunks( + Hashes, + [&](size_t Index, const IoBuffer& Buffer) -> bool { + Exists[Index] = !!Buffer; + if (ShouldExist) + { + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hashes[Index]); + } + else + { + CHECK(!Buffer); + } + return true; + }, + &ThreadPool, + 1u * 1248u); + CHECK_EQ(std::find(Exists.begin(), Exists.end(), !ShouldExist), Exists.end()); }; { @@ -1944,7 +1961,7 @@ TEST_CASE("compactcas.iteratechunks") const uint64_t kChunkSize = 1048 + 395; const size_t kChunkCount = 63840; - for (uint32_t N = 0; N < 4; N++) + for (uint32_t N = 0; N < 2; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); @@ -2008,7 +2025,7 @@ TEST_CASE("compactcas.iteratechunks") Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); |