diff options
Diffstat (limited to 'src/zenstore/projectstore.cpp')
| -rw-r--r-- | src/zenstore/projectstore.cpp | 302 |
1 files changed, 236 insertions, 66 deletions
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index 7570b8513..7e9ff50bb 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -774,70 +774,125 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + const uint64_t BlobsSize = m_OpBlobs.FileSize(); for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) { const Oplog::OplogPayload& Entry = Entries[EntryOffset]; const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); - if (OpBufferView.GetSize() == Entry.Address.Size) + if (OpFileOffset + Entry.Address.Size > BlobsSize) { - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) + ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpFileOffset, + Entry.Address.Size, + BlobsSize); + } + else + { + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) { - CbObjectView OpView(OpBufferView.GetData()); - if (OpView.GetSize() != OpBufferView.GetSize()) + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); + Error == CbValidateError::None) { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBufferView.GetSize()); + CbObjectView OpView(OpBufferView.GetData()); + if (OpView.GetSize() != OpBufferView.GetSize()) + { + ZEN_WARN( + "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBufferView.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } else { - Handler(Entry.Lsn, OpView); + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); } } else { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); - } - } - else - { - IoBuffer OpBuffer(Entry.Address.Size); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - OpBufferView = OpBuffer.GetView(); - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) - { - CbObjectView OpView(OpBuffer.Data()); - if (OpView.GetSize() != OpBuffer.GetSize()) + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + OpBufferView = OpBuffer.GetView(); + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); + Error == CbValidateError::None) { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBuffer.GetSize()); + CbObjectView OpView(OpBuffer.Data()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN( + "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } else { - Handler(Entry.Lsn, OpView); + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); } } + } + } + } + + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); + + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + const uint64_t BlobsSize = m_OpBlobs.FileSize(); + + for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) + { + const Oplog::OplogPayload& Entry = Entries[EntryOffset]; + + const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; + if (OpFileOffset + Entry.Address.Size > BlobsSize) + { + Handler(Entry.Lsn, {}); + } + else + { + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) + { + IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); + Handler(Entry.Lsn, Buffer); + } else { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + Handler(Entry.Lsn, OpBuffer); } } } @@ -1118,40 +1173,77 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId); + ZEN_ASSERT(m_Mode == EMode::kFull); + Stopwatch Timer; + std::atomic_uint64_t OpCount = 0; + std::atomic_uint64_t VerifiedOpBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})", + m_OuterProjectId, + m_OplogId, + NiceBytes(VerifiedOpBytes.load()), + NiceTimeSpanMs(DurationMs), + OpCount.load(), + NiceRate(VerifiedOpBytes, DurationMs)); + }); + std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries; using namespace std::literals; - IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) { + IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) { + Ctx.ThrowIfDeadlineExpired(); + + OpCount++; + VerifiedOpBytes += Buffer.GetSize(); + + if (!Buffer) { - const Oid KeyHash = ComputeOpKey(Op); - if (KeyHash != Key) + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) could not be read from disk", Key, Lsn.Number); + BadEntries.push_back({Lsn, Key}); + return; + } + { + MemoryView OpBufferView = Buffer.GetView(); + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None) { + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) is not valid compact binary. Error: {}", Key, Lsn.Number, ToString(Error)); BadEntries.push_back({Lsn, Key}); - ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); return; } } - // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk? + CbObjectView OpView(Buffer.GetData()); + if (OpView.GetSize() != Buffer.GetSize()) + { + ZEN_WARN("Scrub: oplog payload size {} for op {} (Lns: {}) does not match object size {}", + Buffer.GetSize(), + Key, + Lsn.Number, + OpView.GetSize()); + BadEntries.push_back({Lsn, Key}); + return; + } - Op.IterateAttachments([&](CbFieldView Visitor) { - const IoHash Cid = Visitor.AsAttachment(); - if (Ctx.IsBadCid(Cid)) - { - // oplog entry references a CAS chunk which has been flagged as bad - BadEntries.push_back({Lsn, Key}); - return; - } - if (!m_CidStore.ContainsChunk(Cid)) + { + const Oid KeyHash = ComputeOpKey(OpView); + if (KeyHash != Key) { - // oplog entry references a CAS chunk which is not present BadEntries.push_back({Lsn, Key}); + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) does not match information from index (op:{} != index:{})", + Key, + Lsn.Number, + KeyHash, + Key); return; } - }); + } }); if (!BadEntries.empty()) @@ -2577,6 +2669,32 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, c } } +void +ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder; + + { + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) { + const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; + Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer); + EntryIndex++; + }); + } + } + } +} + static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; void @@ -3773,18 +3891,64 @@ void ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); + + ZEN_INFO("scrubbing '{}'", ProjectRootDir); + // Scrubbing needs to check all existing oplogs std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) + + RwLock::SharedLockScope _(m_ProjectLock); + + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + + try { - OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - } - IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { - if (!IsExpired(GcClock::TimePoint::min(), Ops)) + for (const std::string& OpLogId : OpLogs) { - Ops.Scrub(Ctx); + Ref<ProjectStore::Oplog> OpLog; + { + if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end()) + { + OpLog = OpIt->second; + } + else + { + std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); + if (ProjectStore::Oplog::ExistsAt(OplogBasePath)) + { + OpLog = new ProjectStore::Oplog( + Log(), + Identifier, + OpLogId, + m_CidStore, + OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot + OpLog->Read(); + } + } + } + + if (OpLog) + { + Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + OpLog->Scrub(Ctx); + } + }); + } } - }); + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); + } } uint64_t @@ -4454,7 +4618,10 @@ ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, c if (WantsRawSizeField) { IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + if (CompressedBuffer::ValidateCompressedHeader(Payload, + _, + RawSizes[Index], + /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { @@ -4611,7 +4778,10 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl { ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1); IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + if (CompressedBuffer::ValidateCompressedHeader(Payload, + _, + RawSizes[Index], + /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { @@ -4722,7 +4892,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons { IoHash RawHash; uint64_t RawSize; - bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr); if (!IsCompressed) { throw std::runtime_error( |