diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-27 16:05:56 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-27 16:05:56 +0100 |
| commit | 4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch) | |
| tree | c298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/filecas.cpp | |
| parent | remove bad assert (#670) (diff) | |
| download | zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip | |
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multithreading when running scrub operations
- Improvement: Added a way to force a scrub operation at startup for a new release via the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 109 |
1 file changed, 40 insertions, 69 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 13d881e0c..31b3a68c4 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -799,8 +799,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) ZEN_ASSERT(m_IsInitialized); - std::vector<IoHash> BadHashes; - uint64_t ChunkCount{0}, ChunkBytes{0}; + RwLock BadKeysLock; + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); }; + + uint64_t ChunkCount{0}, ChunkBytes{0}; int DiscoveredFilesNotInIndex = 0; @@ -820,88 +823,56 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex); - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - if (!Payload) - { - BadHashes.push_back(Hash); - return; - } - ++ChunkCount; - ChunkBytes += Payload.GetSize(); + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + + try + { + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + Ctx.ThrowIfDeadlineExpired(); - IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload); + ChunkCount++; + ChunkBytes += Payload.GetSize(); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize)) - { - if (RawHash == Hash) + if (!Payload) { - // Header hash matches the file name, full validation requires that - // we check that the decompressed data hash also matches + ReportBadKey(Hash); + return; + } - CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer)); + Payload.MakeOwned(); - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + 
Work.ScheduleWork(Ctx.ThreadPool(), [&, Hash = IoHash(Hash), Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) { - if (BlockSize == 0) - { - BlockSize = 256 * 1024; - } - else if (BlockSize < (1024 * 1024)) - { - BlockSize = BlockSize * (1024 * 1024 / BlockSize); - } - - std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]); - - IoHashStream Hasher; - - uint64_t RawOffset = 0; - while (RawSize) + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &Hash)) { - const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize); - - bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize), - RawOffset); - - if (Ok) - { - Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize); - } - - RawSize -= DecompressedBlockSize; - RawOffset += DecompressedBlockSize; - } - - const IoHash FinalHash = Hasher.GetHash(); - - if (FinalHash == Hash) - { - // all good - return; + ReportBadKey(Hash); } } - } - } - - BadHashes.push_back(Hash); - }); + }); + }); + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); + } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - if (!BadHashes.empty()) + if (!BadKeys.empty()) { - ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory); + ZEN_WARN("file CAS scrubbing: {} bad chunks out of {} found @ '{}'", BadKeys.size(), ChunkCount, m_RootDirectory); if (Ctx.RunRecovery()) { - ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadKeys.size()); - for (const IoHash& Hash : BadHashes) + for (const IoHash& Hash : BadKeys) { std::error_code Ec; DeleteChunk(Hash, Ec); @@ -914,14 +885,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) } 
else { - ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size()); + ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadKeys.size()); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadHashes); + Ctx.ReportBadCidChunks(BadKeys); ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}", m_RootDirectory, |