diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-06 10:18:49 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-06 10:18:49 +0200 |
| commit | d5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch) | |
| tree | 35bfa4c23c838fdbebd07100db2ebf2404977b3f /src/zenstore | |
| parent | fix link error with operator new (#553) (diff) | |
| download | zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip | |
speed up tests (#555)
* faster FileSystemTraversal test
* faster jobqueue test
* faster NamedEvent test
* faster cache tests
* faster basic http tests
* faster blockstore test
* faster cache store tests
* faster compactcas tests
* more responsive zenserver launch
* tweak worker pool sizes in tests
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 130 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 88 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 145 |
3 files changed, 209 insertions, 154 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 77d21834a..6e51247f1 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1686,7 +1686,7 @@ TEST_CASE("blockstore.iterate.chunks") .Size = DefaultIterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; - WorkerThreadPool WorkerPool(4); + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 4u)); std::vector<BlockStoreLocation> Locations{FirstChunkLocation, SecondChunkLocation, @@ -1772,7 +1772,7 @@ TEST_CASE("blockstore.iterate.chunks") 0); CHECK(Continue); }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); return true; }); WorkLatch.CountDown(); @@ -1789,7 +1789,7 @@ TEST_CASE("blockstore.thread.read.write") BlockStore Store; Store.Initialize(RootDirectory / "store", 1088, 1024); - constexpr size_t ChunkCount = 1000; + constexpr size_t ChunkCount = 500; constexpr size_t Alignment = 8; std::vector<IoBuffer> Chunks; std::vector<IoHash> ChunkHashes; @@ -1805,70 +1805,84 @@ TEST_CASE("blockstore.thread.read.write") std::vector<BlockStoreLocation> ChunkLocations; ChunkLocations.resize(ChunkCount); - WorkerThreadPool WorkerPool(8); - std::atomic<size_t> WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - ChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 8u)); { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + ChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } std::vector<BlockStoreLocation> SecondChunkLocations; SecondChunkLocations.resize(ChunkCount); - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size() * 2) { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size() * 2); } } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index fd54e6765..da6acbde4 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1332,17 +1332,30 @@ namespace testutils { std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; } - std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, + WorkerThreadPool& ThreadPool, + const std::span<const size_t>& Sizes) { std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) + Result.resize(Sizes.size()); + Latch L(1); + for (size_t Index = 0; Index < Sizes.size(); Index++) { - auto Blob = CreateBinaryBlob(Size); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size())); - CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash())); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed)); + L.AddCount(1); + ThreadPool.ScheduleWork( + [&, Index]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + size_t Size = Sizes[Index]; + auto Blob = CreateBinaryBlob(Size); + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size())); + CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash())); + Result[Index] = std::pair<Oid, CompressedBuffer>(Blob.first, Compressed); + }, + WorkerThreadPool::EMode::DisableBacklog); } + L.CountDown(); + L.Wait(); return Result; } @@ -1528,7 +1541,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; + const int32_t kChunkCount = 4096; struct Chunk { @@ -1569,26 +1582,28 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(4); + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u)); GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {}); { + Latch L(1); std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, &Chunk]() { + [&Zcs, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } auto DoGC = [](GcManager& Gc, ZenCacheNamespace& Zcs, std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes) { @@ -1615,11 +1630,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) CHECK_LE(kChunkSize * Chunks.size(), TotalSize); { + Latch L(1); std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, &Chunk]() { + [&Zcs, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&]() { L.CountDown(); }); std::string Bucket = Chunk.second.Bucket; IoHash ChunkHash = Chunk.first; ZenCacheValue CacheValue; @@ -1631,10 +1649,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; GcChunkHashes.reserve(Chunks.size()); @@ -1672,10 +1689,13 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) WorkerThreadPool::EMode::EnableBacklog); } + Latch L(1); for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, Chunk]() { + [&Zcs, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { @@ -1699,10 +1719,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) DoGC(Gc, Zcs, GcChunkHashes); } - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == NewChunks.size() + Chunks.size()); { // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope @@ -1718,12 +1737,15 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) } } { + Latch L(1); { std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, Chunk]() { + [&Zcs, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); ZenCacheValue CacheValue; CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); @@ -1731,10 +1753,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == GcChunkHashes.size()); } } } @@ -2236,6 +2257,8 @@ TEST_CASE("cachestore.newgc.basics") std::vector<IoHash> CacheRecords; std::vector<IoHash> UnstructuredCacheValues; + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u)); + const auto TearDrinkerBucket = "teardrinker"sv; { GcManager Gc; @@ -2246,11 +2269,12 @@ TEST_CASE("cachestore.newgc.basics") // Create some basic data { // Structured record with attachments - auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87}); + auto Attachments1 = + CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87}); CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1)); // Structured record with reuse of attachments - auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971}); + auto Attachments2 = CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{971}); Attachments2.push_back(Attachments1[0]); Attachments2.push_back(Attachments1[1]); CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2)); @@ -2664,7 +2688,7 @@ TEST_CASE("cachestore.newgc.basics") CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount); auto Attachments = - CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187}); + CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187}); IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments); { // Do get so it ends up in memcache diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 9792f6d0b..14ef2a15d 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -1491,7 +1491,7 @@ TEST_CASE("compactcas.threadedinsert") ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 4096; + const int32_t kChunkCount = 2048; uint64_t ExpectedSize = 0; tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; @@ -1513,40 +1513,47 @@ TEST_CASE("compactcas.threadedinsert") } } - std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); GcManager Gc; CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - for (const auto& Chunk : Chunks) + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - const IoHash& Hash = Chunk.first; - const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); + for (const auto& Chunk : Chunks) + { + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + L.AddCount(1); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } } - WorkCompleted = 0; - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_LE(ExpectedSize, TotalSize); - CHECK_GE(ExpectedSize + 32768, TotalSize); - { + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); + for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, &Chunk]() { + [&Cas, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Buffer); @@ -1555,10 +1562,9 @@ TEST_CASE("compactcas.threadedinsert") }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } tsl::robin_set<IoHash, IoHash::Hasher> GcChunkHashes; @@ -1568,7 +1574,8 @@ TEST_CASE("compactcas.threadedinsert") GcChunkHashes.insert(Chunk.first); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; NewChunks.reserve(kChunkCount); @@ -1584,8 +1591,10 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + [&Cas, &WorkCompleted, Chunk, &AddedChunkCount, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); @@ -1594,8 +1603,10 @@ TEST_CASE("compactcas.threadedinsert") } for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk]() { + [&Cas, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); if (Buffer) @@ -1678,29 +1689,31 @@ TEST_CASE("compactcas.threadedinsert") DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + + CHECK(WorkCompleted == NewChunks.size() + Chunks.size()); DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); for (const IoHash& ChunkHash : GcChunkHashes) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, ChunkHash]() { + [&Cas, &WorkCompleted, ChunkHash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == GcChunkHashes.size()); } } } @@ -1709,9 +1722,9 @@ TEST_CASE("compactcas.restart") { uint64_t ExpectedSize = 0; - auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { Latch WorkLatch(1); tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; ChunkHashesLookup.reserve(ChunkCount); @@ -1759,6 +1772,7 @@ TEST_CASE("compactcas.restart") } WorkLatch.CountDown(); WorkLatch.Wait(); + CHECK(ChunkHashesLookup.size() == ChunkCount); }; ScopedTemporaryDirectory TempDir; @@ -1780,26 +1794,29 @@ TEST_CASE("compactcas.restart") Hashes.reserve(kChunkCount); auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { - for (const IoHash& Hash : Hashes) - { - if (ShouldExist) - { - CHECK(Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(Buffer); - IoHash ValidateHash; - uint64_t ValidateRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); - CHECK(Compressed); - CHECK(ValidateHash == Hash); - } - else - { - CHECK(!Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(!Buffer); - } - } + std::vector<bool> Exists(Hashes.size(), false); + Cas.IterateChunks( + Hashes, + [&](size_t Index, const IoBuffer& Buffer) -> bool { + Exists[Index] = !!Buffer; + if (ShouldExist) + { + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hashes[Index]); + } + else + { + CHECK(!Buffer); + } + return true; + }, + &ThreadPool, + 1u * 1248u); + CHECK_EQ(std::find(Exists.begin(), Exists.end(), !ShouldExist), Exists.end()); }; { @@ -1944,7 +1961,7 @@ TEST_CASE("compactcas.iteratechunks") const uint64_t kChunkSize = 1048 + 395; const size_t kChunkCount = 63840; - for (uint32_t N = 0; N < 4; N++) + for (uint32_t N = 0; N < 2; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); @@ -2008,7 +2025,7 @@ TEST_CASE("compactcas.iteratechunks") Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); |