aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-13 09:38:02 +0200
committerGitHub <[email protected]>2023-10-13 09:38:02 +0200
commit1d992a472c54ef9a63364996031e3c6d2f8affe5 (patch)
treebb20344ed4c5b4295c12914bf0124e11be5ff3a7 /src/zenserver/projectstore/projectstore.cpp
parentMerge pull request #465 from EpicGames/zs/default-port-change (diff)
downloadzen-1d992a472c54ef9a63364996031e3c6d2f8affe5.tar.xz
zen-1d992a472c54ef9a63364996031e3c6d2f8affe5.zip
faster oplog iteration (#471)
* use a CbObjectView instead of CbObject to avoid creating IOBufferCore instances * use BasicFileBuffer directly where possible * changelog
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp56
1 files changed, 34 insertions, 22 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 02760f6dd..8faad58b6 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -288,7 +288,7 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
}
- void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler)
+ void ReplayLog(std::function<void(CbObjectView, const OplogEntry&)>&& Handler)
{
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog");
@@ -299,7 +299,6 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
uint64_t InvalidEntries = 0;
- size_t LargestOp = 256;
std::vector<OplogEntry> OpLogEntries;
std::vector<size_t> OplogOrder;
@@ -312,10 +311,6 @@ struct ProjectStore::OplogStorage : public RefCounted
++InvalidEntries;
return;
}
- if (LogEntry.OpCoreSize > LargestOp)
- {
- LargestOp = LogEntry.OpCoreSize;
- }
const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
m_NextOpsOffset =
Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
@@ -342,14 +337,29 @@ struct ProjectStore::OplogStorage : public RefCounted
});
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- IoBuffer OpBuffer(LargestOp);
for (size_t OplogOrderIndex : OplogOrder)
{
const OplogEntry& LogEntry = OpLogEntries[OplogOrderIndex];
- ZEN_ASSERT_SLOW(OpBuffer.GetSize() >= LogEntry.OpCoreSize);
const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
+ if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ {
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBufferView.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
+
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ InvalidEntries++;
+ continue;
+ }
+ Handler(CbObjectView(OpBufferView.GetData()), LogEntry);
+ continue;
+ }
+
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
// Verify checksum, ignore op data if incorrect
@@ -361,7 +371,7 @@ struct ProjectStore::OplogStorage : public RefCounted
InvalidEntries++;
continue;
}
- Handler(CbObject(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)), LogEntry);
+ Handler(CbObjectView(OpBuffer.Data()), LogEntry);
}
if (InvalidEntries)
@@ -375,22 +385,24 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset.load());
}
- void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler)
+ void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler)
{
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- IoBuffer OpBuffer(512);
for (const OplogEntryAddress& Entry : Entries)
{
const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
- if (OpBuffer.Size() < Entry.Size)
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Size)
{
- OpBuffer = IoBuffer(Entry.Size);
+ Handler(CbObjectView(OpBufferView.GetData()));
+ continue;
}
+ IoBuffer OpBuffer(Entry.Size);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
- Handler(CbObject(SharedBuffer::MakeView(OpBuffer.Data(), Entry.Size)));
+ Handler(CbObjectView(OpBuffer.Data()));
}
}
@@ -509,7 +521,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
std::vector<IoHash> Cids;
Cids.reserve(1024);
- IterateOplog([&](CbObject Op) {
+ IterateOplog([&](CbObjectView Op) {
Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); });
if (Cids.size() >= 1024)
{
@@ -649,7 +661,7 @@ ProjectStore::Oplog::ReplayLog()
{
return;
}
- m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry); });
+ m_Storage->ReplayLog([&](CbObjectView Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry); });
}
IoBuffer
@@ -754,7 +766,7 @@ ProjectStore::Oplog::IterateFileMap(
}
void
-ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler)
{
RwLock::SharedLockScope _(m_OplogLock);
if (!m_Storage)
@@ -777,11 +789,11 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
return Lhs.Offset < Rhs.Offset;
});
- m_Storage->ReplayLogEntries(Entries, [&](CbObject Op) { Handler(Op); });
+ m_Storage->ReplayLogEntries(Entries, [&](CbObjectView Op) { Handler(Op); });
}
void
-ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler)
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Handler)
{
RwLock::SharedLockScope _(m_OplogLock);
if (!m_Storage)
@@ -822,7 +834,7 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbO
}
size_t EntryIndex = 0;
- m_Storage->ReplayLogEntries(SortedEntries, [&](CbObject Op) {
+ m_Storage->ReplayLogEntries(SortedEntries, [&](CbObjectView Op) {
Handler(LSNs[EntryIndex], Keys[EntryIndex], Op);
EntryIndex++;
});
@@ -926,7 +938,7 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk
}
ProjectStore::Oplog::OplogEntryMapping
-ProjectStore::Oplog::GetMapping(CbObject Core)
+ProjectStore::Oplog::GetMapping(CbObjectView Core)
{
using namespace std::literals;
@@ -2748,7 +2760,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
std::vector<CbObject> NewOps;
std::unordered_map<Oid, IoHash, Oid::Hasher> NewChunkMappings;
- Oplog->IterateOplog([&](CbObject Op) {
+ Oplog->IterateOplog([&](CbObjectView Op) {
bool OpRewritten = false;
bool AllOk = true;