diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-06 10:18:49 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-06 10:18:49 +0200 |
| commit | d5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch) | |
| tree | 35bfa4c23c838fdbebd07100db2ebf2404977b3f /src | |
| parent | fix link error with operator new (#553) (diff) | |
| download | zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip | |
speed up tests (#555)
* faster FileSystemTraversal test
* faster jobqueue test
* faster NamedEvent test
* faster cache tests
* faster basic http tests
* faster blockstore test
* faster cache store tests
* faster compactcas tests
* more responsive zenserver launch
* tweak worker pool sizes in tests
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/filesystem.cpp | 13 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 8 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 6 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 8 | ||||
| -rw-r--r-- | src/zenserver-test/cache-tests.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 130 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 88 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 145 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 6 |
10 files changed, 245 insertions, 179 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8e6f5085f..d18f21dbe 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -3112,15 +3112,22 @@ TEST_CASE("filesystem") { virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t, uint64_t) override { - bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected); + // std::filesystem::equivalent is *very* expensive on Windows, filter out unlikely candidates + if (ExpectedFilename == ToLower(std::filesystem::path(File).string())) + { + bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected); + } } virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } - bool bFoundExpected = false; + bool bFoundExpected = false; + + std::string ExpectedFilename; std::filesystem::path Expected; } Visitor; - Visitor.Expected = BinPath; + Visitor.ExpectedFilename = ToLower(BinPath.filename().string()); + Visitor.Expected = BinPath; FileSystemTraversal().TraverseFileSystem(BinPath.parent_path().parent_path(), Visitor); CHECK(Visitor.bFoundExpected); diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 4aa8c113e..bd391909d 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -465,7 +465,7 @@ TEST_CASE("JobQueue") std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue")); WorkerThreadPool Pool(4); Latch JobsLatch(1); - for (uint32_t I = 0; I < 100; I++) + for (uint32_t I = 0; I < 32; I++) { JobsLatch.AddCount(1); Pool.ScheduleWork( @@ -479,7 +479,7 @@ TEST_CASE("JobQueue") return; } Context.ReportProgress("going to sleep", "", 100, 100); - Sleep(10); + Sleep(5); if (Context.IsCancelled()) { return; @@ -489,7 +489,7 @@ TEST_CASE("JobQueue") { zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); } - Sleep(10); + Sleep(5); if (Context.IsCancelled()) { return; @@ -575,7 +575,7 @@ TEST_CASE("JobQueue") RemainingJobs.size(), PendingCount, RemainingJobs.size() - PendingCount); - Sleep(100); + Sleep(5); } JobsLatch.Wait(); } diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index b8ec85a4a..abf282467 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -608,12 +608,12 @@ TEST_CASE("NamedEvent") ReadyEvent.Set(); NamedEvent TestEvent(Name); - TestEvent.Wait(1000); + TestEvent.Wait(100); }); ReadyEvent.Wait(); - zen::Sleep(100); + zen::Sleep(10); TestEvent.Set(); Waiter.join(); @@ -621,7 +621,7 @@ TEST_CASE("NamedEvent") // Manual reset property for (uint32_t i = 0; i < 8; ++i) { - bool bEventSet = TestEvent.Wait(100); + bool bEventSet = TestEvent.Wait(10); CHECK(bEventSet); } } diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index d008d1452..0d1e0d93d 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -3452,8 +3452,12 @@ TEST_CASE_TEMPLATE("project.store.export", std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - WorkerThreadPool WorkerPool(4); - WorkerThreadPool NetworkPool(2); + uint32_t NetworkWorkerCount = Max(std::thread::hardware_concurrency() / 4u, 2u); + uint32_t WorkerCount = + (NetworkWorkerCount < std::thread::hardware_concurrency()) ? Max(std::thread::hardware_concurrency() - NetworkWorkerCount, 4u) : 4u; + + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, *RemoteStore, diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp index 2a040936c..1ce5f3be4 100644 --- a/src/zenserver-test/cache-tests.cpp +++ b/src/zenserver-test/cache-tests.cpp @@ -690,9 +690,9 @@ TEST_CASE("zcache.rpc") CbPackage Package; CHECK(Request.Format(Package)); - IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer(); - Body.SetContentType(HttpContentType::kCbPackage); - HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}}); + CompositeBuffer Body(FormatPackageMessageBuffer(Package)); + HttpClient::Response Result = + Http.Post("/$rpc", Body, HttpContentType::kCbPackage, HttpClient::Accept(HttpContentType::kCbPackage)); CHECK(Result.StatusCode == HttpResponseCode::OK); if (OutPackages) @@ -762,7 +762,7 @@ TEST_CASE("zcache.rpc") const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16); GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy); CHECK(Result.Result.Results.size() == Keys.size()); @@ -791,7 +791,7 @@ TEST_CASE("zcache.rpc") const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); CachePolicy Policy = CachePolicy::Default; - std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16); std::vector<zen::CacheKey> Keys; for (const zen::CacheKey& Key : ExistingKeys) @@ -1244,7 +1244,7 @@ TEST_CASE("zcache.rpc") const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort); std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); - std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); + std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 2, 1024 * 1024 * 16, SmallKeys.size()); std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); @@ -1540,7 +1540,7 @@ TEST_CASE("zcache.failing.upstream") } Completed.fetch_add(1); }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); } bool UseUpstream1 = false; while (Completed < ThreadCount * KeyMultiplier) @@ -1634,7 +1634,7 @@ TEST_CASE("zcache.failing.upstream") } Completed.fetch_add(1); }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); } while (Completed < ThreadCount * KeyMultiplier) { diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 773383954..42296cbe1 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -134,7 +134,7 @@ TEST_CASE("default.single") HttpClient Http{fmt::format("http://localhost:{}", PortNumber)}; - for (int i = 0; i < 10000; ++i) + for (int i = 0; i < 100; ++i) { auto res = Http.Get("/test/hello"sv); ++RequestCount; @@ -222,7 +222,7 @@ TEST_CASE("multi.basic") HttpClient Http{fmt::format("http://localhost:{}", PortNumber)}; - for (int i = 0; i < 10000; ++i) + for (int i = 0; i < 100; ++i) { auto res = Http.Get("/test/hello"sv); ++RequestCount; diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 77d21834a..6e51247f1 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1686,7 +1686,7 @@ TEST_CASE("blockstore.iterate.chunks") .Size = DefaultIterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; - WorkerThreadPool WorkerPool(4); + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 4u)); std::vector<BlockStoreLocation> Locations{FirstChunkLocation, SecondChunkLocation, @@ -1772,7 +1772,7 @@ TEST_CASE("blockstore.iterate.chunks") 0); CHECK(Continue); }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); return true; }); WorkLatch.CountDown(); @@ -1789,7 +1789,7 @@ TEST_CASE("blockstore.thread.read.write") BlockStore Store; Store.Initialize(RootDirectory / "store", 1088, 1024); - constexpr size_t ChunkCount = 1000; + constexpr size_t ChunkCount = 500; constexpr size_t Alignment = 8; std::vector<IoBuffer> Chunks; std::vector<IoHash> ChunkHashes; @@ -1805,70 +1805,84 @@ TEST_CASE("blockstore.thread.read.write") std::vector<BlockStoreLocation> ChunkLocations; ChunkLocations.resize(ChunkCount); - WorkerThreadPool WorkerPool(8); - std::atomic<size_t> WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - ChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 8u)); { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + ChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } std::vector<BlockStoreLocation> SecondChunkLocations; SecondChunkLocations.resize(ChunkCount); - WorkCompleted = 0; - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) - { - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { - SecondChunkLocations[ChunkIndex] = L; - }); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - WorkerPool.ScheduleWork( - [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { - IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); - CHECK(VerifyChunk); - IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); - CHECK(VerifyHash == ChunkHashes[ChunkIndex]); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size() * 2) { - Sleep(1); + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + L.AddCount(1); + WorkerPool.ScheduleWork( + [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size() * 2); } } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index fd54e6765..da6acbde4 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1332,17 +1332,30 @@ namespace testutils { std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; } - std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, + WorkerThreadPool& ThreadPool, + const std::span<const size_t>& Sizes) { std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) + Result.resize(Sizes.size()); + Latch L(1); + for (size_t Index = 0; Index < Sizes.size(); Index++) { - auto Blob = CreateBinaryBlob(Size); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size())); - CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash())); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed)); + L.AddCount(1); + ThreadPool.ScheduleWork( + [&, Index]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + size_t Size = Sizes[Index]; + auto Blob = CreateBinaryBlob(Size); + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size())); + CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash())); + Result[Index] = std::pair<Oid, CompressedBuffer>(Blob.first, Compressed); + }, + WorkerThreadPool::EMode::DisableBacklog); } + L.CountDown(); + L.Wait(); return Result; } @@ -1528,7 +1541,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; + const int32_t kChunkCount = 4096; struct Chunk { @@ -1569,26 +1582,28 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(4); + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u)); GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {}); { + Latch L(1); std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, &Chunk]() { + [&Zcs, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } auto DoGC = [](GcManager& Gc, ZenCacheNamespace& Zcs, std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes) { @@ -1615,11 +1630,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) CHECK_LE(kChunkSize * Chunks.size(), TotalSize); { + Latch L(1); std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, &Chunk]() { + [&Zcs, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&]() { L.CountDown(); }); std::string Bucket = Chunk.second.Bucket; IoHash ChunkHash = Chunk.first; ZenCacheValue CacheValue; @@ -1631,10 +1649,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; GcChunkHashes.reserve(Chunks.size()); @@ -1672,10 +1689,13 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) WorkerThreadPool::EMode::EnableBacklog); } + Latch L(1); for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, Chunk]() { + [&Zcs, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { @@ -1699,10 +1719,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) DoGC(Gc, Zcs, GcChunkHashes); } - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == NewChunks.size() + Chunks.size()); { // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope @@ -1718,12 +1737,15 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) } } { + Latch L(1); { std::atomic<size_t> WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Zcs, &WorkCompleted, Chunk]() { + [&Zcs, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); ZenCacheValue CacheValue; CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); @@ -1731,10 +1753,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) }, WorkerThreadPool::EMode::EnableBacklog); } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == GcChunkHashes.size()); } } } @@ -2236,6 +2257,8 @@ TEST_CASE("cachestore.newgc.basics") std::vector<IoHash> CacheRecords; std::vector<IoHash> UnstructuredCacheValues; + WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u)); + const auto TearDrinkerBucket = "teardrinker"sv; { GcManager Gc; @@ -2246,11 +2269,12 @@ TEST_CASE("cachestore.newgc.basics") // Create some basic data { // Structured record with attachments - auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87}); + auto Attachments1 = + CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87}); CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1)); // Structured record with reuse of attachments - auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971}); + auto Attachments2 = CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{971}); Attachments2.push_back(Attachments1[0]); Attachments2.push_back(Attachments1[1]); CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2)); @@ -2664,7 +2688,7 @@ TEST_CASE("cachestore.newgc.basics") CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount); auto Attachments = - CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187}); + CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187}); IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments); { // Do get so it ends up in memcache diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 9792f6d0b..14ef2a15d 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -1491,7 +1491,7 @@ TEST_CASE("compactcas.threadedinsert") ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 4096; + const int32_t kChunkCount = 2048; uint64_t ExpectedSize = 0; tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; @@ -1513,40 +1513,47 @@ TEST_CASE("compactcas.threadedinsert") } } - std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); GcManager Gc; CasContainerStrategy Cas(Gc); - Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - for (const auto& Chunk : Chunks) + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { - const IoHash& Hash = Chunk.first; - const IoBuffer& Buffer = Chunk.second; - ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Buffer, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); - ZEN_ASSERT(InsertResult.New); - WorkCompleted.fetch_add(1); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); + for (const auto& Chunk : Chunks) + { + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + L.AddCount(1); + ThreadPool.ScheduleWork( + [&Cas, &WorkCompleted, Buffer, Hash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); + ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); + }, + WorkerThreadPool::EMode::DisableBacklog); + } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } } - WorkCompleted = 0; - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_LE(ExpectedSize, TotalSize); - CHECK_GE(ExpectedSize + 32768, TotalSize); - { + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); + for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, &Chunk]() { + [&Cas, &WorkCompleted, &Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); IoHash Hash = IoHash::HashBuffer(Buffer); @@ -1555,10 +1562,9 @@ TEST_CASE("compactcas.threadedinsert") }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == Chunks.size()); } tsl::robin_set<IoHash, IoHash::Hasher> GcChunkHashes; @@ -1568,7 +1574,8 @@ TEST_CASE("compactcas.threadedinsert") GcChunkHashes.insert(Chunk.first); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; NewChunks.reserve(kChunkCount); @@ -1584,8 +1591,10 @@ TEST_CASE("compactcas.threadedinsert") for (const auto& Chunk : NewChunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + [&Cas, &WorkCompleted, Chunk, &AddedChunkCount, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); @@ -1594,8 +1603,10 @@ TEST_CASE("compactcas.threadedinsert") } for (const auto& Chunk : Chunks) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, Chunk]() { + [&Cas, &WorkCompleted, Chunk, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); IoHash ChunkHash = Chunk.first; IoBuffer Buffer = Cas.FindChunk(ChunkHash); if (Buffer) @@ -1678,29 +1689,31 @@ TEST_CASE("compactcas.threadedinsert") DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + + CHECK(WorkCompleted == NewChunks.size() + Chunks.size()); DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes); } { - WorkCompleted = 0; + std::atomic<size_t> WorkCompleted = 0; + Latch L(1); for (const IoHash& ChunkHash : GcChunkHashes) { + L.AddCount(1); ThreadPool.ScheduleWork( - [&Cas, &WorkCompleted, ChunkHash]() { + [&Cas, &WorkCompleted, ChunkHash, &L]() { + auto _ = MakeGuard([&L]() { L.CountDown(); }); CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }, WorkerThreadPool::EMode::DisableBacklog); } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } + L.CountDown(); + L.Wait(); + CHECK(WorkCompleted == GcChunkHashes.size()); } } } @@ -1709,9 +1722,9 @@ TEST_CASE("compactcas.restart") { uint64_t ExpectedSize = 0; - auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { Latch WorkLatch(1); tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; ChunkHashesLookup.reserve(ChunkCount); @@ -1759,6 +1772,7 @@ TEST_CASE("compactcas.restart") } WorkLatch.CountDown(); WorkLatch.Wait(); + CHECK(ChunkHashesLookup.size() == ChunkCount); }; ScopedTemporaryDirectory TempDir; @@ -1780,26 +1794,29 @@ TEST_CASE("compactcas.restart") Hashes.reserve(kChunkCount); auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { - for (const IoHash& Hash : Hashes) - { - if (ShouldExist) - { - CHECK(Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(Buffer); - IoHash ValidateHash; - uint64_t ValidateRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); - CHECK(Compressed); - CHECK(ValidateHash == Hash); - } - else - { - CHECK(!Cas.HaveChunk(Hash)); - IoBuffer Buffer = Cas.FindChunk(Hash); - CHECK(!Buffer); - } - } + std::vector<bool> Exists(Hashes.size(), false); + Cas.IterateChunks( + Hashes, + [&](size_t Index, const IoBuffer& Buffer) -> bool { + Exists[Index] = !!Buffer; + if (ShouldExist) + { + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hashes[Index]); + } + else + { + CHECK(!Buffer); + } + return true; + }, + &ThreadPool, + 1u * 1248u); + CHECK_EQ(std::find(Exists.begin(), Exists.end(), !ShouldExist), Exists.end()); }; { @@ -1944,7 +1961,7 @@ TEST_CASE("compactcas.iteratechunks") const uint64_t kChunkSize = 1048 + 395; const size_t kChunkCount = 63840; - for (uint32_t N = 0; N < 4; N++) + for (uint32_t N = 0; N < 2; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); @@ -2008,7 +2025,7 @@ TEST_CASE("compactcas.iteratechunks") Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); } }, - WorkerThreadPool::EMode::EnableBacklog); + WorkerThreadPool::EMode::DisableBacklog); Offset += BatchCount; } WorkLatch.CountDown(); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 36211263a..6a93f0c63 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -915,7 +915,7 @@ ZenServerInstance::Detach() uint16_t ZenServerInstance::WaitUntilReady() { - while (m_ReadyEvent.Wait(100) == false) + while (m_ReadyEvent.Wait(10) == false) { if (!m_Process.IsValid()) { @@ -938,7 +938,7 @@ bool ZenServerInstance::WaitUntilReady(int Timeout) { int TimeoutLeftMS = Timeout; - while (m_ReadyEvent.Wait(100) == false) + while (m_ReadyEvent.Wait(10) == false) { if (!m_Process.IsValid()) { @@ -950,7 +950,7 @@ ZenServerInstance::WaitUntilReady(int Timeout) ZEN_WARN("Wait abandoned by exited process"); return false; } - TimeoutLeftMS -= 100; + TimeoutLeftMS -= 10; if ((TimeoutLeftMS <= 0)) { ZEN_WARN("Wait abandoned due to timeout"); |