aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-06 10:18:49 +0200
committerGitHub Enterprise <[email protected]>2025-10-06 10:18:49 +0200
commitd5b2a263e6d4c893c6ec3fb99b36110630da6529 (patch)
tree35bfa4c23c838fdbebd07100db2ebf2404977b3f /src
parentfix link error with operator new (#553) (diff)
downloadzen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.tar.xz
zen-d5b2a263e6d4c893c6ec3fb99b36110630da6529.zip
speed up tests (#555)
* faster FileSystemTraversal test * faster jobqueue test * faster NamedEvent test * faster cache tests * faster basic http tests * faster blockstore test * faster cache store tests * faster compactcas tests * more responsive zenserver launch * tweak worker pool sizes in tests
Diffstat (limited to 'src')
-rw-r--r--src/zencore/filesystem.cpp13
-rw-r--r--src/zencore/jobqueue.cpp8
-rw-r--r--src/zencore/thread.cpp6
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp8
-rw-r--r--src/zenserver-test/cache-tests.cpp16
-rw-r--r--src/zenserver-test/zenserver-test.cpp4
-rw-r--r--src/zenstore/blockstore.cpp130
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp88
-rw-r--r--src/zenstore/compactcas.cpp145
-rw-r--r--src/zenutil/zenserverprocess.cpp6
10 files changed, 245 insertions, 179 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 8e6f5085f..d18f21dbe 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -3112,15 +3112,22 @@ TEST_CASE("filesystem")
{
virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t, uint64_t) override
{
- bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected);
+ // std::filesystem::equivalent is *very* expensive on Windows, filter out unlikely candidates
+ if (ExpectedFilename == ToLower(std::filesystem::path(File).string()))
+ {
+ bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected);
+ }
}
virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; }
- bool bFoundExpected = false;
+ bool bFoundExpected = false;
+
+ std::string ExpectedFilename;
std::filesystem::path Expected;
} Visitor;
- Visitor.Expected = BinPath;
+ Visitor.ExpectedFilename = ToLower(BinPath.filename().string());
+ Visitor.Expected = BinPath;
FileSystemTraversal().TraverseFileSystem(BinPath.parent_path().parent_path(), Visitor);
CHECK(Visitor.bFoundExpected);
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 4aa8c113e..bd391909d 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -465,7 +465,7 @@ TEST_CASE("JobQueue")
std::unique_ptr<JobQueue> Queue(MakeJobQueue(2, "queue"));
WorkerThreadPool Pool(4);
Latch JobsLatch(1);
- for (uint32_t I = 0; I < 100; I++)
+ for (uint32_t I = 0; I < 32; I++)
{
JobsLatch.AddCount(1);
Pool.ScheduleWork(
@@ -479,7 +479,7 @@ TEST_CASE("JobQueue")
return;
}
Context.ReportProgress("going to sleep", "", 100, 100);
- Sleep(10);
+ Sleep(5);
if (Context.IsCancelled())
{
return;
@@ -489,7 +489,7 @@ TEST_CASE("JobQueue")
{
zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
}
- Sleep(10);
+ Sleep(5);
if (Context.IsCancelled())
{
return;
@@ -575,7 +575,7 @@ TEST_CASE("JobQueue")
RemainingJobs.size(),
PendingCount,
RemainingJobs.size() - PendingCount);
- Sleep(100);
+ Sleep(5);
}
JobsLatch.Wait();
}
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index b8ec85a4a..abf282467 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -608,12 +608,12 @@ TEST_CASE("NamedEvent")
ReadyEvent.Set();
NamedEvent TestEvent(Name);
- TestEvent.Wait(1000);
+ TestEvent.Wait(100);
});
ReadyEvent.Wait();
- zen::Sleep(100);
+ zen::Sleep(10);
TestEvent.Set();
Waiter.join();
@@ -621,7 +621,7 @@ TEST_CASE("NamedEvent")
// Manual reset property
for (uint32_t i = 0; i < 8; ++i)
{
- bool bEventSet = TestEvent.Wait(100);
+ bool bEventSet = TestEvent.Wait(10);
CHECK(bEventSet);
}
}
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index d008d1452..0d1e0d93d 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -3452,8 +3452,12 @@ TEST_CASE_TEMPLATE("project.store.export",
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
- WorkerThreadPool WorkerPool(4);
- WorkerThreadPool NetworkPool(2);
+ uint32_t NetworkWorkerCount = Max(std::thread::hardware_concurrency() / 4u, 2u);
+ uint32_t WorkerCount =
+ (NetworkWorkerCount < std::thread::hardware_concurrency()) ? Max(std::thread::hardware_concurrency() - NetworkWorkerCount, 4u) : 4u;
+
+ WorkerThreadPool WorkerPool(WorkerCount);
+ WorkerThreadPool NetworkPool(NetworkWorkerCount);
RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
*RemoteStore,
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
index 2a040936c..1ce5f3be4 100644
--- a/src/zenserver-test/cache-tests.cpp
+++ b/src/zenserver-test/cache-tests.cpp
@@ -690,9 +690,9 @@ TEST_CASE("zcache.rpc")
CbPackage Package;
CHECK(Request.Format(Package));
- IoBuffer Body = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
- Body.SetContentType(HttpContentType::kCbPackage);
- HttpClient::Response Result = Http.Post("/$rpc", Body, {{"Accept", "application/x-ue-cbpkg"}});
+ CompositeBuffer Body(FormatPackageMessageBuffer(Package));
+ HttpClient::Response Result =
+ Http.Post("/$rpc", Body, HttpContentType::kCbPackage, HttpClient::Accept(HttpContentType::kCbPackage));
CHECK(Result.StatusCode == HttpResponseCode::OK);
if (OutPackages)
@@ -762,7 +762,7 @@ TEST_CASE("zcache.rpc")
const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
CachePolicy Policy = CachePolicy::Default;
- std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16);
GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);
CHECK(Result.Result.Results.size() == Keys.size());
@@ -791,7 +791,7 @@ TEST_CASE("zcache.rpc")
const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
CachePolicy Policy = CachePolicy::Default;
- std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 16);
std::vector<zen::CacheKey> Keys;
for (const zen::CacheKey& Key : ExistingKeys)
@@ -1244,7 +1244,7 @@ TEST_CASE("zcache.rpc")
const std::string BaseUri = fmt::format("http://localhost:{}/z$", BasePort);
std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
- std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());
+ std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 2, 1024 * 1024 * 16, SmallKeys.size());
std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());
@@ -1540,7 +1540,7 @@ TEST_CASE("zcache.failing.upstream")
}
Completed.fetch_add(1);
},
- WorkerThreadPool::EMode::EnableBacklog);
+ WorkerThreadPool::EMode::DisableBacklog);
}
bool UseUpstream1 = false;
while (Completed < ThreadCount * KeyMultiplier)
@@ -1634,7 +1634,7 @@ TEST_CASE("zcache.failing.upstream")
}
Completed.fetch_add(1);
},
- WorkerThreadPool::EMode::EnableBacklog);
+ WorkerThreadPool::EMode::DisableBacklog);
}
while (Completed < ThreadCount * KeyMultiplier)
{
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 773383954..42296cbe1 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -134,7 +134,7 @@ TEST_CASE("default.single")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 10000; ++i)
+ for (int i = 0; i < 100; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
@@ -222,7 +222,7 @@ TEST_CASE("multi.basic")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 10000; ++i)
+ for (int i = 0; i < 100; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 77d21834a..6e51247f1 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1686,7 +1686,7 @@ TEST_CASE("blockstore.iterate.chunks")
.Size = DefaultIterateSmallChunkWindowSize * 2};
BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
- WorkerThreadPool WorkerPool(4);
+ WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 4u));
std::vector<BlockStoreLocation> Locations{FirstChunkLocation,
SecondChunkLocation,
@@ -1772,7 +1772,7 @@ TEST_CASE("blockstore.iterate.chunks")
0);
CHECK(Continue);
},
- WorkerThreadPool::EMode::EnableBacklog);
+ WorkerThreadPool::EMode::DisableBacklog);
return true;
});
WorkLatch.CountDown();
@@ -1789,7 +1789,7 @@ TEST_CASE("blockstore.thread.read.write")
BlockStore Store;
Store.Initialize(RootDirectory / "store", 1088, 1024);
- constexpr size_t ChunkCount = 1000;
+ constexpr size_t ChunkCount = 500;
constexpr size_t Alignment = 8;
std::vector<IoBuffer> Chunks;
std::vector<IoHash> ChunkHashes;
@@ -1805,70 +1805,84 @@ TEST_CASE("blockstore.thread.read.write")
std::vector<BlockStoreLocation> ChunkLocations;
ChunkLocations.resize(ChunkCount);
- WorkerThreadPool WorkerPool(8);
- std::atomic<size_t> WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork(
- [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- ChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- while (WorkCompleted < Chunks.size())
+ WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 8u));
{
- Sleep(1);
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ L.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ ChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
- WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork(
- [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- while (WorkCompleted < Chunks.size())
{
- Sleep(1);
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ L.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
std::vector<BlockStoreLocation> SecondChunkLocations;
SecondChunkLocations.resize(ChunkCount);
- WorkCompleted = 0;
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- WorkerPool.ScheduleWork(
- [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
- SecondChunkLocations[ChunkIndex] = L;
- });
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- WorkerPool.ScheduleWork(
- [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
- IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- while (WorkCompleted < Chunks.size() * 2)
{
- Sleep(1);
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ L.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ L.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size() * 2);
}
}
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index fd54e6765..da6acbde4 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1332,17 +1332,30 @@ namespace testutils {
std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) { return {Oid::NewOid(), CreateRandomBlob(Size)}; }
- std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store,
+ WorkerThreadPool& ThreadPool,
+ const std::span<const size_t>& Sizes)
{
std::vector<std::pair<Oid, CompressedBuffer>> Result;
- Result.reserve(Sizes.size());
- for (size_t Size : Sizes)
+ Result.resize(Sizes.size());
+ Latch L(1);
+ for (size_t Index = 0; Index < Sizes.size(); Index++)
{
- auto Blob = CreateBinaryBlob(Size);
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
- CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
- Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed));
+ L.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&, Index]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ size_t Size = Sizes[Index];
+ auto Blob = CreateBinaryBlob(Size);
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size()));
+ CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash()));
+ Result[Index] = std::pair<Oid, CompressedBuffer>(Blob.first, Compressed);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
+ L.CountDown();
+ L.Wait();
return Result;
}
@@ -1528,7 +1541,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 8192;
+ const int32_t kChunkCount = 4096;
struct Chunk
{
@@ -1569,26 +1582,28 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
- WorkerThreadPool ThreadPool(4);
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u));
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {});
{
+ Latch L(1);
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, &Chunk]() {
+ [&Zcs, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
auto DoGC = [](GcManager& Gc, ZenCacheNamespace& Zcs, std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes) {
@@ -1615,11 +1630,14 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
+ Latch L(1);
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, &Chunk]() {
+ [&Zcs, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&]() { L.CountDown(); });
std::string Bucket = Chunk.second.Bucket;
IoHash ChunkHash = Chunk.first;
ZenCacheValue CacheValue;
@@ -1631,10 +1649,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
GcChunkHashes.reserve(Chunks.size());
@@ -1672,10 +1689,13 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
WorkerThreadPool::EMode::EnableBacklog);
}
+ Latch L(1);
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, Chunk]() {
+ [&Zcs, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
ZenCacheValue CacheValue;
if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
{
@@ -1699,10 +1719,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
DoGC(Gc, Zcs, GcChunkHashes);
}
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == NewChunks.size() + Chunks.size());
{
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
@@ -1718,12 +1737,15 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
}
}
{
+ Latch L(1);
{
std::atomic<size_t> WorkCompleted = 0;
for (const auto& Chunk : GcChunkHashes)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Zcs, &WorkCompleted, Chunk]() {
+ [&Zcs, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
ZenCacheValue CacheValue;
CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
@@ -1731,10 +1753,9 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
},
WorkerThreadPool::EMode::EnableBacklog);
}
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == GcChunkHashes.size());
}
}
}
@@ -2236,6 +2257,8 @@ TEST_CASE("cachestore.newgc.basics")
std::vector<IoHash> CacheRecords;
std::vector<IoHash> UnstructuredCacheValues;
+ WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u));
+
const auto TearDrinkerBucket = "teardrinker"sv;
{
GcManager Gc;
@@ -2246,11 +2269,12 @@ TEST_CASE("cachestore.newgc.basics")
// Create some basic data
{
// Structured record with attachments
- auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
+ auto Attachments1 =
+ CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87});
CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1));
// Structured record with reuse of attachments
- auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971});
+ auto Attachments2 = CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{971});
Attachments2.push_back(Attachments1[0]);
Attachments2.push_back(Attachments1[1]);
CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2));
@@ -2664,7 +2688,7 @@ TEST_CASE("cachestore.newgc.basics")
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
auto Attachments =
- CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
+ CreateCompressedAttachment(CidStore, WorkerPool, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187});
IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments);
{
// Do get so it ends up in memcache
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 9792f6d0b..14ef2a15d 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -1491,7 +1491,7 @@ TEST_CASE("compactcas.threadedinsert")
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 4096;
+ const int32_t kChunkCount = 2048;
uint64_t ExpectedSize = 0;
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
@@ -1513,40 +1513,47 @@ TEST_CASE("compactcas.threadedinsert")
}
}
- std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
GcManager Gc;
CasContainerStrategy Cas(Gc);
- Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
- for (const auto& Chunk : Chunks)
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
- const IoHash& Hash = Chunk.first;
- const IoBuffer& Buffer = Chunk.second;
- ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Buffer, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
- ZEN_ASSERT(InsertResult.New);
- WorkCompleted.fetch_add(1);
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
+ for (const auto& Chunk : Chunks)
+ {
+ const IoHash& Hash = Chunk.first;
+ const IoBuffer& Buffer = Chunk.second;
+ L.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&Cas, &WorkCompleted, Buffer, Hash, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
}
- WorkCompleted = 0;
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_LE(ExpectedSize, TotalSize);
- CHECK_GE(ExpectedSize + 32768, TotalSize);
-
{
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_LE(ExpectedSize, TotalSize);
+ CHECK_GE(ExpectedSize + 32768, TotalSize);
+
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, &Chunk]() {
+ [&Cas, &WorkCompleted, &Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
IoHash Hash = IoHash::HashBuffer(Buffer);
@@ -1555,10 +1562,9 @@ TEST_CASE("compactcas.threadedinsert")
},
WorkerThreadPool::EMode::DisableBacklog);
}
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == Chunks.size());
}
tsl::robin_set<IoHash, IoHash::Hasher> GcChunkHashes;
@@ -1568,7 +1574,8 @@ TEST_CASE("compactcas.threadedinsert")
GcChunkHashes.insert(Chunk.first);
}
{
- WorkCompleted = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
@@ -1584,8 +1591,10 @@ TEST_CASE("compactcas.threadedinsert")
for (const auto& Chunk : NewChunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ [&Cas, &WorkCompleted, Chunk, &AddedChunkCount, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
@@ -1594,8 +1603,10 @@ TEST_CASE("compactcas.threadedinsert")
}
for (const auto& Chunk : Chunks)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, Chunk]() {
+ [&Cas, &WorkCompleted, Chunk, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
IoHash ChunkHash = Chunk.first;
IoBuffer Buffer = Cas.FindChunk(ChunkHash);
if (Buffer)
@@ -1678,29 +1689,31 @@ TEST_CASE("compactcas.threadedinsert")
DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
}
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+
+ CHECK(WorkCompleted == NewChunks.size() + Chunks.size());
DoGC(Cas, ChunksToDelete, KeepHashes, GcChunkHashes);
}
{
- WorkCompleted = 0;
+ std::atomic<size_t> WorkCompleted = 0;
+ Latch L(1);
for (const IoHash& ChunkHash : GcChunkHashes)
{
+ L.AddCount(1);
ThreadPool.ScheduleWork(
- [&Cas, &WorkCompleted, ChunkHash]() {
+ [&Cas, &WorkCompleted, ChunkHash, &L]() {
+ auto _ = MakeGuard([&L]() { L.CountDown(); });
CHECK(Cas.HaveChunk(ChunkHash));
CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
WorkCompleted.fetch_add(1);
},
WorkerThreadPool::EMode::DisableBacklog);
}
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
+ L.CountDown();
+ L.Wait();
+ CHECK(WorkCompleted == GcChunkHashes.size());
}
}
}
@@ -1709,9 +1722,9 @@ TEST_CASE("compactcas.restart")
{
uint64_t ExpectedSize = 0;
- auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
- WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
Latch WorkLatch(1);
tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
ChunkHashesLookup.reserve(ChunkCount);
@@ -1759,6 +1772,7 @@ TEST_CASE("compactcas.restart")
}
WorkLatch.CountDown();
WorkLatch.Wait();
+ CHECK(ChunkHashesLookup.size() == ChunkCount);
};
ScopedTemporaryDirectory TempDir;
@@ -1780,26 +1794,29 @@ TEST_CASE("compactcas.restart")
Hashes.reserve(kChunkCount);
auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
- for (const IoHash& Hash : Hashes)
- {
- if (ShouldExist)
- {
- CHECK(Cas.HaveChunk(Hash));
- IoBuffer Buffer = Cas.FindChunk(Hash);
- CHECK(Buffer);
- IoHash ValidateHash;
- uint64_t ValidateRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
- CHECK(Compressed);
- CHECK(ValidateHash == Hash);
- }
- else
- {
- CHECK(!Cas.HaveChunk(Hash));
- IoBuffer Buffer = Cas.FindChunk(Hash);
- CHECK(!Buffer);
- }
- }
+ std::vector<bool> Exists(Hashes.size(), false);
+ Cas.IterateChunks(
+ Hashes,
+ [&](size_t Index, const IoBuffer& Buffer) -> bool {
+ Exists[Index] = !!Buffer;
+ if (ShouldExist)
+ {
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hashes[Index]);
+ }
+ else
+ {
+ CHECK(!Buffer);
+ }
+ return true;
+ },
+ &ThreadPool,
+ 1u * 1248u);
+ CHECK_EQ(std::find(Exists.begin(), Exists.end(), !ShouldExist), Exists.end());
};
{
@@ -1944,7 +1961,7 @@ TEST_CASE("compactcas.iteratechunks")
const uint64_t kChunkSize = 1048 + 395;
const size_t kChunkCount = 63840;
- for (uint32_t N = 0; N < 4; N++)
+ for (uint32_t N = 0; N < 2; N++)
{
GcManager Gc;
CasContainerStrategy Cas(Gc);
@@ -2008,7 +2025,7 @@ TEST_CASE("compactcas.iteratechunks")
Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
}
},
- WorkerThreadPool::EMode::EnableBacklog);
+ WorkerThreadPool::EMode::DisableBacklog);
Offset += BatchCount;
}
WorkLatch.CountDown();
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index 36211263a..6a93f0c63 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -915,7 +915,7 @@ ZenServerInstance::Detach()
uint16_t
ZenServerInstance::WaitUntilReady()
{
- while (m_ReadyEvent.Wait(100) == false)
+ while (m_ReadyEvent.Wait(10) == false)
{
if (!m_Process.IsValid())
{
@@ -938,7 +938,7 @@ bool
ZenServerInstance::WaitUntilReady(int Timeout)
{
int TimeoutLeftMS = Timeout;
- while (m_ReadyEvent.Wait(100) == false)
+ while (m_ReadyEvent.Wait(10) == false)
{
if (!m_Process.IsValid())
{
@@ -950,7 +950,7 @@ ZenServerInstance::WaitUntilReady(int Timeout)
ZEN_WARN("Wait abandoned by exited process");
return false;
}
- TimeoutLeftMS -= 100;
+ TimeoutLeftMS -= 10;
if ((TimeoutLeftMS <= 0))
{
ZEN_WARN("Wait abandoned due to timeout");