aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-11 11:48:23 +0100
committerGitHub <[email protected]>2023-12-11 11:48:23 +0100
commit37920b41048acffa30cf156d7d36bfc17ba15c0e (patch)
tree15c4f652a54470e359a9b9dcd194e89cb10eaaf9 /src
parentmulti-line logging improvements (#597) (diff)
downloadzen-37920b41048acffa30cf156d7d36bfc17ba15c0e.tar.xz
zen-37920b41048acffa30cf156d7d36bfc17ba15c0e.zip
improved scrubbing of oplogs and filecas (#596)
- Improvement: Scrub command now validates compressed buffer hashes in filecas storage (used for large chunks) - Improvement: Added --dry, --no-gc and --no-cas options to zen scrub command - Improvement: Implemented oplog scrubbing (was previously a no-op) - Improvement: Implemented support for running scrubbing at startup with --scrub=<options>
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/admin_cmd.cpp13
-rw-r--r--src/zen/cmds/admin_cmd.h3
-rw-r--r--src/zencore/include/zencore/string.h13
-rw-r--r--src/zenserver/admin/admin.cpp20
-rw-r--r--src/zenserver/config.cpp4
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/main.cpp19
-rw-r--r--src/zenserver/projectstore/projectstore.cpp261
-rw-r--r--src/zenserver/projectstore/projectstore.h13
-rw-r--r--src/zenserver/zenserver.cpp68
-rw-r--r--src/zenserver/zenserver.h2
-rw-r--r--src/zenstore/compactcas.cpp38
-rw-r--r--src/zenstore/filecas.cpp92
-rw-r--r--src/zenstore/gc.cpp13
-rw-r--r--src/zenstore/include/zenstore/gc.h4
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h5
-rw-r--r--src/zenstore/scrubcontext.cpp7
17 files changed, 439 insertions, 137 deletions
diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp
index 50510ce03..1bde785c7 100644
--- a/src/zen/cmds/admin_cmd.cpp
+++ b/src/zen/cmds/admin_cmd.cpp
@@ -20,6 +20,9 @@ ScrubCommand::ScrubCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
+ m_Options.add_option("", "n", "dry", "Dry run (do not delete any data)", cxxopts::value(m_DryRun), "<bool>");
+ m_Options.add_option("", "", "no-gc", "Do not perform GC after scrub pass", cxxopts::value(m_NoGc), "<bool>");
+ m_Options.add_option("", "", "no-cas", "Do not scrub CAS stores", cxxopts::value(m_NoCas), "<bool>");
}
ScrubCommand::~ScrubCommand() = default;
@@ -41,11 +44,13 @@ ScrubCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw OptionParseException("unable to resolve server specification");
}
- zen::HttpClient Http(m_HostName);
+ HttpClient Http(m_HostName);
+
+ HttpClient::KeyValueMap Params{{"skipdelete", ToString(m_DryRun)}, {"skipgc", ToString(m_NoGc)}, {"skipcid", ToString(m_NoCas)}};
- if (zen::HttpClient::Response Response = Http.Post("/admin/scrub"sv))
+ if (HttpClient::Response Response = Http.Post("/admin/scrub"sv, /* headers */ HttpClient::KeyValueMap{}, Params))
{
- ZEN_CONSOLE("OK: {}", Response.ToText());
+ ZEN_CONSOLE("scrub started OK: {}", Response.ToText());
return 0;
}
@@ -78,7 +83,7 @@ GcCommand::GcCommand()
"<smallobjects>");
m_Options.add_option("", "", "skipcid", "Skip collection of CAS data", cxxopts::value(m_SkipCid)->default_value("false"), "<skipcid>");
m_Options.add_option("",
- "",
+ "n",
"skipdelete",
"Skip deletion of data (dryrun)",
cxxopts::value(m_SkipDelete)->default_value("false"),
diff --git a/src/zen/cmds/admin_cmd.h b/src/zen/cmds/admin_cmd.h
index b5741c666..12029d57e 100644
--- a/src/zen/cmds/admin_cmd.h
+++ b/src/zen/cmds/admin_cmd.h
@@ -22,6 +22,9 @@ public:
private:
cxxopts::Options m_Options{"scrub", "Scrub zen storage"};
std::string m_HostName;
+ bool m_DryRun = false;
+ bool m_NoGc = false;
+ bool m_NoCas = false;
};
/** Garbage collect storage
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index e3de2224c..3aec1647d 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -809,6 +809,19 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func)
//////////////////////////////////////////////////////////////////////////
+inline std::string_view
+ToString(bool Value)
+{
+ using namespace std::literals;
+ if (Value)
+ {
+ return "true"sv;
+ }
+ return "false"sv;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
inline int32_t
StrCaseCompare(const char* Lhs, const char* Rhs, int64_t Length = -1)
{
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 1d5463a32..a0c90c9a0 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -405,10 +405,30 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
GcScheduler::TriggerScrubParams ScrubParams;
ScrubParams.MaxTimeslice = std::chrono::seconds(100);
+
+ if (auto Param = Params.GetValue("skipdelete"); Param.empty() == false)
+ {
+ ScrubParams.SkipDelete = (Param == "true"sv);
+ }
+
+ if (auto Param = Params.GetValue("skipgc"); Param.empty() == false)
+ {
+ ScrubParams.SkipGc = (Param == "true"sv);
+ }
+
+ if (auto Param = Params.GetValue("skipcid"); Param.empty() == false)
+ {
+ ScrubParams.SkipCas = (Param == "true"sv);
+ }
+
m_GcScheduler.TriggerScrub(ScrubParams);
CbObjectWriter Response;
Response << "ok"sv << true;
+ Response << "skip_delete" << ScrubParams.SkipDelete;
+ Response << "skip_gc" << ScrubParams.SkipGc;
+ Response << "skip_cas" << ScrubParams.SkipCas;
+ Response << "max_time" << TimeSpan(0, 0, gsl::narrow<int>(ScrubParams.MaxTimeslice.count()));
HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
},
HttpVerb::kPost);
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 3fe0b0c63..5f2c3351e 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -518,6 +518,10 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("clean",
"Clean out all state at startup",
cxxopts::value<bool>(ServerOptions.IsCleanStart)->default_value("false"));
+ options.add_options()("scrub",
+ "Validate state at startup",
+ cxxopts::value(ServerOptions.ScrubOptions)->implicit_value("yes"),
+ "(nocas,nogc,nodelete,yes,no)*");
options.add_options()("help", "Show command line help");
options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(ServerOptions.IsTest)->default_value("false"));
options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::string>(DataDir));
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 11311f9d8..cd2d92523 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -153,6 +153,7 @@ struct ZenServerOptions
bool ObjectStoreEnabled = false;
bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
std::string Loggers[zen::logging::level::LogLevelCount];
+ std::string ScrubOptions;
#if ZEN_WITH_TRACE
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 6901323e3..868b5131c 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -338,9 +338,24 @@ main(int argc, char* argv[])
ZenServerOptions ServerOptions;
ParseCliOptions(argc, argv, ServerOptions);
- if (ServerOptions.IsCleanStart || !ServerOptions.BaseSnapshotDir.empty())
+ std::string_view DeleteReason;
+
+ if (ServerOptions.IsCleanStart)
+ {
+ DeleteReason = "clean start requested"sv;
+ }
+ else if (!ServerOptions.BaseSnapshotDir.empty())
+ {
+ DeleteReason = "will initialize state from base snapshot"sv;
+ }
+
+ if (!DeleteReason.empty())
{
- DeleteDirectories(ServerOptions.DataDir);
+ if (std::filesystem::exists(ServerOptions.DataDir))
+ {
+ ZEN_CONSOLE_INFO("deleting files from '{}' ({})", ServerOptions.DataDir, DeleteReason);
+ DeleteDirectories(ServerOptions.DataDir);
+ }
}
if (!std::filesystem::exists(ServerOptions.DataDir))
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 9ba8e3a19..73cb35fb8 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2,6 +2,7 @@
#include "projectstore.h"
+#include <zencore/assertfmt.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryutil.h>
@@ -298,38 +299,60 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
- uint64_t InvalidEntries = 0;
+ uint64_t InvalidEntries = 0;
+ uint64_t TombstoneEntries = 0;
std::vector<OplogEntry> OpLogEntries;
std::vector<size_t> OplogOrder;
{
- tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys;
+ tsl::robin_map<Oid, size_t, Oid::Hasher> LatestKeys;
+ const uint64_t SkipEntryCount = 0;
+
m_Oplog.Replay(
[&](const OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
+ if (LogEntry.IsTombstone())
+ {
+ if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It == LatestKeys.end())
+ {
+ ZEN_SCOPED_WARN("found tombstone referencing unknown key {}", LogEntry.OpKeyHash);
+ }
+ }
+ else
{
- ++InvalidEntries;
- return;
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+ return;
+ }
+
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
}
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- m_NextOpsOffset =
- Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end())
{
- OpLogEntries[It->second] = LogEntry;
+ OplogEntry& Entry = OpLogEntries[It->second];
+
+ if (LogEntry.IsTombstone() && Entry.IsTombstone())
+ {
+ ZEN_SCOPED_WARN("found double tombstone - '{}'", LogEntry.OpKeyHash);
+ }
+
+ Entry = LogEntry;
}
else
{
- size_t OpIndex = OpLogEntries.size();
+ const size_t OpIndex = OpLogEntries.size();
LatestKeys[LogEntry.OpKeyHash] = OpIndex;
OplogOrder.push_back(OpIndex);
OpLogEntries.push_back(LogEntry);
}
},
- 0);
+ SkipEntryCount);
}
+
std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) {
const OplogEntry& LhsEntry = OpLogEntries[Lhs];
const OplogEntry& RhsEntry = OpLogEntries[Rhs];
@@ -342,47 +365,54 @@ struct ProjectStore::OplogStorage : public RefCounted
{
const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex];
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
- if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ if (LogEntry.IsTombstone())
+ {
+ TombstoneEntries++;
+ }
+ else
{
// Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- InvalidEntries++;
- continue;
- }
- Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
- continue;
- }
+ auto VerifyAndHandleOp = [&](MemoryView OpBufferView) {
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ if (OpCoreHash == LogEntry.OpCoreHash)
+ {
+ Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
+ }
+ else
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ InvalidEntries++;
+ }
+ };
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
+ if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ {
+ VerifyAndHandleOp(OpBufferView);
+ }
+ else
+ {
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- InvalidEntries++;
- continue;
+ VerifyAndHandleOp(OpBuffer);
+ }
}
- Handler(CbObjectView(OpBuffer.Data()), LogEntry);
}
if (InvalidEntries)
{
- ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
+ ZEN_WARN("ignored {} invalid oplog entries", InvalidEntries);
}
- ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
+ ZEN_INFO("oplog replay completed in {} - Max LSN# {}, Next offset: {}, {} tombstones",
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn.load(),
- m_NextOpsOffset.load());
+ m_NextOpsOffset.load(),
+ TombstoneEntries);
}
void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler)
@@ -418,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash)
+ OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash)
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
@@ -446,6 +476,14 @@ struct ProjectStore::OplogStorage : public RefCounted
return Entry;
}
+ void AppendTombstone(Oid KeyHash)
+ {
+ OplogEntry Entry = {.OpKeyHash = KeyHash};
+ Entry.MakeTombstone();
+
+ m_Oplog.Append(Entry);
+ }
+
void Flush()
{
m_Oplog.Flush();
@@ -507,9 +545,67 @@ ProjectStore::Oplog::Flush()
}
void
-ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) const
+ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ std::vector<Oid> BadEntryKeys;
+
+ using namespace std::literals;
+
+ IterateOplogWithKey([&](int Lsn, const Oid& Key, CbObjectView Op) {
+ ZEN_UNUSED(Lsn);
+
+ std::vector<IoHash> Cids;
+ Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); });
+
+ {
+ XXH3_128Stream KeyHasher;
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
+
+ ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
+ }
+
+ for (const IoHash& Cid : Cids)
+ {
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ // oplog entry references a CAS chunk which is not
+ // present
+ BadEntryKeys.push_back(Key);
+ return;
+ }
+ if (Ctx.IsBadCid(Cid))
+ {
+ // oplog entry references a CAS chunk which has been
+ // flagged as bad
+ BadEntryKeys.push_back(Key);
+ return;
+ }
+ }
+ });
+
+ if (!BadEntryKeys.empty())
+ {
+ if (Ctx.RunRecovery())
+ {
+ ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", BadEntryKeys.size(), m_BasePath);
+
+ // Actually perform some clean-up
+ RwLock::ExclusiveLockScope _(m_OplogLock);
+
+ for (const auto& Key : BadEntryKeys)
+ {
+ m_LatestOpMap.erase(Key);
+ m_Storage->AppendTombstone(Key);
+ }
+ }
+ else
+ {
+ ZEN_WARN("scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", BadEntryKeys.size(), m_BasePath);
+ }
+ }
}
void
@@ -658,6 +754,8 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath)
void
ProjectStore::Oplog::ReplayLog()
{
+ ZEN_LOG_SCOPE("ReplayLog '{}'", m_OplogId);
+
RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
if (!m_Storage)
{
@@ -818,41 +916,55 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO
return;
}
- std::vector<size_t> EntryIndexes;
- std::vector<OplogEntryAddress> Entries;
- std::vector<Oid> Keys;
- std::vector<int> LSNs;
- Entries.reserve(m_LatestOpMap.size());
- EntryIndexes.reserve(m_LatestOpMap.size());
- Keys.reserve(m_LatestOpMap.size());
- LSNs.reserve(m_LatestOpMap.size());
+ std::vector<OplogEntryAddress> SortedEntries;
+ std::vector<Oid> SortedKeys;
+ std::vector<int> SortedLSNs;
- for (const auto& Kv : m_LatestOpMap)
{
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+ const auto TargetEntryCount = m_LatestOpMap.size();
- Entries.push_back(AddressEntry->second);
- Keys.push_back(Kv.first);
- LSNs.push_back(Kv.second);
- EntryIndexes.push_back(EntryIndexes.size());
- }
+ std::vector<size_t> EntryIndexes;
+ std::vector<OplogEntryAddress> Entries;
+ std::vector<Oid> Keys;
+ std::vector<int> LSNs;
- std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
- const OplogEntryAddress& LhsEntry = Entries[Lhs];
- const OplogEntryAddress& RhsEntry = Entries[Rhs];
- return LhsEntry.Offset < RhsEntry.Offset;
- });
- std::vector<OplogEntryAddress> SortedEntries;
- SortedEntries.reserve(EntryIndexes.size());
- for (size_t Index : EntryIndexes)
- {
- SortedEntries.push_back(Entries[Index]);
+ Entries.reserve(TargetEntryCount);
+ EntryIndexes.reserve(TargetEntryCount);
+ Keys.reserve(TargetEntryCount);
+ LSNs.reserve(TargetEntryCount);
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
+ Keys.push_back(Kv.first);
+ LSNs.push_back(Kv.second);
+ EntryIndexes.push_back(EntryIndexes.size());
+ }
+
+ std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
+ const OplogEntryAddress& LhsEntry = Entries[Lhs];
+ const OplogEntryAddress& RhsEntry = Entries[Rhs];
+ return LhsEntry.Offset < RhsEntry.Offset;
+ });
+
+ SortedEntries.reserve(EntryIndexes.size());
+ SortedKeys.reserve(EntryIndexes.size());
+ SortedLSNs.reserve(EntryIndexes.size());
+
+ for (size_t Index : EntryIndexes)
+ {
+ SortedEntries.push_back(Entries[Index]);
+ SortedKeys.push_back(Keys[Index]);
+ SortedLSNs.push_back(LSNs[Index]);
+ }
}
size_t EntryIndex = 0;
m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) {
- Handler(LSNs[EntryIndex], Keys[EntryIndex], Op);
+ Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op);
EntryIndex++;
});
}
@@ -1030,7 +1142,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
}
if (ClientPath.empty())
{
- ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id);
+ ZEN_WARN("invalid file for entry '{}', missing 'clientpath' field", Id);
continue;
}
@@ -1088,7 +1200,7 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
}
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn;
+ m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
return OpEntry.OpLsn;
}
@@ -1150,7 +1262,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
XXH3_128Stream KeyHasher;
Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
+ XXH3_128 KeyHash128 = KeyHasher.GetHash();
+ Oid KeyHash;
+ memcpy(&KeyHash, KeyHash128.Hash, sizeof KeyHash);
RefPtr<OplogStorage> Storage;
{
@@ -1545,7 +1659,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
{
OpenOplog(OpLogId);
}
- IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, const Oplog& Ops) {
+ IterateOplogs([&](const RwLock::SharedLockScope& ProjectLock, Oplog& Ops) {
if (!IsExpired(ProjectLock, GcClock::TimePoint::min(), Ops))
{
Ops.ScrubStorage(Ctx);
@@ -3290,7 +3404,8 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime))
{
ZEN_DEBUG(
- "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer expired.",
+ "GCV2: projectstore [REMOVE EXPIRED] '{}': skipped garbage collect of project '{}'. Project no longer "
+ "expired.",
m_ProjectBasePath,
ProjectId);
continue;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 57cda8ae7..5ebcd420c 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -31,14 +31,11 @@ struct OplogEntry
uint32_t OpCoreOffset; // note: Multiple of alignment!
uint32_t OpCoreSize;
uint32_t OpCoreHash; // Used as checksum
- XXH3_128 OpKeyHash; // XXH128_canonical_t
+ Oid OpKeyHash;
+ uint32_t Reserved;
- inline Oid OpKeyAsOId() const
- {
- Oid Id;
- memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits);
- return Id;
- }
+ inline bool IsTombstone() const { return OpCoreOffset == 0 && OpCoreSize == 0 && OpLsn == 0; }
+ inline void MakeTombstone() { OpLsn = OpCoreOffset = OpCoreSize = OpCoreHash = Reserved = 0; }
};
struct OplogEntryAddress
@@ -127,7 +124,7 @@ public:
LoggerRef Log() { return m_OuterProject->Log(); }
void Flush();
- void ScrubStorage(ScrubContext& Ctx) const;
+ void ScrubStorage(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
static uint64_t TotalSize(const std::filesystem::path& BasePath);
uint64_t TotalSize() const;
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 418e3c325..336f715f4 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -120,6 +120,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_DebugOptionForcedCrash = ServerOptions.ShouldCrash;
m_IsPowerCycle = ServerOptions.IsPowerCycle;
const int ParentPid = ServerOptions.OwnerPid;
+ m_StartupScrubOptions = ServerOptions.ScrubOptions;
if (ParentPid)
{
@@ -554,10 +555,6 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
void
ZenServer::Run()
{
- // This is disabled for now, awaiting better scheduling
- //
- // ScrubStorage();
-
if (m_ProcessMonitor.IsActive())
{
CheckOwnerPid();
@@ -594,6 +591,69 @@ ZenServer::Run()
OnReady();
+ if (!m_StartupScrubOptions.empty())
+ {
+ using namespace std::literals;
+
+ ZEN_INFO("triggering scrub with settings: '{}'", m_StartupScrubOptions);
+
+ bool DoScrub = true;
+ bool DoWait = false;
+ GcScheduler::TriggerScrubParams ScrubParams;
+
+ ForEachStrTok(m_StartupScrubOptions, ',', [&](std::string_view Token) {
+ if (Token == "nocas"sv)
+ {
+ ScrubParams.SkipCas = true;
+ }
+ else if (Token == "nodelete"sv)
+ {
+ ScrubParams.SkipDelete = true;
+ }
+ else if (Token == "nogc"sv)
+ {
+ ScrubParams.SkipGc = true;
+ }
+ else if (Token == "no"sv)
+ {
+ DoScrub = false;
+ }
+ else if (Token == "wait"sv)
+ {
+ DoWait = true;
+ }
+ return true;
+ });
+
+ if (DoScrub)
+ {
+ m_GcScheduler.TriggerScrub(ScrubParams);
+
+ if (DoWait)
+ {
+ auto State = m_GcScheduler.Status();
+
+ while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped))
+ {
+ Sleep(500);
+
+ State = m_GcScheduler.Status();
+ }
+
+ ZEN_INFO("waiting for Scrub/GC to complete...");
+
+ while (State == GcSchedulerStatus::kRunning)
+ {
+ Sleep(500);
+
+ State = m_GcScheduler.Status();
+ }
+
+ ZEN_INFO("Scrub/GC completed");
+ }
+ }
+ }
+
if (m_IsPowerCycle)
{
ZEN_INFO("Power cycle mode enabled -- shutting down");
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 68d928bec..1afd70b3e 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -139,6 +139,8 @@ private:
bool m_DebugOptionForcedCrash = false;
bool m_UseSentry = false;
+
+ std::string m_StartupScrubOptions;
};
} // namespace zen
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 96ab65a5f..b21f9f8d8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -254,6 +254,12 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("CasContainer::ScrubStorage");
+ if (Ctx.IsSkipCas())
+ {
+ ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath);
+ return;
+ }
+
ZEN_INFO("scrubbing '{}'", m_BlocksBasePath);
std::vector<IoHash> BadKeys;
@@ -297,21 +303,12 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
uint64_t RawSize;
if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
{
- if (RawHash != Hash)
+ if (RawHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
+ // TODO: this should also hash the (decompressed) contents
return;
}
- return;
- }
-#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash == Hash)
- {
- return;
}
-#endif
BadKeys.push_back(Hash);
};
@@ -326,26 +323,15 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
IoHash RawHash;
uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ // TODO: Add API to verify compressed buffer without having to memory-map the whole file
if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
{
- if (RawHash != Hash)
+ if (RawHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
+ // TODO: this should also hash the (decompressed) contents
return;
}
- return;
- }
-#if ZEN_WITH_TESTS
- IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- if (ComputedHash == Hash)
- {
- return;
}
-#endif
BadKeys.push_back(Hash);
};
@@ -398,7 +384,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
Ctx.ReportBadCidChunks(BadKeys);
}
- ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+ ZEN_INFO("scrubbed {} chunks ({}) in '{}'", ChunkCount, NiceBytes(ChunkBytes), m_RootDirectory / m_ContainerBaseName);
}
void
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 5da612e30..f18509758 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -846,6 +846,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
{
ZEN_TRACE_CPU("FileCas::ScrubStorage");
+ if (Ctx.IsSkipCas())
+ {
+ ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory);
+ return;
+ }
+
+ Stopwatch Timer;
ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory);
ZEN_ASSERT(m_IsInitialized);
@@ -853,6 +860,8 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
uint64_t ChunkCount{0}, ChunkBytes{0};
+ int DiscoveredFilesNotInIndex = 0;
+
{
std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
RwLock::ExclusiveLockScope _(m_Lock);
@@ -862,10 +871,13 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
{
m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
+ ++DiscoveredFilesNotInIndex;
}
}
}
+ ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex);
+
IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
if (!Payload)
{
@@ -875,25 +887,65 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
++ChunkCount;
ChunkBytes += Payload.GetSize();
+ IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);
+
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
{
- if (RawHash != Hash)
+ if (RawHash == Hash)
{
- // Hash mismatch
- BadHashes.push_back(Hash);
- return;
+ // Header hash matches the file name, full validation requires that
+ // we check that the decompressed data hash also matches
+
+ CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));
+
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (BlockSize == 0)
+ {
+ BlockSize = 256 * 1024;
+ }
+ else if (BlockSize < (1024 * 1024))
+ {
+ BlockSize = BlockSize * (1024 * 1024 / BlockSize);
+ }
+
+ std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);
+
+ IoHashStream Hasher;
+
+ uint64_t RawOffset = 0;
+ while (RawSize)
+ {
+ const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);
+
+ bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
+ RawOffset);
+
+ if (Ok)
+ {
+ Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);
+ }
+
+ RawSize -= DecompressedBlockSize;
+ RawOffset += DecompressedBlockSize;
+ }
+
+ const IoHash FinalHash = Hasher.GetHash();
+
+ if (FinalHash == Hash)
+ {
+ // all good
+ return;
+ }
+ }
}
- return;
- }
-#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload))));
- if (ComputedHash == Hash)
- {
- return;
}
-#endif
+
BadHashes.push_back(Hash);
});
@@ -901,7 +953,7 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+ ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);
if (Ctx.RunRecovery())
{
@@ -914,10 +966,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
if (Ec)
{
- ZEN_WARN("failed to delete file for chunk {}", Hash);
+ ZEN_WARN("failed to delete file for chunk {}: {}", Hash, Ec.message());
}
}
}
+ else
+ {
+ ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
+ }
}
// Let whomever it concerns know about the bad chunks. This could
@@ -925,7 +981,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
// than a full validation pass might be able to do
Ctx.ReportBadCidChunks(BadHashes);
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+ ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
+ m_RootDirectory,
+ ChunkCount,
+ NiceBytes(ChunkBytes),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
void
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 2660c2643..de653b0e3 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -1712,12 +1712,18 @@ GcScheduler::SchedulerThread()
DoGc = false;
}
+ if (m_TriggerScrubParams->SkipCas)
+ {
+ SkipCid = true;
+ }
+
+ DoDelete = !m_TriggerScrubParams->SkipDelete;
ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice;
}
if (DoScrubbing)
{
- ScrubStorage(DoDelete, ScrubTimeslice);
+ ScrubStorage(DoDelete, SkipCid, ScrubTimeslice);
m_TriggerScrubParams.reset();
}
@@ -1961,7 +1967,7 @@ GcScheduler::SchedulerThread()
}
void
-GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice)
+GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice)
{
const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point Deadline = TimeNow + TimeSlice;
@@ -1972,13 +1978,14 @@ GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice)
}
Stopwatch Timer;
- ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete);
+ ZEN_INFO("scrubbing STARTING (delete mode => {}, skip CID => {})", DoDelete, SkipCid);
WorkerThreadPool& ThreadPool = GetSmallWorkerPool();
ScrubContext Ctx{ThreadPool, Deadline};
try
{
+ Ctx.SetSkipCas(SkipCid);
Ctx.SetShouldDelete(DoDelete);
m_GcManager.ScrubStorage(Ctx);
}
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 698b0d4e8..30dd97ce8 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -492,6 +492,8 @@ public:
{
bool SkipGc = false;
std::chrono::seconds MaxTimeslice = std::chrono::seconds::max();
+ bool SkipDelete = false;
+ bool SkipCas = false;
};
bool TriggerScrub(const TriggerScrubParams& Params);
@@ -508,7 +510,7 @@ private:
GcVersion UseGCVersion,
uint32_t CompactBlockUsageThresholdPercent,
bool Verbose);
- void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice);
+ void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
LoggerRef Log() { return m_Log; }
virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
DiskSpace CheckDiskSpace();
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
index cefaf0888..2f28cfec7 100644
--- a/src/zenstore/include/zenstore/scrubcontext.h
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -38,15 +38,20 @@ public:
inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
HashKeySet BadCids() const;
+ bool IsBadCid(const IoHash& Cid) const;
inline bool RunRecovery() const { return m_Recover; }
inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; }
+ inline bool IsSkipCas() const { return m_SkipCas; }
+ inline void SetSkipCas(bool DoSkipCas) { m_SkipCas = DoSkipCas; }
+
inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; }
private:
uint64_t m_ScrubTime = GetHifreqTimerValue();
bool m_Recover = true;
+ bool m_SkipCas = false;
std::atomic<uint64_t> m_ChunkCount{0};
std::atomic<uint64_t> m_ByteCount{0};
mutable RwLock m_Lock;
diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp
index f5a3784c3..fbcd7d33c 100644
--- a/src/zenstore/scrubcontext.cpp
+++ b/src/zenstore/scrubcontext.cpp
@@ -33,6 +33,13 @@ ScrubContext::BadCids() const
return m_BadCid;
}
+bool
+ScrubContext::IsBadCid(const IoHash& Cid) const
+{
+ RwLock::SharedLockScope _(m_Lock);
+ return m_BadCid.ContainsHash(Cid);
+}
+
void
ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks)
{