aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-11 11:48:23 +0100
committerGitHub <[email protected]>2023-12-11 11:48:23 +0100
commit37920b41048acffa30cf156d7d36bfc17ba15c0e (patch)
tree15c4f652a54470e359a9b9dcd194e89cb10eaaf9 /src/zenstore/filecas.cpp
parentmulti-line logging improvements (#597) (diff)
downloadzen-37920b41048acffa30cf156d7d36bfc17ba15c0e.tar.xz
zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.zip
improved scrubbing of oplogs and filecas (#596)
- Improvement: Scrub command now validates compressed buffer hashes in filecas storage (used for large chunks) - Improvement: Added --dry, --no-gc and --no-cas options to zen scrub command - Improvement: Implemented oplog scrubbing (previously was a no-op) - Improvement: Implemented support for running scrubbing at startup with --scrub=<options>
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp92
1 file changed, 76 insertions, 16 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 5da612e30..f18509758 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -846,6 +846,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("FileCas::ScrubStorage");
+ if (Ctx.IsSkipCas())
+ {
+ ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory);
+ return;
+ }
+
+ Stopwatch Timer;
ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory);
ZEN_ASSERT(m_IsInitialized);
@@ -853,6 +860,8 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
uint64_t ChunkCount{0}, ChunkBytes{0};
+ int DiscoveredFilesNotInIndex = 0;
+
{
std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
RwLock::ExclusiveLockScope _(m_Lock);
@@ -862,10 +871,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
{
m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
+ ++DiscoveredFilesNotInIndex;
}
}
}
+ ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex);
+
IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
if (!Payload)
{
@@ -875,25 +887,65 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
++ChunkCount;
ChunkBytes += Payload.GetSize();
+ IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);
+
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
{
- if (RawHash != Hash)
+ if (RawHash == Hash)
{
- // Hash mismatch
- BadHashes.push_back(Hash);
- return;
+ // Header hash matches the file name, full validation requires that
+ // we check that the decompressed data hash also matches
+
+ CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));
+
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (BlockSize == 0)
+ {
+ BlockSize = 256 * 1024;
+ }
+ else if (BlockSize < (1024 * 1024))
+ {
+ BlockSize = BlockSize * (1024 * 1024 / BlockSize);
+ }
+
+ std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);
+
+ IoHashStream Hasher;
+
+ uint64_t RawOffset = 0;
+ while (RawSize)
+ {
+ const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);
+
+ bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
+ RawOffset);
+
+ if (Ok)
+ {
+ Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);
+ }
+
+ RawSize -= DecompressedBlockSize;
+ RawOffset += DecompressedBlockSize;
+ }
+
+ const IoHash FinalHash = Hasher.GetHash();
+
+ if (FinalHash == Hash)
+ {
+ // all good
+ return;
+ }
+ }
}
- return;
- }
-#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload))));
- if (ComputedHash == Hash)
- {
- return;
}
-#endif
+
BadHashes.push_back(Hash);
});
@@ -901,7 +953,7 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+ ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);
if (Ctx.RunRecovery())
{
@@ -914,10 +966,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
if (Ec)
{
- ZEN_WARN("failed to delete file for chunk {}", Hash);
+ ZEN_WARN("failed to delete file for chunk {}: {}", Hash, Ec.message());
}
}
}
+ else
+ {
+ ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
+ }
}
// Let whomever it concerns know about the bad chunks. This could
@@ -925,7 +981,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
// than a full validation pass might be able to do
Ctx.ReportBadCidChunks(BadHashes);
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+ ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
+ m_RootDirectory,
+ ChunkCount,
+ NiceBytes(ChunkBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
void