From 4984e8cd5c38cf77c8cb978f75f808bce0577f2d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 27 Nov 2025 16:05:56 +0100 Subject: automatic scrub on startup (#667) - Improvement: Deeper validation of data when scrub is activated (cas/cache/project) - Improvement: Enabled more multi threading when running scrub operations - Improvement: Added means to force a scrub operation at startup with a new release using ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua --- src/zenstore/cache/cachedisklayer.cpp | 78 ++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 34 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 67f587a7c..4f0d412ec 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -241,7 +241,10 @@ UpdateValueWithRawSizeAndHash(ZenCacheValue& Value) { if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { - return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize); + return CompressedBuffer::ValidateCompressedHeader(Value.Value, + Value.RawHash, + Value.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr); } else { @@ -1623,7 +1626,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; AddToMemCache = false; @@ -1752,7 +1758,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept { if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; } @@ -1900,7 +1909,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; m_DiskMissCount++; @@ -2369,7 +2381,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow(Timer.GetElapsedTimeMs()); - ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + ZEN_INFO("cache bucket '{}' scrubbed {} in {} from {} chunks ({})", m_BucketName, NiceBytes(VerifiedChunkBytes.load()), NiceTimeSpanMs(DurationMs), @@ -2402,10 +2414,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; + Ctx.ThrowIfDeadlineExpired(); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - Ctx.ThrowIfDeadlineExpired(); - ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Loc.Size()); @@ -2439,7 +2451,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ReportBadKey(HashKey); continue; } - if (!ValidateIoBuffer(Loc.GetContentType(), Buffer)) + if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer))) { ReportBadKey(HashKey); continue; @@ -2478,7 +2490,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); - if (!ValidateIoBuffer(ContentType, Buffer)) + if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { ReportBadKey(Hash); return true; @@ -2501,7 +2513,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); - if (!ValidateIoBuffer(ContentType, Buffer)) + if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { ReportBadKey(Hash); return true; @@ -2522,7 +2534,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); + ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", BadKeys.size(), ChunkCount.load(), m_BucketDir); if (Ctx.RunRecovery()) { @@ -4443,35 +4455,33 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) ZEN_TRACE_CPU("Z$::ScrubStorage"); RwLock::SharedLockScope _(m_Lock); - { - std::vector> Results; - Results.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { -# if 1 - Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}, - WorkerThreadPool::EMode::EnableBacklog)); -# else - CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); -# endif - } + std::atomic Abort; + std::atomic Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); - for (auto& Result : Results) - { - if (Result.valid()) - { - Result.wait(); - } - } - for (auto& Result : Results) + try + { + for (auto& Kv : m_Buckets) { - Result.get(); + Ctx.ThrowIfDeadlineExpired(); + Work.ScheduleWork(Ctx.ThreadPool(), [Bucket = Kv.second.get(), &Ctx](std::atomic& AbortFlag) { + if (!AbortFlag) + { + Bucket->ScrubStorage(Ctx); + } + }); } + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); } } + #endif // ZEN_WITH_TESTS CacheStoreSize -- cgit v1.2.3