diff options
| author | Stefan Boberg <[email protected]> | 2023-12-11 11:48:23 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-11 11:48:23 +0100 |
| commit | 37920b41048acffa30cf156d7d36bfc17ba15c0e (patch) | |
| tree | 15c4f652a54470e359a9b9dcd194e89cb10eaaf9 /src/zenserver/projectstore | |
| parent | multi-line logging improvements (#597) (diff) | |
| download | zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.tar.xz zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.zip | |
improved scrubbing of oplogs and filecas (#596)
- Improvement: Scrub command now validates compressed buffer hashes in filecas storage (used for large chunks)
- Improvement: Added --dry, --no-gc and --no-cas options to zen scrub command
- Improvement: Implemented oplog scrubbing (previously was a no-op)
- Improvement: Implemented support for running scrubbint at startup with --scrub=<options>
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 261 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 13 |
2 files changed, 193 insertions, 81 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 9ba8e3a19..73cb35fb8 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2,6 +2,7 @@ #include "projectstore.h" +#include <zencore/assertfmt.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> @@ -298,38 +299,60 @@ struct ProjectStore::OplogStorage : public RefCounted Stopwatch Timer; - uint64_t InvalidEntries = 0; + uint64_t InvalidEntries = 0; + uint64_t TombstoneEntries = 0; std::vector<OplogEntry> OpLogEntries; std::vector<size_t> OplogOrder; { - tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys; + tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys; + const uint64_t SkipEntryCount = 0; + m_Oplog.Replay( [&](const OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) + if (LogEntry.IsTombstone()) + { + if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end()) + { + ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash); + } + } + else { - ++InvalidEntries; - return; + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + return; + } + + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); } - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - m_NextOpsOffset = - Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) { - OpLogEntries[It->second] = LogEntry; + OplogEntry& Entry = OpLogEntries[It->second]; + + if (LogEntry.IsTombstone() && Entry.IsTombstone()) + { + ZEN_SCOPED_WARN("found double tombstone - '{}'", LogEntry.OpKeyHash); + } + + Entry = LogEntry; } else { - size_t OpIndex = OpLogEntries.size(); + const size_t OpIndex = OpLogEntries.size(); LatestKeys[LogEntry.OpKeyHash] = OpIndex; OplogOrder.push_back(OpIndex); OpLogEntries.push_back(LogEntry); } }, - 0); + SkipEntryCount); } + std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) { const OplogEntry& LhsEntry = OpLogEntries[Lhs]; const OplogEntry& RhsEntry = OpLogEntries[Rhs]; @@ -342,47 +365,54 @@ struct ProjectStore::OplogStorage : public RefCounted { const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex]; - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); - if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + if (LogEntry.IsTombstone()) + { + TombstoneEntries++; + } + else { // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - InvalidEntries++; - continue; - } - Handler(CbObjectView(OpBufferView.GetData()), LogEntry); - continue; - } + auto VerifyAndHandleOp = [&](MemoryView OpBufferView) { + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); - IoBuffer OpBuffer(LogEntry.OpCoreSize); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + if (OpCoreHash == LogEntry.OpCoreHash) + { + Handler(CbObjectView(OpBufferView.GetData()), LogEntry); + } + else + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + InvalidEntries++; + } + }; - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); + if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + { + VerifyAndHandleOp(OpBufferView); + } + else + { + IoBuffer OpBuffer(LogEntry.OpCoreSize); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - InvalidEntries++; - continue; + VerifyAndHandleOp(OpBuffer); + } } - Handler(CbObjectView(OpBuffer.Data()), LogEntry); } if (InvalidEntries) { - ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); + ZEN_WARN("ignored {} invalid oplog entries", InvalidEntries); } - ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", + ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn.load(), - m_NextOpsOffset.load()); + m_NextOpsOffset.load(), + TombstoneEntries); } void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler) @@ -418,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) + OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); @@ -446,6 +476,14 @@ struct ProjectStore::OplogStorage : public RefCounted return Entry; } + void AppendTombstone(Oid KeyHash) + { + OplogEntry Entry = {.OpKeyHash = KeyHash}; + Entry.MakeTombstone(); + + m_Oplog.Append(Entry); + } + void Flush() { m_Oplog.Flush(); @@ -507,9 +545,67 @@ ProjectStore::Oplog::Flush() } void -ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) const +ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + std::vector<Oid> BadEntryKeys; + + using namespace std::literals; + + IterateOplogWithKey([&](int Lsn, const Oid& Key, CbObjectView Op) { + ZEN_UNUSED(Lsn); + + std::vector<IoHash> Cids; + Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); }); + + { + XXH3_128Stream KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); + } + + for (const IoHash& Cid : Cids) + { + if (!m_CidStore.ContainsChunk(Cid)) + { + // oplog entry references a CAS chunk which is not + // present + BadEntryKeys.push_back(Key); + return; + } + if (Ctx.IsBadCid(Cid)) + { + // oplog entry references a CAS chunk which has been + // flagged as bad + BadEntryKeys.push_back(Key); + return; + } + } + }); + + if (!BadEntryKeys.empty()) + { + if (Ctx.RunRecovery()) + { + ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", BadEntryKeys.size(), m_BasePath); + + // Actually perform some clean-up + RwLock::ExclusiveLockScope _(m_OplogLock); + + for (const auto& Key : BadEntryKeys) + { + m_LatestOpMap.erase(Key); + m_Storage->AppendTombstone(Key); + } + } + else + { + ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", BadEntryKeys.size(), m_BasePath); + } + } } void @@ -658,6 +754,8 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) void ProjectStore::Oplog::ReplayLog() { + ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); if (!m_Storage) { @@ -818,41 +916,55 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO return; } - std::vector<size_t> EntryIndexes; - std::vector<OplogEntryAddress> Entries; - std::vector<Oid> Keys; - std::vector<int> LSNs; - Entries.reserve(m_LatestOpMap.size()); - EntryIndexes.reserve(m_LatestOpMap.size()); - Keys.reserve(m_LatestOpMap.size()); - LSNs.reserve(m_LatestOpMap.size()); + std::vector<OplogEntryAddress> SortedEntries; + std::vector<Oid> SortedKeys; + std::vector<int> SortedLSNs; - for (const auto& Kv : m_LatestOpMap) { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + const auto TargetEntryCount = m_LatestOpMap.size(); - Entries.push_back(AddressEntry->second); - Keys.push_back(Kv.first); - LSNs.push_back(Kv.second); - EntryIndexes.push_back(EntryIndexes.size()); - } + std::vector<size_t> EntryIndexes; + std::vector<OplogEntryAddress> Entries; + std::vector<Oid> Keys; + std::vector<int> LSNs; - std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { - const OplogEntryAddress& LhsEntry = Entries[Lhs]; - const OplogEntryAddress& RhsEntry = Entries[Rhs]; - return LhsEntry.Offset < RhsEntry.Offset; - }); - std::vector<OplogEntryAddress> SortedEntries; - SortedEntries.reserve(EntryIndexes.size()); - for (size_t Index : EntryIndexes) - { - SortedEntries.push_back(Entries[Index]); + Entries.reserve(TargetEntryCount); + EntryIndexes.reserve(TargetEntryCount); + Keys.reserve(TargetEntryCount); + LSNs.reserve(TargetEntryCount); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); + Keys.push_back(Kv.first); + LSNs.push_back(Kv.second); + EntryIndexes.push_back(EntryIndexes.size()); + } + + std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { + const OplogEntryAddress& LhsEntry = Entries[Lhs]; + const OplogEntryAddress& RhsEntry = Entries[Rhs]; + return LhsEntry.Offset < RhsEntry.Offset; + }); + + SortedEntries.reserve(EntryIndexes.size()); + SortedKeys.reserve(EntryIndexes.size()); + SortedLSNs.reserve(EntryIndexes.size()); + + for (size_t Index : EntryIndexes) + { + SortedEntries.push_back(Entries[Index]); + SortedKeys.push_back(Keys[Index]); + SortedLSNs.push_back(LSNs[Index]); + } } size_t EntryIndex = 0; m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) { - Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); + Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op); EntryIndex++; }); } @@ -1030,7 +1142,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) } if (ClientPath.empty()) { - ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id); + ZEN_WARN("invalid file for entry '{}', missing 'clientpath' field", Id); continue; } @@ -1088,7 +1200,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; + m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; return OpEntry.OpLsn; } @@ -1150,7 +1262,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) XXH3_128Stream KeyHasher; Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); RefPtr<OplogStorage> Storage; { @@ -1545,7 +1659,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx) { OpenOplog(OpLogId); } - IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, const Oplog& Ops) { + IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) { if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops)) { Ops.ScrubStorage(Ctx); @@ -3290,7 +3404,8 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime)) { ZEN_DEBUG( - "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer expired.", + "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer " + "expired.", m_ProjectBasePath, ProjectId); continue; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 57cda8ae7..5ebcd420c 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -31,14 +31,11 @@ struct OplogEntry uint32_t OpCoreOffset; // note: Multiple of alignment! uint32_t OpCoreSize; uint32_t OpCoreHash; // Used as checksum - XXH3_128 OpKeyHash; // XXH128_canonical_t + Oid OpKeyHash; + uint32_t Reserved; - inline Oid OpKeyAsOId() const - { - Oid Id; - memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits); - return Id; - } + inline bool IsTombstone() const { return OpCoreOffset == 0 && OpCoreSize == 0 && OpLsn == 0; } + inline void MakeTombstone() { OpLsn = OpCoreOffset = OpCoreSize = OpCoreHash = Reserved = 0; } }; struct OplogEntryAddress @@ -127,7 +124,7 @@ public: LoggerRef Log() { return m_OuterProject->Log(); } void Flush(); - void ScrubStorage(ScrubContext& Ctx) const; + void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); static uint64_t TotalSize(const std::filesystem::path& BasePath); uint64_t TotalSize() const; |