aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2023-09-13 11:48:49 -0400
committer: GitHub <[email protected]> 2023-09-13 17:48:49 +0200
commit: 37069155238a1c0259b08c28c21f2849c406cda2 (patch)
tree: d87eee0ccc2b0fcbd929a0f63d574eb4919c2f0c /src
parent: issue warning instead of assert on bad data in cid store (#400) (diff)
download: zen-37069155238a1c0259b08c28c21f2849c406cda2.tar.xz
download: zen-37069155238a1c0259b08c28c21f2849c406cda2.zip
scan oplog object for fields (#397)
* scan oplog object for fields
* Read all oplog entries but only read op data and get mapping for latest op of each key
Diffstat (limited to 'src')
-rw-r--r-- src/zenserver/projectstore/projectstore.cpp 219
1 files changed, 130 insertions, 89 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index f993a7ec7..6ec18904e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -291,45 +291,71 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
uint64_t InvalidEntries = 0;
+ size_t LargestOp = 256;
- BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- IoBuffer OpBuffer(512);
-
- m_Oplog.Replay(
- [&](const OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
- {
- ++InvalidEntries;
-
- return;
- }
-
- if (OpBuffer.GetSize() < LogEntry.OpCoreSize)
- {
- OpBuffer = IoBuffer(LogEntry.OpCoreSize);
- }
+ std::vector<OplogEntry> OpLogEntries;
+ std::vector<size_t> OplogOrder;
+ {
+ tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys;
+ m_Oplog.Replay(
+ [&](const OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+ return;
+ }
+ if (LogEntry.OpCoreSize > LargestOp)
+ {
+ LargestOp = LogEntry.OpCoreSize;
+ }
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end())
+ {
+ OpLogEntries[It->second] = LogEntry;
+ }
+ else
+ {
+ size_t OpIndex = OpLogEntries.size();
+ LatestKeys[LogEntry.OpKeyHash] = OpIndex;
+ OplogOrder.push_back(OpIndex);
+ OpLogEntries.push_back(LogEntry);
+ }
+ },
+ 0);
+ }
+ std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) {
+ const OplogEntry& LhsEntry = OpLogEntries[Lhs];
+ const OplogEntry& RhsEntry = OpLogEntries[Rhs];
+ return LhsEntry.OpCoreOffset < RhsEntry.OpCoreOffset;
+ });
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ IoBuffer OpBuffer(LargestOp);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- return;
- }
+ for (size_t OplogOrderIndex : OplogOrder)
+ {
+ const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex];
+ ZEN_ASSERT_SLOW(OpBuffer.GetSize() >= LogEntry.OpCoreSize);
- CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize));
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
- m_NextOpsOffset =
- Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
- Handler(Op, LogEntry);
- },
- 0);
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ InvalidEntries++;
+ continue;
+ }
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize));
+ Handler(Op, LogEntry);
+ }
if (InvalidEntries)
{
@@ -852,72 +878,87 @@ ProjectStore::Oplog::GetMapping(CbObject Core)
OplogEntryMapping Result;
// Update chunk id maps
- CbObjectView PackageObj = Core["package"sv].AsObjectView();
- CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView();
- CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView();
- Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num());
-
- if (PackageObj)
- {
- Oid Id = PackageObj["id"sv].AsObjectId();
- IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package data {} -> {}", Id, Hash);
- }
-
- for (CbFieldView& Entry : PackageDataArray)
+ for (CbFieldView Field : Core)
{
- CbObjectView PackageDataObj = Entry.AsObjectView();
- Oid Id = PackageDataObj["id"sv].AsObjectId();
- IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package {} -> {}", Id, Hash);
- }
-
- for (CbFieldView& Entry : BulkDataArray)
- {
- CbObjectView BulkObj = Entry.AsObjectView();
- Oid Id = BulkObj["id"sv].AsObjectId();
- IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
- }
-
- CbArrayView FilesArray = Core["files"sv].AsArrayView();
- Result.Files.reserve(FilesArray.Num());
- for (CbFieldView& Entry : FilesArray)
- {
- CbObjectView FileObj = Entry.AsObjectView();
-
- std::string_view ServerPath = FileObj["serverpath"sv].AsString();
- std::string_view ClientPath = FileObj["clientpath"sv].AsString();
- Oid Id = FileObj["id"sv].AsObjectId();
- IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
- if (ServerPath.empty() && Hash == IoHash::Zero)
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "package"sv)
+ {
+ CbObjectView PackageObj = Field.AsObjectView();
+ Oid Id = PackageObj["id"sv].AsObjectId();
+ IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("package data {} -> {}", Id, Hash);
+ continue;
+ }
+ if (FieldName == "bulkdata"sv)
{
- ZEN_WARN("invalid file for entry '{}', missing both 'serverpath' and 'data' fields", Id);
+ CbArrayView BulkDataArray = Field.AsArrayView();
+ for (CbFieldView& Entry : BulkDataArray)
+ {
+ CbObjectView BulkObj = Entry.AsObjectView();
+ Oid Id = BulkObj["id"sv].AsObjectId();
+ IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
+ }
continue;
}
- if (ClientPath.empty())
+ if (FieldName == "packagedata"sv)
{
- ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id);
+ CbArrayView PackageDataArray = Field.AsArrayView();
+ for (CbFieldView& Entry : PackageDataArray)
+ {
+ CbObjectView PackageDataObj = Entry.AsObjectView();
+ Oid Id = PackageDataObj["id"sv].AsObjectId();
+ IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("package {} -> {}", Id, Hash);
+ }
continue;
}
+ if (FieldName == "files"sv)
+ {
+ CbArrayView FilesArray = Field.AsArrayView();
+ Result.Files.reserve(FilesArray.Num());
+ for (CbFieldView& Entry : FilesArray)
+ {
+ CbObjectView FileObj = Entry.AsObjectView();
- Result.Files.emplace_back(OplogEntryMapping::FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)});
- ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
- }
+ std::string_view ServerPath = FileObj["serverpath"sv].AsString();
+ std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+ Oid Id = FileObj["id"sv].AsObjectId();
+ IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
+ if (ServerPath.empty() && Hash == IoHash::Zero)
+ {
+ ZEN_WARN("invalid file for entry '{}', missing both 'serverpath' and 'data' fields", Id);
+ continue;
+ }
+ if (ClientPath.empty())
+ {
+ ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id);
+ continue;
+ }
- CbArrayView MetaArray = Core["meta"sv].AsArrayView();
- Result.Meta.reserve(MetaArray.Num());
- for (CbFieldView& Entry : MetaArray)
- {
- CbObjectView MetaObj = Entry.AsObjectView();
- Oid Id = MetaObj["id"sv].AsObjectId();
- IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
- Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- auto NameString = MetaObj["name"sv].AsString();
- ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
+ Result.Files.emplace_back(OplogEntryMapping::FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)});
+ ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
+ }
+ continue;
+ }
+ if (FieldName == "meta"sv)
+ {
+ CbArrayView MetaArray = Field.AsArrayView();
+ Result.Meta.reserve(MetaArray.Num());
+ for (CbFieldView& Entry : MetaArray)
+ {
+ CbObjectView MetaObj = Entry.AsObjectView();
+ Oid Id = MetaObj["id"sv].AsObjectId();
+ IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
+ Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ auto NameString = MetaObj["name"sv].AsString();
+ ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
+ }
+ continue;
+ }
}
return Result;