diff options
| author | Stefan Boberg <[email protected]> | 2023-12-11 11:48:23 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-11 11:48:23 +0100 |
| commit | 37920b41048acffa30cf156d7d36bfc17ba15c0e (patch) | |
| tree | 15c4f652a54470e359a9b9dcd194e89cb10eaaf9 /src | |
| parent | multi-line logging improvements (#597) (diff) | |
| download | zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.tar.xz zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.zip | |
improved scrubbing of oplogs and filecas (#596)
- Improvement: Scrub command now validates compressed buffer hashes in filecas storage (used for large chunks)
- Improvement: Added --dry, --no-gc and --no-cas options to zen scrub command
- Improvement: Implemented oplog scrubbing (previously was a no-op)
- Improvement: Implemented support for running scrubbing at startup with --scrub=<options>
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/admin_cmd.cpp | 13 | ||||
| -rw-r--r-- | src/zen/cmds/admin_cmd.h | 3 | ||||
| -rw-r--r-- | src/zencore/include/zencore/string.h | 13 | ||||
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 20 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/config.h | 1 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 19 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 261 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 13 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 68 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 2 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 38 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 92 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 13 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 4 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/scrubcontext.h | 5 | ||||
| -rw-r--r-- | src/zenstore/scrubcontext.cpp | 7 |
17 files changed, 439 insertions, 137 deletions
diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp index 50510ce03..1bde785c7 100644 --- a/src/zen/cmds/admin_cmd.cpp +++ b/src/zen/cmds/admin_cmd.cpp @@ -20,6 +20,9 @@ ScrubCommand::ScrubCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "n", "dry", "Dry run (do not delete any data)", cxxopts::value(m_DryRun), "<bool>"); + m_Options.add_option("", "", "no-gc", "Do not perform GC after scrub pass", cxxopts::value(m_NoGc), "<bool>"); + m_Options.add_option("", "", "no-cas", "Do not scrub CAS stores", cxxopts::value(m_NoCas), "<bool>"); } ScrubCommand::~ScrubCommand() = default; @@ -41,11 +44,13 @@ ScrubCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw OptionParseException("unable to resolve server specification"); } - zen::HttpClient Http(m_HostName); + HttpClient Http(m_HostName); + + HttpClient::KeyValueMap Params{{"skipdelete", ToString(m_DryRun)}, {"skipgc", ToString(m_NoGc)}, {"skipcid", ToString(m_NoCas)}}; - if (zen::HttpClient::Response Response = Http.Post("/admin/scrub"sv)) + if (HttpClient::Response Response = Http.Post("/admin/scrub"sv, /* headers */ HttpClient::KeyValueMap{}, Params)) { - ZEN_CONSOLE("OK: {}", Response.ToText()); + ZEN_CONSOLE("scrub started OK: {}", Response.ToText()); return 0; } @@ -78,7 +83,7 @@ GcCommand::GcCommand() "<smallobjects>"); m_Options.add_option("", "", "skipcid", "Skip collection of CAS data", cxxopts::value(m_SkipCid)->default_value("false"), "<skipcid>"); m_Options.add_option("", - "", + "n", "skipdelete", "Skip deletion of data (dryrun)", cxxopts::value(m_SkipDelete)->default_value("false"), diff --git a/src/zen/cmds/admin_cmd.h b/src/zen/cmds/admin_cmd.h index b5741c666..12029d57e 100644 --- a/src/zen/cmds/admin_cmd.h +++ b/src/zen/cmds/admin_cmd.h @@ -22,6 +22,9 @@ public: private: cxxopts::Options m_Options{"scrub", 
"Scrub zen storage"}; std::string m_HostName; + bool m_DryRun = false; + bool m_NoGc = false; + bool m_NoCas = false; }; /** Garbage collect storage diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index e3de2224c..3aec1647d 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -809,6 +809,19 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func) ////////////////////////////////////////////////////////////////////////// +inline std::string_view +ToString(bool Value) +{ + using namespace std::literals; + if (Value) + { + return "true"sv; + } + return "false"sv; +} + +////////////////////////////////////////////////////////////////////////// + inline int32_t StrCaseCompare(const char* Lhs, const char* Rhs, int64_t Length = -1) { diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 1d5463a32..a0c90c9a0 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -405,10 +405,30 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, GcScheduler::TriggerScrubParams ScrubParams; ScrubParams.MaxTimeslice = std::chrono::seconds(100); + + if (auto Param = Params.GetValue("skipdelete"); Param.empty() == false) + { + ScrubParams.SkipDelete = (Param == "true"sv); + } + + if (auto Param = Params.GetValue("skipgc"); Param.empty() == false) + { + ScrubParams.SkipGc = (Param == "true"sv); + } + + if (auto Param = Params.GetValue("skipcid"); Param.empty() == false) + { + ScrubParams.SkipCas = (Param == "true"sv); + } + m_GcScheduler.TriggerScrub(ScrubParams); CbObjectWriter Response; Response << "ok"sv << true; + Response << "skip_delete" << ScrubParams.SkipDelete; + Response << "skip_gc" << ScrubParams.SkipGc; + Response << "skip_cas" << ScrubParams.SkipCas; + Response << "max_time" << TimeSpan(0, 0, gsl::narrow<int>(ScrubParams.MaxTimeslice.count())); HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, 
HttpVerb::kPost); diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 3fe0b0c63..5f2c3351e 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -518,6 +518,10 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("clean", "Clean out all state at startup", cxxopts::value<bool>(ServerOptions.IsCleanStart)->default_value("false")); + options.add_options()("scrub", + "Validate state at startup", + cxxopts::value(ServerOptions.ScrubOptions)->implicit_value("yes"), + "(nocas,nogc,nodelete,yes,no)*"); options.add_options()("help", "Show command line help"); options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false")); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir)); diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 11311f9d8..cd2d92523 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -153,6 +153,7 @@ struct ZenServerOptions bool ObjectStoreEnabled = false; bool NoConsoleOutput = false; // Control default use of stdout for diagnostics std::string Loggers[zen::logging::level::LogLevelCount]; + std::string ScrubOptions; #if ZEN_WITH_TRACE std::string TraceHost; // Host name or IP address to send trace data to std::string TraceFile; // Path of a file to write a trace diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 6901323e3..868b5131c 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -338,9 +338,24 @@ main(int argc, char* argv[]) ZenServerOptions ServerOptions; ParseCliOptions(argc, argv, ServerOptions); - if (ServerOptions.IsCleanStart || !ServerOptions.BaseSnapshotDir.empty()) + std::string_view DeleteReason; + + if (ServerOptions.IsCleanStart) + { + DeleteReason = "clean start requested"sv; + } + else if (!ServerOptions.BaseSnapshotDir.empty()) + { + DeleteReason = "will initialize state from base snapshot"sv; 
+ } + + if (!DeleteReason.empty()) { - DeleteDirectories(ServerOptions.DataDir); + if (std::filesystem::exists(ServerOptions.DataDir)) + { + ZEN_CONSOLE_INFO("deleting files from '{}' ({})", ServerOptions.DataDir, DeleteReason); + DeleteDirectories(ServerOptions.DataDir); + } } if (!std::filesystem::exists(ServerOptions.DataDir)) diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 9ba8e3a19..73cb35fb8 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2,6 +2,7 @@ #include "projectstore.h" +#include <zencore/assertfmt.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> @@ -298,38 +299,60 @@ struct ProjectStore::OplogStorage : public RefCounted Stopwatch Timer; - uint64_t InvalidEntries = 0; + uint64_t InvalidEntries = 0; + uint64_t TombstoneEntries = 0; std::vector<OplogEntry> OpLogEntries; std::vector<size_t> OplogOrder; { - tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys; + tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys; + const uint64_t SkipEntryCount = 0; + m_Oplog.Replay( [&](const OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) + if (LogEntry.IsTombstone()) + { + if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end()) + { + ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash); + } + } + else { - ++InvalidEntries; - return; + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + return; + } + + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); } - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - m_NextOpsOffset = - 
Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) { - OpLogEntries[It->second] = LogEntry; + OplogEntry& Entry = OpLogEntries[It->second]; + + if (LogEntry.IsTombstone() && Entry.IsTombstone()) + { + ZEN_SCOPED_WARN("found double tombstone - '{}'", LogEntry.OpKeyHash); + } + + Entry = LogEntry; } else { - size_t OpIndex = OpLogEntries.size(); + const size_t OpIndex = OpLogEntries.size(); LatestKeys[LogEntry.OpKeyHash] = OpIndex; OplogOrder.push_back(OpIndex); OpLogEntries.push_back(LogEntry); } }, - 0); + SkipEntryCount); } + std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) { const OplogEntry& LhsEntry = OpLogEntries[Lhs]; const OplogEntry& RhsEntry = OpLogEntries[Rhs]; @@ -342,47 +365,54 @@ struct ProjectStore::OplogStorage : public RefCounted { const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex]; - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); - if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + if (LogEntry.IsTombstone()) + { + TombstoneEntries++; + } + else { // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - InvalidEntries++; - continue; - } - Handler(CbObjectView(OpBufferView.GetData()), LogEntry); - continue; - } + auto VerifyAndHandleOp = [&](MemoryView OpBufferView) { + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); - IoBuffer OpBuffer(LogEntry.OpCoreSize); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + if 
(OpCoreHash == LogEntry.OpCoreHash) + { + Handler(CbObjectView(OpBufferView.GetData()), LogEntry); + } + else + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + InvalidEntries++; + } + }; - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); + if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + { + VerifyAndHandleOp(OpBufferView); + } + else + { + IoBuffer OpBuffer(LogEntry.OpCoreSize); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - InvalidEntries++; - continue; + VerifyAndHandleOp(OpBuffer); + } } - Handler(CbObjectView(OpBuffer.Data()), LogEntry); } if (InvalidEntries) { - ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); + ZEN_WARN("ignored {} invalid oplog entries", InvalidEntries); } - ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", + ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn.load(), - m_NextOpsOffset.load()); + m_NextOpsOffset.load(), + TombstoneEntries); } void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler) @@ -418,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) + OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); @@ -446,6 +476,14 @@ struct ProjectStore::OplogStorage : public RefCounted return Entry; } + void AppendTombstone(Oid KeyHash) + { + 
OplogEntry Entry = {.OpKeyHash = KeyHash}; + Entry.MakeTombstone(); + + m_Oplog.Append(Entry); + } + void Flush() { m_Oplog.Flush(); @@ -507,9 +545,67 @@ ProjectStore::Oplog::Flush() } void -ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) const +ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + std::vector<Oid> BadEntryKeys; + + using namespace std::literals; + + IterateOplogWithKey([&](int Lsn, const Oid& Key, CbObjectView Op) { + ZEN_UNUSED(Lsn); + + std::vector<IoHash> Cids; + Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); }); + + { + XXH3_128Stream KeyHasher; + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); + + ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); + } + + for (const IoHash& Cid : Cids) + { + if (!m_CidStore.ContainsChunk(Cid)) + { + // oplog entry references a CAS chunk which is not + // present + BadEntryKeys.push_back(Key); + return; + } + if (Ctx.IsBadCid(Cid)) + { + // oplog entry references a CAS chunk which has been + // flagged as bad + BadEntryKeys.push_back(Key); + return; + } + } + }); + + if (!BadEntryKeys.empty()) + { + if (Ctx.RunRecovery()) + { + ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", BadEntryKeys.size(), m_BasePath); + + // Actually perform some clean-up + RwLock::ExclusiveLockScope _(m_OplogLock); + + for (const auto& Key : BadEntryKeys) + { + m_LatestOpMap.erase(Key); + m_Storage->AppendTombstone(Key); + } + } + else + { + ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", BadEntryKeys.size(), m_BasePath); + } + } } void @@ -658,6 +754,8 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) void 
ProjectStore::Oplog::ReplayLog() { + ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); if (!m_Storage) { @@ -818,41 +916,55 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO return; } - std::vector<size_t> EntryIndexes; - std::vector<OplogEntryAddress> Entries; - std::vector<Oid> Keys; - std::vector<int> LSNs; - Entries.reserve(m_LatestOpMap.size()); - EntryIndexes.reserve(m_LatestOpMap.size()); - Keys.reserve(m_LatestOpMap.size()); - LSNs.reserve(m_LatestOpMap.size()); + std::vector<OplogEntryAddress> SortedEntries; + std::vector<Oid> SortedKeys; + std::vector<int> SortedLSNs; - for (const auto& Kv : m_LatestOpMap) { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + const auto TargetEntryCount = m_LatestOpMap.size(); - Entries.push_back(AddressEntry->second); - Keys.push_back(Kv.first); - LSNs.push_back(Kv.second); - EntryIndexes.push_back(EntryIndexes.size()); - } + std::vector<size_t> EntryIndexes; + std::vector<OplogEntryAddress> Entries; + std::vector<Oid> Keys; + std::vector<int> LSNs; - std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { - const OplogEntryAddress& LhsEntry = Entries[Lhs]; - const OplogEntryAddress& RhsEntry = Entries[Rhs]; - return LhsEntry.Offset < RhsEntry.Offset; - }); - std::vector<OplogEntryAddress> SortedEntries; - SortedEntries.reserve(EntryIndexes.size()); - for (size_t Index : EntryIndexes) - { - SortedEntries.push_back(Entries[Index]); + Entries.reserve(TargetEntryCount); + EntryIndexes.reserve(TargetEntryCount); + Keys.reserve(TargetEntryCount); + LSNs.reserve(TargetEntryCount); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); + Keys.push_back(Kv.first); + LSNs.push_back(Kv.second); + 
EntryIndexes.push_back(EntryIndexes.size()); + } + + std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { + const OplogEntryAddress& LhsEntry = Entries[Lhs]; + const OplogEntryAddress& RhsEntry = Entries[Rhs]; + return LhsEntry.Offset < RhsEntry.Offset; + }); + + SortedEntries.reserve(EntryIndexes.size()); + SortedKeys.reserve(EntryIndexes.size()); + SortedLSNs.reserve(EntryIndexes.size()); + + for (size_t Index : EntryIndexes) + { + SortedEntries.push_back(Entries[Index]); + SortedKeys.push_back(Keys[Index]); + SortedLSNs.push_back(LSNs[Index]); + } } size_t EntryIndex = 0; m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) { - Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); + Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op); EntryIndex++; }); } @@ -1030,7 +1142,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) } if (ClientPath.empty()) { - ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id); + ZEN_WARN("invalid file for entry '{}', missing 'clientpath' field", Id); continue; } @@ -1088,7 +1200,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; + m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; return OpEntry.OpLsn; } @@ -1150,7 +1262,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) XXH3_128Stream KeyHasher; Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); + XXH3_128 KeyHash128 = KeyHasher.GetHash(); + Oid KeyHash; + memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash); RefPtr<OplogStorage> Storage; { @@ -1545,7 +1659,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx) { OpenOplog(OpLogId); } - IterateOplogs([&](const RwLock::SharedLockScope& 
ProjectLock, const Oplog& Ops) { + IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) { if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops)) { Ops.ScrubStorage(Ctx); @@ -3290,7 +3404,8 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime)) { ZEN_DEBUG( - "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer expired.", + "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer " + "expired.", m_ProjectBasePath, ProjectId); continue; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 57cda8ae7..5ebcd420c 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -31,14 +31,11 @@ struct OplogEntry uint32_t OpCoreOffset; // note: Multiple of alignment! uint32_t OpCoreSize; uint32_t OpCoreHash; // Used as checksum - XXH3_128 OpKeyHash; // XXH128_canonical_t + Oid OpKeyHash; + uint32_t Reserved; - inline Oid OpKeyAsOId() const - { - Oid Id; - memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits); - return Id; - } + inline bool IsTombstone() const { return OpCoreOffset == 0 && OpCoreSize == 0 && OpLsn == 0; } + inline void MakeTombstone() { OpLsn = OpCoreOffset = OpCoreSize = OpCoreHash = Reserved = 0; } }; struct OplogEntryAddress @@ -127,7 +124,7 @@ public: LoggerRef Log() { return m_OuterProject->Log(); } void Flush(); - void ScrubStorage(ScrubContext& Ctx) const; + void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); static uint64_t TotalSize(const std::filesystem::path& BasePath); uint64_t TotalSize() const; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 418e3c325..336f715f4 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -120,6 +120,7 @@ ZenServer::Initialize(const ZenServerOptions& 
ServerOptions, ZenServerState::Zen m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; m_IsPowerCycle = ServerOptions.IsPowerCycle; const int ParentPid = ServerOptions.OwnerPid; + m_StartupScrubOptions = ServerOptions.ScrubOptions; if (ParentPid) { @@ -554,10 +555,6 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) void ZenServer::Run() { - // This is disabled for now, awaiting better scheduling - // - // ScrubStorage(); - if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); @@ -594,6 +591,69 @@ ZenServer::Run() OnReady(); + if (!m_StartupScrubOptions.empty()) + { + using namespace std::literals; + + ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions); + + bool DoScrub = true; + bool DoWait = false; + GcScheduler::TriggerScrubParams ScrubParams; + + ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) { + if (Token == "nocas"sv) + { + ScrubParams.SkipCas = true; + } + else if (Token == "nodelete"sv) + { + ScrubParams.SkipDelete = true; + } + else if (Token == "nogc"sv) + { + ScrubParams.SkipGc = true; + } + else if (Token == "no"sv) + { + DoScrub = false; + } + else if (Token == "wait"sv) + { + DoWait = true; + } + return true; + }); + + if (DoScrub) + { + m_GcScheduler.TriggerScrub(ScrubParams); + + if (DoWait) + { + auto State = m_GcScheduler.Status(); + + while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped)) + { + Sleep(500); + + State = m_GcScheduler.Status(); + } + + ZEN_INFO("waiting for Scrub/GC to complete..."); + + while (State == GcSchedulerStatus::kRunning) + { + Sleep(500); + + State = m_GcScheduler.Status(); + } + + ZEN_INFO("Scrub/GC completed"); + } + } + } + if (m_IsPowerCycle) { ZEN_INFO("Power cycle mode enabled -- shutting down"); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 68d928bec..1afd70b3e 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -139,6 +139,8 @@ private: bool 
m_DebugOptionForcedCrash = false; bool m_UseSentry = false; + + std::string m_StartupScrubOptions; }; } // namespace zen diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 96ab65a5f..b21f9f8d8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -254,6 +254,12 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("CasContainer::ScrubStorage"); + if (Ctx.IsSkipCas()) + { + ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath); + return; + } + ZEN_INFO("scrubbing '{}'", m_BlocksBasePath); std::vector<IoHash> BadKeys; @@ -297,21 +303,12 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { - if (RawHash != Hash) + if (RawHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); + // TODO: this should also hash the (decompressed) contents return; } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash == Hash) - { - return; } -#endif BadKeys.push_back(Hash); }; @@ -326,26 +323,15 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) IoHash RawHash; uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memorymap the whole file + // TODO: Add API to verify compressed buffer without having to memory-map the whole file if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { - if (RawHash != Hash) + if (RawHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); + // TODO: this should also hash the (decompressed) contents return; } - return; - } -#if ZEN_WITH_TESTS - IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - if (ComputedHash == Hash) - { - return; } -#endif BadKeys.push_back(Hash); }; @@ -398,7 +384,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) 
Ctx.ReportBadCidChunks(BadKeys); } - ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); + ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount, NiceBytes(ChunkBytes), m_RootDirectory / m_ContainerBaseName); } void diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 5da612e30..f18509758 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -846,6 +846,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("FileCas::ScrubStorage"); + if (Ctx.IsSkipCas()) + { + ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory); + return; + } + + Stopwatch Timer; ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory); ZEN_ASSERT(m_IsInitialized); @@ -853,6 +860,8 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) std::vector<IoHash> BadHashes; uint64_t ChunkCount{0}, ChunkBytes{0}; + int DiscoveredFilesNotInIndex = 0; + { std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); RwLock::ExclusiveLockScope _(m_Lock); @@ -862,10 +871,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) { m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); + ++DiscoveredFilesNotInIndex; } } } + ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex); + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { if (!Payload) { @@ -875,25 +887,65 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) ++ChunkCount; ChunkBytes += Payload.GetSize(); + IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload); + IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize)) { - if (RawHash != Hash) + if (RawHash == Hash) { - // Hash mismatch - 
BadHashes.push_back(Hash); - return; + // Header hash matches the file name, full validation requires that + // we check that the decompressed data hash also matches + + CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer)); + + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (BlockSize == 0) + { + BlockSize = 256 * 1024; + } + else if (BlockSize < (1024 * 1024)) + { + BlockSize = BlockSize * (1024 * 1024 / BlockSize); + } + + std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]); + + IoHashStream Hasher; + + uint64_t RawOffset = 0; + while (RawSize) + { + const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize); + + bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize), + RawOffset); + + if (Ok) + { + Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize); + } + + RawSize -= DecompressedBlockSize; + RawOffset += DecompressedBlockSize; + } + + const IoHash FinalHash = Hasher.GetHash(); + + if (FinalHash == Hash) + { + // all good + return; + } + } } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); - if (ComputedHash == Hash) - { - return; } -#endif + BadHashes.push_back(Hash); }); @@ -901,7 +953,7 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) if (!BadHashes.empty()) { - ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory); if (Ctx.RunRecovery()) { @@ -914,10 +966,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) if (Ec) { - ZEN_WARN("failed to delete file for chunk {}", Hash); + ZEN_WARN("failed to delete file for chunk {}: {}", Hash, Ec.message()); } } } + else + { + ZEN_WARN("recovery: NOT 
deleting backing files for {} bad chunks", BadHashes.size()); + } } // Let whomever it concerns know about the bad chunks. This could @@ -925,7 +981,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) // than a full validation pass might be able to do Ctx.ReportBadCidChunks(BadHashes); - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); + ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}", + m_RootDirectory, + ChunkCount, + NiceBytes(ChunkBytes), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } void diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 2660c2643..de653b0e3 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1712,12 +1712,18 @@ GcScheduler::SchedulerThread() DoGc = false; } + if (m_TriggerScrubParams->SkipCas) + { + SkipCid = true; + } + + DoDelete = !m_TriggerScrubParams->SkipDelete; ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice; } if (DoScrubbing) { - ScrubStorage(DoDelete, ScrubTimeslice); + ScrubStorage(DoDelete, SkipCid, ScrubTimeslice); m_TriggerScrubParams.reset(); } @@ -1961,7 +1967,7 @@ GcScheduler::SchedulerThread() } void -GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice) +GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice) { const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now(); std::chrono::steady_clock::time_point Deadline = TimeNow + TimeSlice; @@ -1972,13 +1978,14 @@ GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice) } Stopwatch Timer; - ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete); + ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid); WorkerThreadPool& ThreadPool = GetSmallWorkerPool(); ScrubContext Ctx{ThreadPool, Deadline}; try { + Ctx.SetSkipCas(SkipCid); Ctx.SetShouldDelete(DoDelete); m_GcManager.ScrubStorage(Ctx); } diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h 
index 698b0d4e8..30dd97ce8 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -492,6 +492,8 @@ public: { bool SkipGc = false; std::chrono::seconds MaxTimeslice = std::chrono::seconds::max(); + bool SkipDelete = false; + bool SkipCas = false; }; bool TriggerScrub(const TriggerScrubParams& Params); @@ -508,7 +510,7 @@ private: GcVersion UseGCVersion, uint32_t CompactBlockUsageThresholdPercent, bool Verbose); - void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice); + void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice); LoggerRef Log() { return m_Log; } virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } DiskSpace CheckDiskSpace(); diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h index cefaf0888..2f28cfec7 100644 --- a/src/zenstore/include/zenstore/scrubcontext.h +++ b/src/zenstore/include/zenstore/scrubcontext.h @@ -38,15 +38,20 @@ public: inline uint64_t ScrubbedBytes() const { return m_ByteCount; } HashKeySet BadCids() const; + bool IsBadCid(const IoHash& Cid) const; inline bool RunRecovery() const { return m_Recover; } inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; } + inline bool IsSkipCas() const { return m_SkipCas; } + inline void SetSkipCas(bool DoSkipCas) { m_SkipCas = DoSkipCas; } + inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; } private: uint64_t m_ScrubTime = GetHifreqTimerValue(); bool m_Recover = true; + bool m_SkipCas = false; std::atomic<uint64_t> m_ChunkCount{0}; std::atomic<uint64_t> m_ByteCount{0}; mutable RwLock m_Lock; diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp index f5a3784c3..fbcd7d33c 100644 --- a/src/zenstore/scrubcontext.cpp +++ b/src/zenstore/scrubcontext.cpp @@ -33,6 +33,13 @@ ScrubContext::BadCids() const return m_BadCid; } +bool +ScrubContext::IsBadCid(const IoHash& Cid) const +{ + 
RwLock::SharedLockScope _(m_Lock); + return m_BadCid.ContainsHash(Cid); +} + void ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks) { |