aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-11 11:48:23 +0100
committerGitHub <[email protected]>2023-12-11 11:48:23 +0100
commit37920b41048acffa30cf156d7d36bfc17ba15c0e (patch)
tree15c4f652a54470e359a9b9dcd194e89cb10eaaf9 /src/zenserver/projectstore
parentmulti-line logging improvements (#597) (diff)
downloadzen-37920b41048acffa30cf156d7d36bfc17ba15c0e.tar.xz
zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.zip
improved scrubbing of oplogs and filecas (#596)
- Improvement: Scrub command now validates compressed buffer hashes in filecas storage (used for large chunks) - Improvement: Added --dry, --no-gc and --no-cas options to zen scrub command - Improvement: Implemented oplog scrubbing (previously was a no-op) - Improvement: Implemented support for running scrubbint at startup with --scrub=<options>
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp261
-rw-r--r--src/zenserver/projectstore/projectstore.h13
2 files changed, 193 insertions, 81 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 9ba8e3a19..73cb35fb8 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2,6 +2,7 @@
#include "projectstore.h"
+#include <zencore/assertfmt.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryutil.h>
@@ -298,38 +299,60 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
- uint64_t InvalidEntries = 0;
+ uint64_t InvalidEntries = 0;
+ uint64_t TombstoneEntries = 0;
std::vector<OplogEntry> OpLogEntries;
std::vector<size_t> OplogOrder;
{
- tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys;
+ tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys;
+ const uint64_t SkipEntryCount = 0;
+
m_Oplog.Replay(
[&](const OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
+ if (LogEntry.IsTombstone())
+ {
+ if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end())
+ {
+ ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash);
+ }
+ }
+ else
{
- ++InvalidEntries;
- return;
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+ return;
+ }
+
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
}
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- m_NextOpsOffset =
- Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end())
{
- OpLogEntries[It->second] = LogEntry;
+ OplogEntry& Entry = OpLogEntries[It->second];
+
+ if (LogEntry.IsTombstone() && Entry.IsTombstone())
+ {
+ ZEN_SCOPED_WARN("found double tombstone - '{}'", LogEntry.OpKeyHash);
+ }
+
+ Entry = LogEntry;
}
else
{
- size_t OpIndex = OpLogEntries.size();
+ const size_t OpIndex = OpLogEntries.size();
LatestKeys[LogEntry.OpKeyHash] = OpIndex;
OplogOrder.push_back(OpIndex);
OpLogEntries.push_back(LogEntry);
}
},
- 0);
+ SkipEntryCount);
}
+
std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) {
const OplogEntry& LhsEntry = OpLogEntries[Lhs];
const OplogEntry& RhsEntry = OpLogEntries[Rhs];
@@ -342,47 +365,54 @@ struct ProjectStore::OplogStorage : public RefCounted
{
const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex];
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
- if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ if (LogEntry.IsTombstone())
+ {
+ TombstoneEntries++;
+ }
+ else
{
// Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- InvalidEntries++;
- continue;
- }
- Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
- continue;
- }
+ auto VerifyAndHandleOp = [&](MemoryView OpBufferView) {
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ if (OpCoreHash == LogEntry.OpCoreHash)
+ {
+ Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
+ }
+ else
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ InvalidEntries++;
+ }
+ };
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
+ if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ {
+ VerifyAndHandleOp(OpBufferView);
+ }
+ else
+ {
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- InvalidEntries++;
- continue;
+ VerifyAndHandleOp(OpBuffer);
+ }
}
- Handler(CbObjectView(OpBuffer.Data()), LogEntry);
}
if (InvalidEntries)
{
- ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
+ ZEN_WARN("ignored {} invalid oplog entries", InvalidEntries);
}
- ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
+ ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones",
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn.load(),
- m_NextOpsOffset.load());
+ m_NextOpsOffset.load(),
+ TombstoneEntries);
}
void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler)
@@ -418,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash)
+ OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash)
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
@@ -446,6 +476,14 @@ struct ProjectStore::OplogStorage : public RefCounted
return Entry;
}
+ void AppendTombstone(Oid KeyHash)
+ {
+ OplogEntry Entry = {.OpKeyHash = KeyHash};
+ Entry.MakeTombstone();
+
+ m_Oplog.Append(Entry);
+ }
+
void Flush()
{
m_Oplog.Flush();
@@ -507,9 +545,67 @@ ProjectStore::Oplog::Flush()
}
void
-ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) const
+ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ std::vector<Oid> BadEntryKeys;
+
+ using namespace std::literals;
+
+ IterateOplogWithKey([&](int Lsn, const Oid& Key, CbObjectView Op) {
+ ZEN_UNUSED(Lsn);
+
+ std::vector<IoHash> Cids;
+ Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); });
+
+ {
+ XXH3_128Stream KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
+ }
+
+ for (const IoHash& Cid : Cids)
+ {
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ // oplog entry references a CAS chunk which is not
+ // present
+ BadEntryKeys.push_back(Key);
+ return;
+ }
+ if (Ctx.IsBadCid(Cid))
+ {
+ // oplog entry references a CAS chunk which has been
+ // flagged as bad
+ BadEntryKeys.push_back(Key);
+ return;
+ }
+ }
+ });
+
+ if (!BadEntryKeys.empty())
+ {
+ if (Ctx.RunRecovery())
+ {
+ ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", BadEntryKeys.size(), m_BasePath);
+
+ // Actually perform some clean-up
+ RwLock::ExclusiveLockScope _(m_OplogLock);
+
+ for (const auto& Key : BadEntryKeys)
+ {
+ m_LatestOpMap.erase(Key);
+ m_Storage->AppendTombstone(Key);
+ }
+ }
+ else
+ {
+ ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", BadEntryKeys.size(), m_BasePath);
+ }
+ }
}
void
@@ -658,6 +754,8 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath)
void
ProjectStore::Oplog::ReplayLog()
{
+ ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId);
+
RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
if (!m_Storage)
{
@@ -818,41 +916,55 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO
return;
}
- std::vector<size_t> EntryIndexes;
- std::vector<OplogEntryAddress> Entries;
- std::vector<Oid> Keys;
- std::vector<int> LSNs;
- Entries.reserve(m_LatestOpMap.size());
- EntryIndexes.reserve(m_LatestOpMap.size());
- Keys.reserve(m_LatestOpMap.size());
- LSNs.reserve(m_LatestOpMap.size());
+ std::vector<OplogEntryAddress> SortedEntries;
+ std::vector<Oid> SortedKeys;
+ std::vector<int> SortedLSNs;
- for (const auto& Kv : m_LatestOpMap)
{
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+ const auto TargetEntryCount = m_LatestOpMap.size();
- Entries.push_back(AddressEntry->second);
- Keys.push_back(Kv.first);
- LSNs.push_back(Kv.second);
- EntryIndexes.push_back(EntryIndexes.size());
- }
+ std::vector<size_t> EntryIndexes;
+ std::vector<OplogEntryAddress> Entries;
+ std::vector<Oid> Keys;
+ std::vector<int> LSNs;
- std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
- const OplogEntryAddress& LhsEntry = Entries[Lhs];
- const OplogEntryAddress& RhsEntry = Entries[Rhs];
- return LhsEntry.Offset < RhsEntry.Offset;
- });
- std::vector<OplogEntryAddress> SortedEntries;
- SortedEntries.reserve(EntryIndexes.size());
- for (size_t Index : EntryIndexes)
- {
- SortedEntries.push_back(Entries[Index]);
+ Entries.reserve(TargetEntryCount);
+ EntryIndexes.reserve(TargetEntryCount);
+ Keys.reserve(TargetEntryCount);
+ LSNs.reserve(TargetEntryCount);
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
+ Keys.push_back(Kv.first);
+ LSNs.push_back(Kv.second);
+ EntryIndexes.push_back(EntryIndexes.size());
+ }
+
+ std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
+ const OplogEntryAddress& LhsEntry = Entries[Lhs];
+ const OplogEntryAddress& RhsEntry = Entries[Rhs];
+ return LhsEntry.Offset < RhsEntry.Offset;
+ });
+
+ SortedEntries.reserve(EntryIndexes.size());
+ SortedKeys.reserve(EntryIndexes.size());
+ SortedLSNs.reserve(EntryIndexes.size());
+
+ for (size_t Index : EntryIndexes)
+ {
+ SortedEntries.push_back(Entries[Index]);
+ SortedKeys.push_back(Keys[Index]);
+ SortedLSNs.push_back(LSNs[Index]);
+ }
}
size_t EntryIndex = 0;
m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) {
- Handler(LSNs[EntryIndex], Keys[EntryIndex], Op);
+ Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op);
EntryIndex++;
});
}
@@ -1030,7 +1142,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
}
if (ClientPath.empty())
{
- ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id);
+ ZEN_WARN("invalid file for entry '{}', missing 'clientpath' field", Id);
continue;
}
@@ -1088,7 +1200,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
}
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn;
+ m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
return OpEntry.OpLsn;
}
@@ -1150,7 +1262,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
XXH3_128Stream KeyHasher;
Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
RefPtr<OplogStorage> Storage;
{
@@ -1545,7 +1659,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
{
OpenOplog(OpLogId);
}
- IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, const Oplog& Ops) {
+ IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) {
if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops))
{
Ops.ScrubStorage(Ctx);
@@ -3290,7 +3404,8 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
{
ZEN_DEBUG(
- "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer expired.",
+ "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer "
+ "expired.",
m_ProjectBasePath,
ProjectId);
continue;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 57cda8ae7..5ebcd420c 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -31,14 +31,11 @@ struct OplogEntry
uint32_t OpCoreOffset; // note: Multiple of alignment!
uint32_t OpCoreSize;
uint32_t OpCoreHash; // Used as checksum
- XXH3_128 OpKeyHash; // XXH128_canonical_t
+ Oid OpKeyHash;
+ uint32_t Reserved;
- inline Oid OpKeyAsOId() const
- {
- Oid Id;
- memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits);
- return Id;
- }
+ inline bool IsTombstone() const { return OpCoreOffset == 0 && OpCoreSize == 0 && OpLsn == 0; }
+ inline void MakeTombstone() { OpLsn = OpCoreOffset = OpCoreSize = OpCoreHash = Reserved = 0; }
};
struct OplogEntryAddress
@@ -127,7 +124,7 @@ public:
LoggerRef Log() { return m_OuterProject->Log(); }
void Flush();
- void ScrubStorage(ScrubContext& Ctx) const;
+ void ScrubStorage(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
static uint64_t TotalSize(const std::filesystem::path& BasePath);
uint64_t TotalSize() const;