aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-27 16:05:56 +0100
committerGitHub Enterprise <[email protected]>2025-11-27 16:05:56 +0100
commit4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
treec298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/compactcas.cpp
parentremove bad assert (#670) (diff)
downloadzen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multi-threading when running scrub operations
- Improvement: Added means to force a scrub operation at startup with a new release using the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp49
1 file changed, 26 insertions, 23 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index e1f17fbb9..a5de5c448 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -557,6 +557,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
try
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -589,58 +593,57 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
ChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memory-map the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(
+ Ctx.ThreadPool(),
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ }
+ });
+ return !Abort;
});
+ Work.Wait();
}
catch (const ScrubDeadlineExpiredException&)
{
ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'",
+ BadKeys.size(),
+ ChunkCount.load(),
+ m_RootDirectory / m_ContainerBaseName);
if (Ctx.RunRecovery())
{