aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp1
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp78
-rw-r--r--src/zenstore/cache/cacherpc.cpp5
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp50
-rw-r--r--src/zenstore/cidstore.cpp7
-rw-r--r--src/zenstore/compactcas.cpp49
-rw-r--r--src/zenstore/filecas.cpp109
-rw-r--r--src/zenstore/gc.cpp241
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h2
-rw-r--r--src/zenstore/include/zenstore/gc.h68
-rw-r--r--src/zenstore/include/zenstore/projectstore.h1
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h3
-rw-r--r--src/zenstore/projectstore.cpp302
-rw-r--r--src/zenstore/scrubcontext.cpp64
14 files changed, 613 insertions, 367 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 6c5b50f58..f97c98e08 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1129,6 +1129,7 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati
++RangeEnd;
}
+ ZEN_LOG_SCOPE("iterating chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex));
if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart)))
{
return false;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 67f587a7c..4f0d412ec 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -241,7 +241,10 @@ UpdateValueWithRawSizeAndHash(ZenCacheValue& Value)
{
if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize);
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value,
+ Value.RawHash,
+ Value.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr);
}
else
{
@@ -1623,7 +1626,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
AddToMemCache = false;
@@ -1752,7 +1758,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
}
@@ -1900,7 +1909,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
m_DiskMissCount++;
@@ -2369,7 +2381,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
auto LogStats = MakeGuard([&] {
const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
- ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ ZEN_INFO("cache bucket '{}' scrubbed {} in {} from {} chunks ({})",
m_BucketName,
NiceBytes(VerifiedChunkBytes.load()),
NiceTimeSpanMs(DurationMs),
@@ -2402,10 +2414,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[Kv.second];
const DiskLocation& Loc = Payload.Location;
+ Ctx.ThrowIfDeadlineExpired();
+
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
VerifiedChunkBytes.fetch_add(Loc.Size());
@@ -2439,7 +2451,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ReportBadKey(HashKey);
continue;
}
- if (!ValidateIoBuffer(Loc.GetContentType(), Buffer))
+ if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer)))
{
ReportBadKey(HashKey);
continue;
@@ -2478,7 +2490,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2501,7 +2513,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2522,7 +2534,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", BadKeys.size(), ChunkCount.load(), m_BucketDir);
if (Ctx.RunRecovery())
{
@@ -4443,35 +4455,33 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
ZEN_TRACE_CPU("Z$::ScrubStorage");
RwLock::SharedLockScope _(m_Lock);
- {
- std::vector<std::future<void>> Results;
- Results.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
-# if 1
- Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
- WorkerThreadPool::EMode::EnableBacklog));
-# else
- CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
-# endif
- }
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
- for (auto& Result : Results)
- {
- if (Result.valid())
- {
- Result.wait();
- }
- }
- for (auto& Result : Results)
+ try
+ {
+ for (auto& Kv : m_Buckets)
{
- Result.get();
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(Ctx.ThreadPool(), [Bucket = Kv.second.get(), &Ctx](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ Bucket->ScrubStorage(Ctx);
+ }
+ });
}
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
}
+
#endif // ZEN_WITH_TESTS
CacheStoreSize
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 09a3d0afc..9e57b41c3 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -1757,7 +1757,10 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
else
{
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ Request->RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
Request->Exists = true;
Request->RawSizeKnown = true;
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index a164f66c3..c0b433c51 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -4,7 +4,9 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compositebuffer.h>
#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
@@ -71,48 +73,38 @@ IsKnownBadBucketName(std::string_view Bucket)
}
bool
-ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer)
+ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer)
{
ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
if (ContentType == ZenContentType::kCbObject)
{
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ uint64_t BufferSize = Buffer.GetSize();
+ CbValidateError Error = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Buffer), Error);
if (Error == CbValidateError::None)
{
- return true;
- }
-
- ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
-
- return false;
- }
- else if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
-
- IoHash HeaderRawHash;
- uint64_t RawSize = 0;
- if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
- {
- ZEN_SCOPED_ERROR("compressed buffer header validation failed");
-
- return false;
+ if (Object.GetSize() == BufferSize)
+ {
+ return true;
+ }
+ else
+ {
+ ZEN_SCOPED_WARN("compact binary object size {} does not match payload size {}", Object.GetSize(), BufferSize);
+ return false;
+ }
}
-
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- IoHash DecompressedHash = IoHash::HashBuffer(Decompressed);
-
- if (HeaderRawHash != DecompressedHash)
+ else
{
- ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
-
+ ZEN_SCOPED_WARN("compact binary validation failed: '{}'", ToString(Error));
return false;
}
}
+ else if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ return ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(std::move(Buffer))), /*OptionalExpectedHash*/ nullptr);
+ }
else
{
// No way to verify this kind of content (what is it exactly?)
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 52d5df061..bedf91287 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -31,7 +31,8 @@ struct CidStore::Impl
#if ZEN_BUILD_DEBUG
IoHash VerifyRawHash;
uint64_t _;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash);
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHash == VerifyRawHash);
#endif
IoBuffer Payload(ChunkData);
@@ -66,7 +67,9 @@ struct CidStore::Impl
#if ZEN_BUILD_DEBUG
IoHash VerifyRawHash;
uint64_t _;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash);
+ ZEN_ASSERT(
+ CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHashes[Offset++] == VerifyRawHash);
#endif
Chunks.push_back(ChunkData);
Chunks.back().SetContentType(ZenContentType::kCompressedBinary);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index e1f17fbb9..a5de5c448 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -557,6 +557,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
try
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -589,58 +593,57 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
ChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memory-map the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(
+ Ctx.ThreadPool(),
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ }
+ });
+ return !Abort;
});
+ Work.Wait();
}
catch (const ScrubDeadlineExpiredException&)
{
ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'",
+ BadKeys.size(),
+ ChunkCount.load(),
+ m_RootDirectory / m_ContainerBaseName);
if (Ctx.RunRecovery())
{
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 13d881e0c..31b3a68c4 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -799,8 +799,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- uint64_t ChunkCount{0}, ChunkBytes{0};
+ RwLock BadKeysLock;
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); };
+
+ uint64_t ChunkCount{0}, ChunkBytes{0};
int DiscoveredFilesNotInIndex = 0;
@@ -820,88 +823,56 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex);
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
- if (!Payload)
- {
- BadHashes.push_back(Hash);
- return;
- }
- ++ChunkCount;
- ChunkBytes += Payload.GetSize();
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+ try
+ {
+ IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+ Ctx.ThrowIfDeadlineExpired();
- IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);
+ ChunkCount++;
+ ChunkBytes += Payload.GetSize();
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
- {
- if (RawHash == Hash)
+ if (!Payload)
{
- // Header hash matches the file name, full validation requires that
- // we check that the decompressed data hash also matches
+ ReportBadKey(Hash);
+ return;
+ }
- CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));
+ Payload.MakeOwned();
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ Work.ScheduleWork(Ctx.ThreadPool(), [&, Hash = IoHash(Hash), Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
{
- if (BlockSize == 0)
- {
- BlockSize = 256 * 1024;
- }
- else if (BlockSize < (1024 * 1024))
- {
- BlockSize = BlockSize * (1024 * 1024 / BlockSize);
- }
-
- std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);
-
- IoHashStream Hasher;
-
- uint64_t RawOffset = 0;
- while (RawSize)
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &Hash))
{
- const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);
-
- bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
- RawOffset);
-
- if (Ok)
- {
- Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);
- }
-
- RawSize -= DecompressedBlockSize;
- RawOffset += DecompressedBlockSize;
- }
-
- const IoHash FinalHash = Hasher.GetHash();
-
- if (FinalHash == Hash)
- {
- // all good
- return;
+ ReportBadKey(Hash);
}
}
- }
- }
-
- BadHashes.push_back(Hash);
- });
+ });
+ });
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
+ }
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- if (!BadHashes.empty())
+ if (!BadKeys.empty())
{
- ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);
+ ZEN_WARN("file CAS scrubbing: {} bad chunks out of {} found @ '{}'", BadKeys.size(), ChunkCount, m_RootDirectory);
if (Ctx.RunRecovery())
{
- ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+ ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadKeys.size());
- for (const IoHash& Hash : BadHashes)
+ for (const IoHash& Hash : BadKeys)
{
std::error_code Ec;
DeleteChunk(Hash, Ec);
@@ -914,14 +885,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
+ ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadKeys.size());
}
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadKeys);
ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
m_RootDirectory,
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index ba1bce974..a4a141577 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -1810,6 +1810,8 @@ GcScheduler::Shutdown()
ZEN_TRACE_CPU("GcScheduler::Shutdown");
ZEN_MEMSCOPE(GetGcTag());
+ m_GcManager.SetCancelGC(true);
+
if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
{
bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
@@ -1817,18 +1819,18 @@ GcScheduler::Shutdown()
{
ZEN_INFO("Requesting cancel running garbage collection");
}
- m_GcManager.SetCancelGC(true);
m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
- m_GcSignal.notify_one();
+ m_GcSignal.Set();
+ }
- if (m_GcThread.joinable())
+ if (m_GcThread.joinable())
+ {
+ bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ if (GcIsRunning)
{
- if (GcIsRunning)
- {
- ZEN_INFO("Waiting for garbage collection to complete");
- }
- m_GcThread.join();
+ ZEN_INFO("Waiting for garbage collection to complete");
}
+ m_GcThread.join();
}
m_DiskUsageLog.Flush();
m_DiskUsageLog.Close();
@@ -1839,17 +1841,17 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
{
ZEN_MEMSCOPE(GetGcTag());
std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+
+ if (m_TriggerGcParams || m_TriggerScrubParams)
{
- m_TriggerGcParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+ return false;
+ }
- if (m_Status.compare_exchange_strong(/* expected */ IdleState,
- /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
- return true;
- }
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_TriggerGcParams = Params;
+ m_GcSignal.Set();
+ return true;
}
return false;
}
@@ -1860,17 +1862,16 @@ GcScheduler::TriggerScrub(const TriggerScrubParams& Params)
ZEN_MEMSCOPE(GetGcTag());
std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ if (m_TriggerGcParams || m_TriggerScrubParams)
{
- m_TriggerScrubParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
-
- if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
+ return false;
+ }
- return true;
- }
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_TriggerScrubParams = Params;
+ m_GcSignal.Set();
+ return true;
}
return false;
@@ -1977,8 +1978,6 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons
MemoryView EntryBuffer(Blob.data(), Blob.size());
{
- RwLock::ExclusiveLockScope _(m_GcLogLock);
-
GcLogFile.Open(Path, BasicFile::Mode::kWrite);
uint64_t AppendPos = GcLogFile.FileSize();
@@ -2096,13 +2095,25 @@ GcScheduler::GetState() const
return Result;
}
+bool
+GcScheduler::IsManualTriggerPresent() const
+{
+ bool IsPending = Status() != GcSchedulerStatus::kStopped;
+ if (IsPending)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ IsPending = m_TriggerGcParams || m_TriggerScrubParams;
+ }
+ return IsPending;
+}
+
void
GcScheduler::SchedulerThread()
{
ZEN_MEMSCOPE(GetGcTag());
SetCurrentThreadName("GcScheduler");
- std::chrono::seconds WaitTime{0};
+ std::chrono::seconds WaitTime{m_Config.Enabled ? std::chrono::seconds{0} : std::chrono::seconds::max()};
const std::chrono::seconds ShortWaitTime{5};
bool SilenceErrors = false;
@@ -2111,60 +2122,67 @@ GcScheduler::SchedulerThread()
(void)CheckDiskSpace();
std::chrono::seconds WaitedTime{0};
- bool Timeout = false;
+
+ std::optional<TriggerGcParams> TriggerGcParams;
+ std::optional<TriggerScrubParams> TriggerScrubParams;
+
+ ZEN_ASSERT(WaitTime.count() >= 0);
+ while (Status() != GcSchedulerStatus::kStopped)
{
- ZEN_ASSERT(WaitTime.count() >= 0);
- std::unique_lock Lock(m_GcMutex);
- while (!Timeout && (Status() != GcSchedulerStatus::kStopped))
- {
- std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime);
- bool ShortTimeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, ShortWait);
+ std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime);
+ bool ShortTimeout = !m_GcSignal.Wait(gsl::narrow<int>(ShortWait.count() * 1000));
- if (ShortTimeout)
+ if (ShortTimeout)
+ {
+ if (WaitTime > ShortWaitTime)
{
- if (WaitTime > ShortWaitTime)
+ DiskSpace Space = CheckDiskSpace();
+ if (!AreDiskWritesAllowed())
{
- DiskSpace Space = CheckDiskSpace();
- if (!AreDiskWritesAllowed())
- {
- ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory);
- Timeout = true;
- }
- WaitTime -= ShortWaitTime;
- }
- else
- {
- Timeout = true;
+ ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory);
+ break;
}
+ WaitTime -= ShortWaitTime;
}
else
{
- // We got a signal
break;
}
}
+ else
+ {
+ m_GcSignal.Reset();
+ {
+ std::unique_lock Lock(m_GcMutex);
+ TriggerGcParams = m_TriggerGcParams;
+ TriggerScrubParams = m_TriggerScrubParams;
+ }
+ break;
+ }
}
+ auto TriggerCleanup = MakeGuard([&]() {
+ if (TriggerGcParams || TriggerScrubParams)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ m_TriggerGcParams.reset();
+ m_TriggerScrubParams.reset();
+ }
+ });
+
if (Status() == GcSchedulerStatus::kStopped)
{
break;
}
- if (!m_Config.Enabled && !m_TriggerScrubParams && !m_TriggerGcParams)
- {
- WaitTime = std::chrono::seconds::max();
- continue;
- }
-
- if (!Timeout && Status() == GcSchedulerStatus::kIdle)
- {
- continue;
- }
-
try
{
- bool DoGc = m_Config.Enabled;
- bool DoScrubbing = false;
+ bool ManualGcTriggered = false;
+ bool ManualScrubbingTriggered = false;
+ bool LowDiskSpaceGCTriggered = false;
+ bool HighDiskSpaceUsageGCTriggered = false;
+ bool TimeBasedGCTriggered = false;
+
std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max();
bool DoDelete = true;
bool CollectSmallObjects = m_Config.CollectSmallObjects;
@@ -2189,16 +2207,33 @@ GcScheduler::SchedulerThread()
uint8_t NextAttachmentPassIndex =
ComputeAttachmentRange(m_AttachmentPassIndex, m_Config.AttachmentPassCount, AttachmentRangeMin, AttachmentRangeMax);
- bool LowDiskSpaceGCTriggered = false;
- bool HighDiskSpaceUsageGCTriggered = false;
- bool TimeBasedGCTriggered = false;
-
GcClock::TimePoint Now = GcClock::Now();
- if (m_TriggerGcParams)
+ if (TriggerScrubParams)
{
- const auto TriggerParams = m_TriggerGcParams.value();
- m_TriggerGcParams.reset();
+ ZEN_ASSERT_SLOW(!TriggerGcParams);
+ ZEN_INFO("Manual scrub triggered");
+ const auto TriggerParams = TriggerScrubParams.value();
+
+ ManualScrubbingTriggered = true;
+
+ if (!TriggerParams.SkipGc)
+ {
+ ManualGcTriggered = true;
+ }
+
+ if (TriggerParams.SkipCas)
+ {
+ SkipCid = true;
+ }
+
+ DoDelete = !TriggerParams.SkipDelete;
+ ScrubTimeslice = TriggerParams.MaxTimeslice;
+ }
+ else if (TriggerGcParams)
+ {
+ ZEN_INFO("Manual gc triggered");
+ const auto TriggerParams = TriggerGcParams.value();
CollectSmallObjects = TriggerParams.CollectSmallObjects;
@@ -2249,34 +2284,10 @@ GcScheduler::SchedulerThread()
{
EnableValidation = TriggerParams.EnableValidation.value();
}
- DoGc = true;
- }
-
- if (m_TriggerScrubParams)
- {
- DoScrubbing = true;
-
- if (m_TriggerScrubParams->SkipGc)
- {
- DoGc = false;
- }
-
- if (m_TriggerScrubParams->SkipCas)
- {
- SkipCid = true;
- }
-
- DoDelete = !m_TriggerScrubParams->SkipDelete;
- ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice;
+ ManualGcTriggered = true;
}
- if (DoScrubbing)
- {
- ScrubStorage(DoDelete, SkipCid, ScrubTimeslice);
- m_TriggerScrubParams.reset();
- }
-
- if (!DoGc)
+ if (!ManualScrubbingTriggered && !ManualGcTriggered && !m_Config.Enabled)
{
continue;
}
@@ -2288,10 +2299,12 @@ GcScheduler::SchedulerThread()
GcClock::TimePoint BuildStoreExpireTime =
MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration;
- const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
-
- if (Timeout && Status() == GcSchedulerStatus::kIdle)
+ if (!ManualGcTriggered && !ManualScrubbingTriggered)
{
+ // Check for GC triggered by time/size limits
+
+ const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
+
DiskSpace Space = CheckDiskSpace();
const int64_t PressureGraphLength = 30;
@@ -2508,7 +2521,9 @@ GcScheduler::SchedulerThread()
}
continue;
}
+ }
+ {
uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
{
@@ -2517,13 +2532,30 @@ GcScheduler::SchedulerThread()
}
}
- if (!SkipCid)
+ auto ResetState = MakeGuard([&]() {
+ uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
+ {
+ ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
+ }
+ });
+
+ if (ManualScrubbingTriggered)
{
- m_AttachmentPassIndex = NextAttachmentPassIndex;
+ ScrubStorage(DoDelete, SkipCid, ScrubTimeslice);
+ if (!ManualGcTriggered)
+ {
+ continue;
+ }
}
if (PrepareDiskReserve())
{
+ if (!SkipCid)
+ {
+ m_AttachmentPassIndex = NextAttachmentPassIndex;
+ }
+
bool GcSuccess = CollectGarbage(CacheExpireTime,
ProjectStoreExpireTime,
BuildStoreExpireTime,
@@ -2596,13 +2628,6 @@ GcScheduler::SchedulerThread()
WaitTime = m_Config.MonitorInterval;
}
m_GcManager.SetCancelGC(false);
-
- uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
- {
- ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
- break;
- }
}
}
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index 8f40ae727..791720589 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -78,6 +78,6 @@ enum class PutStatus
};
bool IsKnownBadBucketName(std::string_view BucketName);
-bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
+bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer);
} // namespace zen
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 5150ecd42..734d2e5a7 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -488,10 +488,10 @@ public:
GcScheduler(GcManager& GcManager);
~GcScheduler();
- void Initialize(const GcSchedulerConfig& Config);
- void Shutdown();
- GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
- GcSchedulerState GetState() const;
+ void Initialize(const GcSchedulerConfig& Config);
+ void Shutdown();
+ bool IsManualTriggerPresent() const;
+ GcSchedulerState GetState() const;
struct TriggerGcParams
{
@@ -528,30 +528,31 @@ public:
bool CancelGC();
private:
- void SchedulerThread();
- bool ReclaimDiskReserve();
- bool PrepareDiskReserve();
- bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
- const GcClock::TimePoint& ProjectStoreExpireTime,
- const GcClock::TimePoint& BuildStoreExpireTime,
- bool Delete,
- bool CollectSmallObjects,
- bool SkipCid,
- GcVersion UseGCVersion,
- uint32_t CompactBlockUsageThresholdPercent,
- bool Verbose,
- bool SingleThreaded,
- const IoHash& AttachmentRangeMin,
- const IoHash& AttachmentRangeMax,
- bool StoreCacheAttachmentMetaData,
- bool StoreProjectAttachmentMetaData,
- bool EnableValidation,
- bool SilenceErrors);
- void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
- LoggerRef Log() { return m_Log; }
- virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
- DiskSpace CheckDiskSpace();
- void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result);
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+ void SchedulerThread();
+ bool ReclaimDiskReserve();
+ bool PrepareDiskReserve();
+ bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
+ const GcClock::TimePoint& ProjectStoreExpireTime,
+ const GcClock::TimePoint& BuildStoreExpireTime,
+ bool Delete,
+ bool CollectSmallObjects,
+ bool SkipCid,
+ GcVersion UseGCVersion,
+ uint32_t CompactBlockUsageThresholdPercent,
+ bool Verbose,
+ bool SingleThreaded,
+ const IoHash& AttachmentRangeMin,
+ const IoHash& AttachmentRangeMax,
+ bool StoreCacheAttachmentMetaData,
+ bool StoreProjectAttachmentMetaData,
+ bool EnableValidation,
+ bool SilenceErrors);
+ void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
+ LoggerRef Log() { return m_Log; }
+ virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
+ DiskSpace CheckDiskSpace();
+ void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result);
LoggerRef m_Log;
GcManager& m_GcManager;
@@ -571,18 +572,17 @@ private:
std::optional<GcResult> m_LastLightweightGCV2Result;
std::optional<GcResult> m_LastFullGCV2Result;
- std::atomic_uint32_t m_Status{};
- std::thread m_GcThread;
- mutable std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
+ std::atomic_uint32_t m_Status{};
+ std::thread m_GcThread;
+ mutable std::mutex m_GcMutex;
+ Event m_GcSignal;
+
std::optional<TriggerGcParams> m_TriggerGcParams;
std::optional<TriggerScrubParams> m_TriggerScrubParams;
std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;
-
- RwLock m_GcLogLock;
};
void gc_forcelink();
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h
index 258be5930..ad108f65b 100644
--- a/src/zenstore/include/zenstore/projectstore.h
+++ b/src/zenstore/include/zenstore/projectstore.h
@@ -133,6 +133,7 @@ public:
void IterateOplog(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging);
void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn);
void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn, const Paging& EntryPaging);
+ void IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler);
void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging);
size_t GetOplogEntryCount() const;
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
index 2f28cfec7..0562ca8c5 100644
--- a/src/zenstore/include/zenstore/scrubcontext.h
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -8,6 +8,7 @@
namespace zen {
class WorkerThreadPool;
+class CompositeBuffer;
/** Context object for data scrubbing
@@ -67,4 +68,6 @@ public:
~ScrubDeadlineExpiredException();
};
+bool ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash);
+
} // namespace zen
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index 7570b8513..7e9ff50bb 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -774,70 +774,125 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
{
const Oplog::OplogPayload& Entry = Entries[EntryOffset];
const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
- if (OpBufferView.GetSize() == Entry.Address.Size)
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
{
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
+ ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpFileOffset,
+ Entry.Address.Size,
+ BlobsSize);
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
{
- CbObjectView OpView(OpBufferView.GetData());
- if (OpView.GetSize() != OpBufferView.GetSize())
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBufferView.GetSize());
+ CbObjectView OpView(OpBufferView.GetData());
+ if (OpView.GetSize() != OpBufferView.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBufferView.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
- }
- }
- else
- {
- IoBuffer OpBuffer(Entry.Address.Size);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- OpBufferView = OpBuffer.GetView();
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
- {
- CbObjectView OpView(OpBuffer.Data());
- if (OpView.GetSize() != OpBuffer.GetSize())
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ OpBufferView = OpBuffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBuffer.GetSize());
+ CbObjectView OpView(OpBuffer.Data());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
+ }
+ }
+ }
+
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
+
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
+
+ for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
+ {
+ const Oplog::OplogPayload& Entry = Entries[EntryOffset];
+
+ const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
+ {
+ Handler(Entry.Lsn, {});
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
+ {
+ IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
+ Handler(Entry.Lsn, Buffer);
+ }
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ Handler(Entry.Lsn, OpBuffer);
}
}
}
@@ -1118,40 +1173,77 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId);
+
ZEN_ASSERT(m_Mode == EMode::kFull);
+ Stopwatch Timer;
+ std::atomic_uint64_t OpCount = 0;
+ std::atomic_uint64_t VerifiedOpBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})",
+ m_OuterProjectId,
+ m_OplogId,
+ NiceBytes(VerifiedOpBytes.load()),
+ NiceTimeSpanMs(DurationMs),
+ OpCount.load(),
+ NiceRate(VerifiedOpBytes, DurationMs));
+ });
+
std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries;
using namespace std::literals;
- IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) {
+ IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) {
+ Ctx.ThrowIfDeadlineExpired();
+
+ OpCount++;
+ VerifiedOpBytes += Buffer.GetSize();
+
+ if (!Buffer)
{
- const Oid KeyHash = ComputeOpKey(Op);
- if (KeyHash != Key)
+ ZEN_WARN("Scrub: oplog data for op {} (Lsn: {}) could not be read from disk", Key, Lsn.Number);
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
+ {
+ MemoryView OpBufferView = Buffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None)
{
+ ZEN_WARN("Scrub: oplog data for op {} (Lsn: {}) is not valid compact binary. Error: {}", Key, Lsn.Number, ToString(Error));
BadEntries.push_back({Lsn, Key});
- ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
return;
}
}
- // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk?
+ CbObjectView OpView(Buffer.GetData());
+ if (OpView.GetSize() != Buffer.GetSize())
+ {
+ ZEN_WARN("Scrub: oplog payload size {} for op {} (Lsn: {}) does not match object size {}",
+ Buffer.GetSize(),
+ Key,
+ Lsn.Number,
+ OpView.GetSize());
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
- Op.IterateAttachments([&](CbFieldView Visitor) {
- const IoHash Cid = Visitor.AsAttachment();
- if (Ctx.IsBadCid(Cid))
- {
- // oplog entry references a CAS chunk which has been flagged as bad
- BadEntries.push_back({Lsn, Key});
- return;
- }
- if (!m_CidStore.ContainsChunk(Cid))
+ {
+ const Oid KeyHash = ComputeOpKey(OpView);
+ if (KeyHash != Key)
{
- // oplog entry references a CAS chunk which is not present
BadEntries.push_back({Lsn, Key});
+ ZEN_WARN("Scrub: oplog data for op {} (Lsn: {}) does not match information from index (op:{} != index:{})",
+ Key,
+ Lsn.Number,
+ KeyHash,
+ Key);
return;
}
- });
+ }
});
if (!BadEntries.empty())
@@ -2577,6 +2669,32 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, c
}
}
+void
+ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler)
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap;
+ std::vector<PayloadIndex> ReplayOrder;
+
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (m_Storage)
+ {
+ ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap);
+ if (!ReplayOrder.empty())
+ {
+ uint32_t EntryIndex = 0;
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) {
+ const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+ Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer);
+ EntryIndex++;
+ });
+ }
+ }
+ }
+}
+
static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'otmb';
void
@@ -3773,18 +3891,64 @@ void
ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+
+ ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+
// Scrubbing needs to check all existing oplogs
std::vector<std::string> OpLogs = ScanForOplogs();
- for (const std::string& OpLogId : OpLogs)
+
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+ try
{
- OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
- }
- IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
- if (!IsExpired(GcClock::TimePoint::min(), Ops))
+ for (const std::string& OpLogId : OpLogs)
{
- Ops.Scrub(Ctx);
+ Ref<ProjectStore::Oplog> OpLog;
+ {
+ if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end())
+ {
+ OpLog = OpIt->second;
+ }
+ else
+ {
+ std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId);
+ if (ProjectStore::Oplog::ExistsAt(OplogBasePath))
+ {
+ OpLog = new ProjectStore::Oplog(
+ Log(),
+ Identifier,
+ OpLogId,
+ m_CidStore,
+ OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot
+ OpLog->Read();
+ }
+ }
+ }
+
+ if (OpLog)
+ {
+ Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ OpLog->Scrub(Ctx);
+ }
+ });
+ }
}
- });
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
+ }
}
uint64_t
@@ -4454,7 +4618,10 @@ ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, c
if (WantsRawSizeField)
{
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4611,7 +4778,10 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl
{
ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1);
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4722,7 +4892,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons
{
IoHash RawHash;
uint64_t RawSize;
- bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
+ bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
if (!IsCompressed)
{
throw std::runtime_error(
diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp
index fbcd7d33c..8f8ec09a7 100644
--- a/src/zenstore/scrubcontext.cpp
+++ b/src/zenstore/scrubcontext.cpp
@@ -2,6 +2,10 @@
#include "zenstore/scrubcontext.h"
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
#include <zencore/workthreadpool.h>
namespace zen {
@@ -62,4 +66,64 @@ ScrubContext::ThrowIfDeadlineExpired() const
throw ScrubDeadlineExpiredException();
}
+bool
+ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash)
+{
+ IoHash HeaderRawHash;
+ uint64_t RawSize = 0;
+ uint64_t TotalCompressedSize = 0;
+ if (!CompressedBuffer::ValidateCompressedHeader(Buffer, HeaderRawHash, RawSize, &TotalCompressedSize))
+ {
+ if (OptionalExpectedHash)
+ {
+ ZEN_SCOPED_WARN("compressed buffer header validation failed for chunk with hash {}", *OptionalExpectedHash);
+ }
+ else
+ {
+ ZEN_SCOPED_WARN("compressed buffer header validation failed");
+ }
+ return false;
+ }
+
+ if (OptionalExpectedHash != nullptr && HeaderRawHash != (*OptionalExpectedHash))
+ {
+ ZEN_SCOPED_WARN("compressed buffer hash {} does not match expected hash {}", HeaderRawHash, *OptionalExpectedHash);
+ return false;
+ }
+
+ if (TotalCompressedSize != Buffer.GetSize())
+ {
+ ZEN_SCOPED_WARN("compressed buffer size does not match total compressed size in header for chunk {}", HeaderRawHash);
+ return false;
+ }
+
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer, /* out */ HeaderRawHash, /* out */ RawSize);
+
+ IoHashStream HashStream;
+ if (!Compressed.DecompressToStream(
+ 0,
+ RawSize,
+ [&HashStream](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) -> bool {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ for (const SharedBuffer& Segment : Range.GetSegments())
+ {
+ HashStream.Append(Segment);
+ }
+ return true;
+ }))
+ {
+ ZEN_SCOPED_WARN("compressed buffer could not be decompressed for chunk {}", HeaderRawHash);
+ return false;
+ }
+
+ IoHash DecompressedHash = HashStream.GetHash();
+
+ if (HeaderRawHash != DecompressedHash)
+ {
+ ZEN_SCOPED_WARN("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
+ return false;
+ }
+ return true;
+}
+
} // namespace zen