about summary refs log tree commit diff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2025-11-27 16:05:56 +0100
committer GitHub Enterprise <[email protected]> 2025-11-27 16:05:56 +0100
commit 4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
tree c298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/filecas.cpp
parent remove bad assert (#670) (diff)
download zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
download zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multi-threading when running scrub operations
- Improvement: Added means to force a scrub operation at startup with a new release using the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r-- src/zenstore/filecas.cpp 109
1 file changed, 40 insertions, 69 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 13d881e0c..31b3a68c4 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -799,8 +799,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- uint64_t ChunkCount{0}, ChunkBytes{0};
+ RwLock BadKeysLock;
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); };
+
+ uint64_t ChunkCount{0}, ChunkBytes{0};
int DiscoveredFilesNotInIndex = 0;
@@ -820,88 +823,56 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex);
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
- if (!Payload)
- {
- BadHashes.push_back(Hash);
- return;
- }
- ++ChunkCount;
- ChunkBytes += Payload.GetSize();
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+ try
+ {
+ IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+ Ctx.ThrowIfDeadlineExpired();
- IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);
+ ChunkCount++;
+ ChunkBytes += Payload.GetSize();
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
- {
- if (RawHash == Hash)
+ if (!Payload)
{
- // Header hash matches the file name, full validation requires that
- // we check that the decompressed data hash also matches
+ ReportBadKey(Hash);
+ return;
+ }
- CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));
+ Payload.MakeOwned();
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ Work.ScheduleWork(Ctx.ThreadPool(), [&, Hash = IoHash(Hash), Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
{
- if (BlockSize == 0)
- {
- BlockSize = 256 * 1024;
- }
- else if (BlockSize < (1024 * 1024))
- {
- BlockSize = BlockSize * (1024 * 1024 / BlockSize);
- }
-
- std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);
-
- IoHashStream Hasher;
-
- uint64_t RawOffset = 0;
- while (RawSize)
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &Hash))
{
- const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);
-
- bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
- RawOffset);
-
- if (Ok)
- {
- Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);
- }
-
- RawSize -= DecompressedBlockSize;
- RawOffset += DecompressedBlockSize;
- }
-
- const IoHash FinalHash = Hasher.GetHash();
-
- if (FinalHash == Hash)
- {
- // all good
- return;
+ ReportBadKey(Hash);
}
}
- }
- }
-
- BadHashes.push_back(Hash);
- });
+ });
+ });
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
+ }
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- if (!BadHashes.empty())
+ if (!BadKeys.empty())
{
- ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);
+ ZEN_WARN("file CAS scrubbing: {} bad chunks out of {} found @ '{}'", BadKeys.size(), ChunkCount, m_RootDirectory);
if (Ctx.RunRecovery())
{
- ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+ ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadKeys.size());
- for (const IoHash& Hash : BadHashes)
+ for (const IoHash& Hash : BadKeys)
{
std::error_code Ec;
DeleteChunk(Hash, Ec);
@@ -914,14 +885,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
+ ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadKeys.size());
}
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadKeys);
ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
m_RootDirectory,