diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 11:48:49 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 17:48:49 +0200 |
| commit | 37069155238a1c0259b08c28c21f2849c406cda2 (patch) | |
| tree | d87eee0ccc2b0fcbd929a0f63d574eb4919c2f0c /src | |
| parent | issue warning instead of assert on bad data in cid store (#400) (diff) | |
| download | zen-37069155238a1c0259b08c28c21f2849c406cda2.tar.xz zen-37069155238a1c0259b08c28c21f2849c406cda2.zip | |
scan oplog object for fields (#397)
* scan oplog object for fields
* Read all oplog entries but only read op data and get mapping for latest op of each key
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 219 |
1 files changed, 130 insertions, 89 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index f993a7ec7..6ec18904e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -291,45 +291,71 @@ struct ProjectStore::OplogStorage : public RefCounted Stopwatch Timer; uint64_t InvalidEntries = 0; + size_t LargestOp = 256; - BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - IoBuffer OpBuffer(512); - - m_Oplog.Replay( - [&](const OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) - { - ++InvalidEntries; - - return; - } - - if (OpBuffer.GetSize() < LogEntry.OpCoreSize) - { - OpBuffer = IoBuffer(LogEntry.OpCoreSize); - } + std::vector<OplogEntry> OpLogEntries; + std::vector<size_t> OplogOrder; + { + tsl::robin_map<XXH3_128, size_t, XXH3_128::Hasher> LatestKeys; + m_Oplog.Replay( + [&](const OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + return; + } + if (LogEntry.OpCoreSize > LargestOp) + { + LargestOp = LogEntry.OpCoreSize; + } + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + if (auto It = LatestKeys.find(LogEntry.OpKeyHash); It != LatestKeys.end()) + { + OpLogEntries[It->second] = LogEntry; + } + else + { + size_t OpIndex = OpLogEntries.size(); + LatestKeys[LogEntry.OpKeyHash] = OpIndex; + OplogOrder.push_back(OpIndex); + OpLogEntries.push_back(LogEntry); + } + }, + 0); + } + std::sort(OplogOrder.begin(), OplogOrder.end(), [&](size_t Lhs, size_t Rhs) { + const OplogEntry& LhsEntry = OpLogEntries[Lhs]; + const OplogEntry& RhsEntry = OpLogEntries[Rhs]; + return LhsEntry.OpCoreOffset < RhsEntry.OpCoreOffset; + }); - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + IoBuffer OpBuffer(LargestOp); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - return; - } + for (size_t OplogOrderIndex : OplogOrder) + { + const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex]; + ZEN_ASSERT_SLOW(OpBuffer.GetSize() >= LogEntry.OpCoreSize); - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - m_NextOpsOffset = - Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); - Handler(Op, LogEntry); - }, - 0); + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + InvalidEntries++; + continue; + } + CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); + Handler(Op, LogEntry); + } if (InvalidEntries) { @@ -852,72 +878,87 @@ ProjectStore::Oplog::GetMapping(CbObject Core) OplogEntryMapping Result; // Update chunk id maps - CbObjectView PackageObj = Core["package"sv].AsObjectView(); - CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView(); - CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView(); - Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num()); - - if (PackageObj) - { - Oid Id = PackageObj["id"sv].AsObjectId(); - IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("package data {} -> {}", Id, Hash); - } - - for (CbFieldView& Entry : PackageDataArray) + for (CbFieldView Field : Core) { - CbObjectView PackageDataObj = Entry.AsObjectView(); - Oid Id = PackageDataObj["id"sv].AsObjectId(); - IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("package {} -> {}", Id, Hash); - } - - for (CbFieldView& Entry : BulkDataArray) - { - CbObjectView BulkObj = Entry.AsObjectView(); - Oid Id = BulkObj["id"sv].AsObjectId(); - IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); - } - - CbArrayView FilesArray = Core["files"sv].AsArrayView(); - Result.Files.reserve(FilesArray.Num()); - for (CbFieldView& Entry : FilesArray) - { - CbObjectView FileObj = Entry.AsObjectView(); - - std::string_view ServerPath = FileObj["serverpath"sv].AsString(); - std::string_view ClientPath = FileObj["clientpath"sv].AsString(); - Oid Id = FileObj["id"sv].AsObjectId(); - IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); - if (ServerPath.empty() && Hash == IoHash::Zero) + std::string_view FieldName = Field.GetName(); + if (FieldName == "package"sv) + { + CbObjectView PackageObj = Field.AsObjectView(); + Oid Id = PackageObj["id"sv].AsObjectId(); + IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("package data {} -> {}", Id, Hash); + continue; + } + if (FieldName == "bulkdata"sv) { - ZEN_WARN("invalid file for entry '{}', missing both 'serverpath' and 'data' fields", Id); + CbArrayView BulkDataArray = Field.AsArrayView(); + for (CbFieldView& Entry : BulkDataArray) + { + CbObjectView BulkObj = Entry.AsObjectView(); + Oid Id = BulkObj["id"sv].AsObjectId(); + IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); + } continue; } - if (ClientPath.empty()) + if (FieldName == "packagedata"sv) { - ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id); + CbArrayView PackageDataArray = Field.AsArrayView(); + for (CbFieldView& Entry : PackageDataArray) + { + CbObjectView PackageDataObj = Entry.AsObjectView(); + Oid Id = PackageDataObj["id"sv].AsObjectId(); + IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("package {} -> {}", Id, Hash); + } continue; } + if (FieldName == "files"sv) + { + CbArrayView FilesArray = Field.AsArrayView(); + Result.Files.reserve(FilesArray.Num()); + for (CbFieldView& Entry : FilesArray) + { + CbObjectView FileObj = Entry.AsObjectView(); - Result.Files.emplace_back(OplogEntryMapping::FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); - ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); - } + std::string_view ServerPath = FileObj["serverpath"sv].AsString(); + std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + Oid Id = FileObj["id"sv].AsObjectId(); + IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); + if (ServerPath.empty() && Hash == IoHash::Zero) + { + ZEN_WARN("invalid file for entry '{}', missing both 'serverpath' and 'data' fields", Id); + continue; + } + if (ClientPath.empty()) + { + ZEN_WARN("invalid file for entry '{}', missing 'both 'clientpath'", Id); + continue; + } - CbArrayView MetaArray = Core["meta"sv].AsArrayView(); - Result.Meta.reserve(MetaArray.Num()); - for (CbFieldView& Entry : MetaArray) - { - CbObjectView MetaObj = Entry.AsObjectView(); - Oid Id = MetaObj["id"sv].AsObjectId(); - IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); - Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - auto NameString = MetaObj["name"sv].AsString(); - ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); + Result.Files.emplace_back(OplogEntryMapping::FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); + ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); + } + continue; + } + if (FieldName == "meta"sv) + { + CbArrayView MetaArray = Field.AsArrayView(); + Result.Meta.reserve(MetaArray.Num()); + for (CbFieldView& Entry : MetaArray) + { + CbObjectView MetaObj = Entry.AsObjectView(); + Oid Id = MetaObj["id"sv].AsObjectId(); + IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); + Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + auto NameString = MetaObj["name"sv].AsString(); + ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); + } + continue; + } } return Result; |