aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-27 16:05:56 +0100
committerGitHub Enterprise <[email protected]>2025-11-27 16:05:56 +0100
commit4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
treec298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/cache/cachedisklayer.cpp
parentremove bad assert (#670) (diff)
downloadzen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multi-threading when running scrub operations
- Improvement: Added a means to force a scrub operation at startup with a new release, using the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp78
1 file changed, 44 insertions(+), 34 deletions(-)
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 67f587a7c..4f0d412ec 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -241,7 +241,10 @@ UpdateValueWithRawSizeAndHash(ZenCacheValue& Value)
{
if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize);
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value,
+ Value.RawHash,
+ Value.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr);
}
else
{
@@ -1623,7 +1626,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
AddToMemCache = false;
@@ -1752,7 +1758,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
}
@@ -1900,7 +1909,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
m_DiskMissCount++;
@@ -2369,7 +2381,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
auto LogStats = MakeGuard([&] {
const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
- ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ ZEN_INFO("cache bucket '{}' scrubbed {} in {} from {} chunks ({})",
m_BucketName,
NiceBytes(VerifiedChunkBytes.load()),
NiceTimeSpanMs(DurationMs),
@@ -2402,10 +2414,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[Kv.second];
const DiskLocation& Loc = Payload.Location;
+ Ctx.ThrowIfDeadlineExpired();
+
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
VerifiedChunkBytes.fetch_add(Loc.Size());
@@ -2439,7 +2451,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ReportBadKey(HashKey);
continue;
}
- if (!ValidateIoBuffer(Loc.GetContentType(), Buffer))
+ if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer)))
{
ReportBadKey(HashKey);
continue;
@@ -2478,7 +2490,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2501,7 +2513,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2522,7 +2534,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", BadKeys.size(), ChunkCount.load(), m_BucketDir);
if (Ctx.RunRecovery())
{
@@ -4443,35 +4455,33 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
ZEN_TRACE_CPU("Z$::ScrubStorage");
RwLock::SharedLockScope _(m_Lock);
- {
- std::vector<std::future<void>> Results;
- Results.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
-# if 1
- Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
- WorkerThreadPool::EMode::EnableBacklog));
-# else
- CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
-# endif
- }
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
- for (auto& Result : Results)
- {
- if (Result.valid())
- {
- Result.wait();
- }
- }
- for (auto& Result : Results)
+ try
+ {
+ for (auto& Kv : m_Buckets)
{
- Result.get();
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(Ctx.ThreadPool(), [Bucket = Kv.second.get(), &Ctx](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ Bucket->ScrubStorage(Ctx);
+ }
+ });
}
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
}
+
#endif // ZEN_WITH_TESTS
CacheStoreSize