aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-27 16:05:56 +0100
committerGitHub Enterprise <[email protected]>2025-11-27 16:05:56 +0100
commit4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
treec298828c6290a669500788f96f8ea25be41ff88a /src/zenstore/projectstore.cpp
parentremove bad assert (#670) (diff)
downloadzen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project) - Improvement: Enabled more multi threading when running scrub operations - Improvement: Added means to force a scrub operation at startup with a new release using ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenstore/projectstore.cpp')
-rw-r--r--src/zenstore/projectstore.cpp302
1 files changed, 236 insertions, 66 deletions
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index 7570b8513..7e9ff50bb 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -774,70 +774,125 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
{
const Oplog::OplogPayload& Entry = Entries[EntryOffset];
const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
- if (OpBufferView.GetSize() == Entry.Address.Size)
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
{
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
+ ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpFileOffset,
+ Entry.Address.Size,
+ BlobsSize);
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
{
- CbObjectView OpView(OpBufferView.GetData());
- if (OpView.GetSize() != OpBufferView.GetSize())
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBufferView.GetSize());
+ CbObjectView OpView(OpBufferView.GetData());
+ if (OpView.GetSize() != OpBufferView.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBufferView.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
- }
- }
- else
- {
- IoBuffer OpBuffer(Entry.Address.Size);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- OpBufferView = OpBuffer.GetView();
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
- {
- CbObjectView OpView(OpBuffer.Data());
- if (OpView.GetSize() != OpBuffer.GetSize())
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ OpBufferView = OpBuffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBuffer.GetSize());
+ CbObjectView OpView(OpBuffer.Data());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
+ }
+ }
+ }
+
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
+
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
+
+ for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
+ {
+ const Oplog::OplogPayload& Entry = Entries[EntryOffset];
+
+ const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
+ {
+ Handler(Entry.Lsn, {});
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
+ {
+ IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
+ Handler(Entry.Lsn, Buffer);
+ }
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ Handler(Entry.Lsn, OpBuffer);
}
}
}
@@ -1118,40 +1173,77 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId);
+
ZEN_ASSERT(m_Mode == EMode::kFull);
+ Stopwatch Timer;
+ std::atomic_uint64_t OpCount = 0;
+ std::atomic_uint64_t VerifiedOpBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})",
+ m_OuterProjectId,
+ m_OplogId,
+ NiceBytes(VerifiedOpBytes.load()),
+ NiceTimeSpanMs(DurationMs),
+ OpCount.load(),
+ NiceRate(VerifiedOpBytes, DurationMs));
+ });
+
std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries;
using namespace std::literals;
- IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) {
+ IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) {
+ Ctx.ThrowIfDeadlineExpired();
+
+ OpCount++;
+ VerifiedOpBytes += Buffer.GetSize();
+
+ if (!Buffer)
{
- const Oid KeyHash = ComputeOpKey(Op);
- if (KeyHash != Key)
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) could not be read from disk", Key, Lsn.Number);
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
+ {
+ MemoryView OpBufferView = Buffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None)
{
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) is not valid compact binary. Error: {}", Key, Lsn.Number, ToString(Error));
BadEntries.push_back({Lsn, Key});
- ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
return;
}
}
- // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk?
+ CbObjectView OpView(Buffer.GetData());
+ if (OpView.GetSize() != Buffer.GetSize())
+ {
+ ZEN_WARN("Scrub: oplog payload size {} for op {} (Lns: {}) does not match object size {}",
+ Buffer.GetSize(),
+ Key,
+ Lsn.Number,
+ OpView.GetSize());
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
- Op.IterateAttachments([&](CbFieldView Visitor) {
- const IoHash Cid = Visitor.AsAttachment();
- if (Ctx.IsBadCid(Cid))
- {
- // oplog entry references a CAS chunk which has been flagged as bad
- BadEntries.push_back({Lsn, Key});
- return;
- }
- if (!m_CidStore.ContainsChunk(Cid))
+ {
+ const Oid KeyHash = ComputeOpKey(OpView);
+ if (KeyHash != Key)
{
- // oplog entry references a CAS chunk which is not present
BadEntries.push_back({Lsn, Key});
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) does not match information from index (op:{} != index:{})",
+ Key,
+ Lsn.Number,
+ KeyHash,
+ Key);
return;
}
- });
+ }
});
if (!BadEntries.empty())
@@ -2577,6 +2669,32 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, c
}
}
+void
+ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler)
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap;
+ std::vector<PayloadIndex> ReplayOrder;
+
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (m_Storage)
+ {
+ ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap);
+ if (!ReplayOrder.empty())
+ {
+ uint32_t EntryIndex = 0;
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) {
+ const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+ Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer);
+ EntryIndex++;
+ });
+ }
+ }
+ }
+}
+
static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta';
void
@@ -3773,18 +3891,64 @@ void
ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+
+ ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+
// Scrubbing needs to check all existing oplogs
std::vector<std::string> OpLogs = ScanForOplogs();
- for (const std::string& OpLogId : OpLogs)
+
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+ try
{
- OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
- }
- IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
- if (!IsExpired(GcClock::TimePoint::min(), Ops))
+ for (const std::string& OpLogId : OpLogs)
{
- Ops.Scrub(Ctx);
+ Ref<ProjectStore::Oplog> OpLog;
+ {
+ if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end())
+ {
+ OpLog = OpIt->second;
+ }
+ else
+ {
+ std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId);
+ if (ProjectStore::Oplog::ExistsAt(OplogBasePath))
+ {
+ OpLog = new ProjectStore::Oplog(
+ Log(),
+ Identifier,
+ OpLogId,
+ m_CidStore,
+ OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot
+ OpLog->Read();
+ }
+ }
+ }
+
+ if (OpLog)
+ {
+ Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ OpLog->Scrub(Ctx);
+ }
+ });
+ }
}
- });
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
+ }
}
uint64_t
@@ -4454,7 +4618,10 @@ ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, c
if (WantsRawSizeField)
{
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4611,7 +4778,10 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl
{
ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1);
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4722,7 +4892,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons
{
IoHash RawHash;
uint64_t RawSize;
- bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
+ bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
if (!IsCompressed)
{
throw std::runtime_error(