diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-13 09:38:02 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-13 09:38:02 +0200 |
| commit | 1d992a472c54ef9a63364996031e3c6d2f8affe5 (patch) | |
| tree | bb20344ed4c5b4295c12914bf0124e11be5ff3a7 /src/zenserver/projectstore/projectstore.cpp | |
| parent | Merge pull request #465 from EpicGames/zs/default-port-change (diff) | |
| download | zen-1d992a472c54ef9a63364996031e3c6d2f8affe5.tar.xz zen-1d992a472c54ef9a63364996031e3c6d2f8affe5.zip | |
faster oplog iteration (#471)
* use a CbObjectView instead of CbObject to avoid creating IOBufferCore instances
* use BasicFileBuffer directly where possible
* changelog
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 56 |
1 files changed, 34 insertions, 22 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 02760f6dd..8faad58b6 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -288,7 +288,7 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); } - void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler) + void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler) { ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog"); @@ -299,7 +299,6 @@ struct ProjectStore::OplogStorage : public RefCounted Stopwatch Timer; uint64_t InvalidEntries = 0; - size_t LargestOp = 256; std::vector<OplogEntry> OpLogEntries; std::vector<size_t> OplogOrder; @@ -312,10 +311,6 @@ struct ProjectStore::OplogStorage : public RefCounted ++InvalidEntries; return; } - if (LogEntry.OpCoreSize > LargestOp) - { - LargestOp = LogEntry.OpCoreSize; - } const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); @@ -342,14 +337,29 @@ struct ProjectStore::OplogStorage : public RefCounted }); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - IoBuffer OpBuffer(LargestOp); for (size_t OplogOrderIndex : OplogOrder) { const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex]; - ZEN_ASSERT_SLOW(OpBuffer.GetSize() >= LogEntry.OpCoreSize); const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); + if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + { + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); + + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + InvalidEntries++; + continue; + } + Handler(CbObjectView(OpBufferView.GetData()), LogEntry); + continue; + } + + IoBuffer OpBuffer(LogEntry.OpCoreSize); OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); // Verify checksum, ignore op data if incorrect @@ -361,7 +371,7 @@ struct ProjectStore::OplogStorage : public RefCounted InvalidEntries++; continue; } - Handler(CbObject(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)), LogEntry); + Handler(CbObjectView(OpBuffer.Data()), LogEntry); } if (InvalidEntries) @@ -375,22 +385,24 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset.load()); } - void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler) + void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler) { ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - IoBuffer OpBuffer(512); for (const OplogEntryAddress& Entry : Entries) { const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - if (OpBuffer.Size() < Entry.Size) + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Size) { - OpBuffer = IoBuffer(Entry.Size); + Handler(CbObjectView(OpBufferView.GetData())); + continue; } + IoBuffer OpBuffer(Entry.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - Handler(CbObject(SharedBuffer::MakeView(OpBuffer.Data(), Entry.Size))); + Handler(CbObjectView(OpBuffer.Data())); } } @@ -509,7 +521,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) std::vector<IoHash> Cids; Cids.reserve(1024); - IterateOplog([&](CbObject Op) { + IterateOplog([&](CbObjectView Op) { Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); }); if (Cids.size() >= 1024) { @@ -649,7 +661,7 @@ ProjectStore::Oplog::ReplayLog() { return; } - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry); }); + m_Storage->ReplayLog([&](CbObjectView Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry); }); } IoBuffer @@ -754,7 +766,7 @@ ProjectStore::Oplog::IterateFileMap( } void -ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) +ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) @@ -777,11 +789,11 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) return Lhs.Offset < Rhs.Offset; }); - m_Storage->ReplayLogEntries(Entries, [&](CbObject Op) { Handler(Op); }); + m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); }); } void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler) +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); if (!m_Storage) @@ -822,7 +834,7 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO } size_t EntryIndex = 0; - m_Storage->ReplayLogEntries(SortedEntries, [&](CbObject Op) { + m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) { Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); EntryIndex++; }); @@ -926,7 +938,7 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk } ProjectStore::Oplog::OplogEntryMapping -ProjectStore::Oplog::GetMapping(CbObject Core) +ProjectStore::Oplog::GetMapping(CbObjectView Core) { using namespace std::literals; @@ -2748,7 +2760,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, std::vector<CbObject> NewOps; std::unordered_map<Oid, IoHash, Oid::Hasher> NewChunkMappings; - Oplog->IterateOplog([&](CbObject Op) { + Oplog->IterateOplog([&](CbObjectView Op) { bool OpRewritten = false; bool AllOk = true; |