diff options
| author | Dan Engelbrecht <[email protected]> | 2025-09-15 17:08:44 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-15 17:08:44 +0200 |
| commit | abb1413494537381f2716d69ec459f485f923f71 (patch) | |
| tree | 8dbb797886df2b3a5bfefcec8579f2985fd2c252 /src/zenserver/projectstore/projectstore.cpp | |
| parent | revise exception vs error (#495) (diff) | |
| download | zen-abb1413494537381f2716d69ec459f485f923f71.tar.xz zen-abb1413494537381f2716d69ec459f485f923f71.zip | |
new in-memory storage strategy for oplogs (#490)
- Improvement: Revised project oplog in-memory representation which reduces load times and memory usage
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 1015 |
1 files changed, 598 insertions, 417 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 7cb115110..97175da23 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -388,31 +388,68 @@ namespace { : fmt::format("{}: {}", Result.Reason, Result.Text)}; } + std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging) + { + int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize); + int32_t End = EntryPaging.Count < 0 ? TotalSize : (Start + std::min<int32_t>(EntryPaging.Count, TotalSize - Start)); + return {Start, End}; + } + #pragma pack(push) #pragma pack(1) struct OplogIndexHeader { + OplogIndexHeader() {} + + struct BodyV1 + { + uint64_t LogPosition = 0; + uint32_t LSNCount = 0; + uint64_t KeyCount = 0; + uint32_t OpAddressMapCount = 0; + uint32_t LatestOpMapCount = 0; + uint64_t ChunkMapCount = 0; + uint64_t MetaMapCount = 0; + uint64_t FileMapCount = 0; + }; + + struct BodyV2 + { + uint64_t LogPosition = 0; + uint64_t KeyCount = 0; + uint32_t OpPayloadIndexCount = 0; + uint32_t OpPayloadCount = 0; + uint32_t ChunkMapCount = 0; + uint32_t MetaMapCount = 0; + uint32_t FileMapCount = 0; + uint32_t MaxLSN = 0; + uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment! + uint32_t Reserved[2] = {0, 0}; + }; + static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx'; - static constexpr uint32_t CurrentVersion = 1; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t Version2 = 2; + static constexpr uint32_t CurrentVersion = Version2; static constexpr uint64_t DataAlignment = 8; - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t LogPosition = 0; - uint32_t LSNCount = 0; - uint64_t KeyCount = 0; - uint32_t OpAddressMapCount = 0; - uint32_t LatestOpMapCount = 0; - uint64_t ChunkMapCount = 0; - uint64_t MetaMapCount = 0; - uint64_t FileMapCount = 0; - uint32_t Checksum = 0; + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + + union + { + BodyV1 V1; + BodyV2 V2; + }; + + uint32_t Checksum = 0; static uint32_t ComputeChecksum(const OplogIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; + #pragma pack(pop) static_assert(sizeof(OplogIndexHeader) == 64); @@ -545,6 +582,14 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign); } + void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset) + { + m_MaxLsn.store(MaxLSN); + m_NextOpsOffset = NextOpOffset * m_OpsAlign; + } + + uint32_t GetNextOpOffset() const { return gsl::narrow<uint32_t>(m_NextOpsOffset / m_OpsAlign); } + enum EMode { Create, @@ -603,35 +648,37 @@ struct ProjectStore::OplogStorage : public RefCounted { ZEN_MEMSCOPE(GetProjectstoreTag()); - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset); - if (OpBufferView.GetSize() == LogEntry.OpCoreSize) + const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; + const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, OpFileOffset); + if (OpBufferView.GetSize() == LogEntry.OpCoreAddress.Size) { return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); } else { - IoBuffer OpBuffer(LogEntry.OpCoreSize); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + IoBuffer OpBuffer(LogEntry.OpCoreAddress.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreAddress.Size, OpFileOffset); return OpBuffer; } } - uint64_t GetEffectiveBlobsSize(std::span<const OplogEntryAddress> Addresses) const + uint64_t GetEffectiveBlobsSize(std::span<const Oplog::OplogPayload> Addresses) const { uint64_t EffectiveSize = 0; - for (const OplogEntryAddress& Address : Addresses) + for (const Oplog::OplogPayload& OpPayload : Addresses) { - EffectiveSize += RoundUp(Address.Size, m_OpsAlign); + EffectiveSize += RoundUp(OpPayload.Address.Size, m_OpsAlign); } return EffectiveSize; } - void Compact( - std::span<const uint32_t> LSNs, - std::function<void(const Oid& OpKeyHash, uint32_t OldLSN, uint32_t NewLSN, const OplogEntryAddress& NewAddress)>&& Callback, - bool RetainLSNs, - bool DryRun) + void Compact(std::span<const ProjectStore::LogSequenceNumber> LSNs, + std::function<void(const Oid& OpKeyHash, + ProjectStore::LogSequenceNumber OldLSN, + ProjectStore::LogSequenceNumber NewLSN, + const OplogEntryAddress& NewAddress)>&& Callback, + bool RetainLSNs, + bool DryRun) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::Compact"); @@ -665,9 +712,9 @@ struct ProjectStore::OplogStorage : public RefCounted std::vector<OplogEntry> Ops; Ops.reserve(LSNs.size()); - tsl::robin_map<uint32_t, size_t> LSNToIndex; + ProjectStore::LsnMap<size_t> LSNToIndex; LSNToIndex.reserve(LSNs.size()); - for (uint32_t LSN : LSNs) + for (ProjectStore::LogSequenceNumber LSN : LSNs) { LSNToIndex[LSN] = (size_t)-1; } @@ -693,10 +740,10 @@ struct ProjectStore::OplogStorage : public RefCounted SkipEntryCount); std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { - return Lhs.OpCoreOffset < Rhs.OpCoreOffset; + return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; }); - std::vector<uint32_t> OldLSNs; + std::vector<ProjectStore::LogSequenceNumber> OldLSNs; OldLSNs.reserve(Ops.size()); uint64_t OpWriteOffset = 0; @@ -712,15 +759,15 @@ struct ProjectStore::OplogStorage : public RefCounted IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry); if (RetainLSNs) { - MaxLSN = Max(MaxLSN, LogEntry.OpLsn); + MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number); } else { - LogEntry.OpLsn = ++MaxLSN; + LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN); } - LogEntry.OpCoreOffset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign); - NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreSize, OpWriteOffset); - OpWriteOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign); + LogEntry.OpCoreAddress.Offset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign); + NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset); + OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); } Oplog.Append(Ops); } @@ -768,10 +815,7 @@ struct ProjectStore::OplogStorage : public RefCounted for (size_t Index = 0; Index < Ops.size(); Index++) { const OplogEntry& LogEntry = Ops[Index]; - Callback(LogEntry.OpKeyHash, - OldLSNs[Index], - LogEntry.OpLsn, - OplogEntryAddress{.Offset = LogEntry.OpCoreOffset, .Size = LogEntry.OpCoreSize}); + Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress); } ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.", @@ -837,21 +881,21 @@ struct ProjectStore::OplogStorage : public RefCounted return; } } - else if (LogEntry.OpCoreSize == 0) + else if (LogEntry.OpCoreAddress.Size == 0) { ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } - else if (LogEntry.OpLsn == 0) + else if (LogEntry.OpLsn.Number == 0) { ZEN_SCOPED_WARN("skipping zero lsn op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; return; } - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - if ((OpFileOffset + LogEntry.OpCoreSize) > OpsBlockSize) + const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign; + if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize) { ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash); ++OutInvalidEntries; @@ -905,7 +949,7 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t NextOpFileOffset = m_NextOpsOffset; std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) { - return Lhs.OpCoreOffset < Rhs.OpCoreOffset; + return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset; }); for (const OplogEntry& LogEntry : OpLogEntries) @@ -921,7 +965,7 @@ struct ProjectStore::OplogStorage : public RefCounted // Verify checksum, ignore op data if incorrect const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash; - const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF); + const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF); if (OpCoreHash != ExpectedOpCoreHash) { @@ -944,9 +988,10 @@ struct ProjectStore::OplogStorage : public RefCounted else { Handler(CbObjectView(OpBuffer.GetData()), LogEntry); - MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn); - const uint64_t EntryNextOpFileOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign); - NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); + MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number); + const uint64_t EntryNextOpFileOffset = + RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign); + NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset); } } } @@ -965,25 +1010,29 @@ struct ProjectStore::OplogStorage : public RefCounted InvalidEntries); } - void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler) + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, CbObjectView)>&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); - for (const OplogEntryAddress& Entry : Entries) + for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) { - const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Size, OpFileOffset); - if (OpBufferView.GetSize() == Entry.Size) + const Oplog::OplogPayload& Entry = Entries[EntryOffset]; + + const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) { - Handler(CbObjectView(OpBufferView.GetData())); + Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData())); continue; } - IoBuffer OpBuffer(Entry.Size); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - Handler(CbObjectView(OpBuffer.Data())); + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + Handler(Entry.Lsn, CbObjectView(OpBuffer.Data())); } } @@ -1034,8 +1083,8 @@ struct ProjectStore::OplogStorage : public RefCounted RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; - const uint32_t OpLsn = ++m_MaxLsn; - if (OpLsn == std::numeric_limits<uint32_t>::max()) + const LogSequenceNumber OpLsn(++m_MaxLsn); + if (!OpLsn.Number) { ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); @@ -1045,11 +1094,10 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); - OplogEntry Entry = {.OpLsn = OpLsn, - .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), - .OpCoreSize = uint32_t(WriteSize), - .OpCoreHash = OpData.OpCoreHash, - .OpKeyHash = OpData.KeyHash}; + OplogEntry Entry = {.OpLsn = OpLsn, + .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)}, + .OpCoreHash = OpData.OpCoreHash, + .OpKeyHash = OpData.KeyHash}; m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); m_Oplog.Append(Entry); @@ -1064,7 +1112,7 @@ struct ProjectStore::OplogStorage : public RefCounted size_t OpCount = Ops.size(); std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes; - std::vector<uint32_t> OpLsns; + std::vector<LogSequenceNumber> OpLsns; OffsetAndSizes.resize(OpCount); OpLsns.resize(OpCount); @@ -1083,8 +1131,8 @@ struct ProjectStore::OplogStorage : public RefCounted for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart; - OpLsns[OpIndex] = ++m_MaxLsn; - if (OpLsns[OpIndex] == std::numeric_limits<uint32_t>::max()) + OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn); + if (!OpLsns[OpIndex]) { ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()); throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId())); @@ -1103,11 +1151,12 @@ struct ProjectStore::OplogStorage : public RefCounted { MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first); WriteBufferView.CopyFrom(Ops[OpIndex].Buffer); - Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex], - .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), - .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second), - .OpCoreHash = Ops[OpIndex].OpCoreHash, - .OpKeyHash = Ops[OpIndex].KeyHash}; + Entries[OpIndex] = { + .OpLsn = OpLsns[OpIndex], + .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign), + .Size = uint32_t(OffsetAndSizes[OpIndex].second)}, + .OpCoreHash = Ops[OpIndex].OpCoreHash, + .OpKeyHash = Ops[OpIndex].KeyHash}; } m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart); @@ -1229,7 +1278,7 @@ ProjectStore::Oplog::Flush() } uint64_t LogCount = m_Storage->LogCount(); - if (m_LogFlushPosition != LogCount) + if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot) { WriteIndexSnapshot(); } @@ -1237,69 +1286,101 @@ ProjectStore::Oplog::Flush() } void +ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&) +{ + if (!m_LsnToPayloadOffsetMap) + { + m_LsnToPayloadOffsetMap = std::make_unique<LsnMap<PayloadIndex>>(); + m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size()); + for (uint32_t PayloadOffset = 0; PayloadOffset < m_OpLogPayloads.size(); PayloadOffset++) + { + const OplogPayload& Payload = m_OpLogPayloads[PayloadOffset]; + if (Payload.Lsn.Number != 0 && Payload.Address.Size != 0) + { + m_LsnToPayloadOffsetMap->insert_or_assign(Payload.Lsn, PayloadIndex(PayloadOffset)); + } + } + } +} + +void ProjectStore::Oplog::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_ASSERT(m_Mode == EMode::kFull); - std::vector<Oid> BadEntryKeys; + std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries; using namespace std::literals; - IterateOplogWithKey([&](uint32_t Lsn, const Oid& Key, CbObjectView Op) { - ZEN_UNUSED(Lsn); - - std::vector<IoHash> Cids; - Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); }); - + IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) { { const Oid KeyHash = ComputeOpKey(Op); - - ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); + if (KeyHash != Key) + { + BadEntries.push_back({Lsn, Key}); + ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); + return; + } } - for (const IoHash& Cid : Cids) - { - if (!m_CidStore.ContainsChunk(Cid)) + // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk? + + Op.IterateAttachments([&](CbFieldView Visitor) { + const IoHash Cid = Visitor.AsAttachment(); + if (Ctx.IsBadCid(Cid)) { - // oplog entry references a CAS chunk which is not - // present - BadEntryKeys.push_back(Key); + // oplog entry references a CAS chunk which has been flagged as bad + BadEntries.push_back({Lsn, Key}); return; } - if (Ctx.IsBadCid(Cid)) + if (!m_CidStore.ContainsChunk(Cid)) { - // oplog entry references a CAS chunk which has been - // flagged as bad - BadEntryKeys.push_back(Key); + // oplog entry references a CAS chunk which is not present + BadEntries.push_back({Lsn, Key}); return; } - } + }); }); - if (!BadEntryKeys.empty()) + if (!BadEntries.empty()) { if (Ctx.RunRecovery()) { ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", m_OuterProjectId, m_OplogId, - BadEntryKeys.size(), + BadEntries.size(), m_BasePath); // Actually perform some clean-up - RwLock::ExclusiveLockScope _(m_OplogLock); - for (const auto& Key : BadEntryKeys) + RwLock::ExclusiveLockScope Lock(m_OplogLock); + + RefreshLsnToPayloadOffsetMap(Lock); + + for (const auto& BadEntry : BadEntries) { - if (auto It = m_LatestOpMap.find(Key); It != m_LatestOpMap.end()) + const LogSequenceNumber BadLsn = BadEntry.first; + const Oid& BadKey = BadEntry.second; + if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end()) + { + OplogPayload& Payload = m_OpLogPayloads[It->second]; + if (Payload.Lsn == BadLsn) + { + m_OpToPayloadOffsetMap.erase(It); + m_Storage->AppendTombstone(BadKey); + } + } + if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end()) { - m_OpAddressMap.erase(It->second); - m_LatestOpMap.erase(It); + const PayloadIndex LsnPayloadOffset = LsnIt->second; + OplogPayload& LsnPayload = m_OpLogPayloads[LsnPayloadOffset]; + LsnPayload = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}}; } - m_Storage->AppendTombstone(Key); + m_LsnToPayloadOffsetMap->erase(BadLsn); } - if (!BadEntryKeys.empty()) + if (!BadEntries.empty()) { m_MetaValid = false; } @@ -1309,7 +1390,7 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx) ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", m_OuterProjectId, m_OplogId, - BadEntryKeys.size(), + BadEntries.size(), m_BasePath); } } @@ -1355,8 +1436,9 @@ ProjectStore::Oplog::ResetState() m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); - m_OpAddressMap.clear(); - m_LatestOpMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); m_Storage = {}; } @@ -1367,14 +1449,15 @@ ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory) RwLock::ExclusiveLockScope _(m_OplogLock); m_UpdateCaptureRefCounter = 0; - m_CapturedLSNs.reset(); + m_CapturedOps.reset(); m_CapturedAttachments.reset(); m_PendingPrepOpAttachments.clear(); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); - m_OpAddressMap.clear(); - m_LatestOpMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); m_Storage = {}; if (PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory)) { @@ -1430,6 +1513,8 @@ ProjectStore::Oplog::Read() RemoveFile(m_MetaPath, DummyEc); } + ZEN_ASSERT(!m_LsnToPayloadOffsetMap); + ReadIndexSnapshot(); if (m_Mode == EMode::kFull) @@ -1459,12 +1544,14 @@ ProjectStore::Oplog::Read() m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash); } - m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); }, m_LogFlushPosition); - if (m_Storage->LogCount() != m_LogFlushPosition) + if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot) { WriteIndexSnapshot(); } @@ -1476,8 +1563,10 @@ ProjectStore::Oplog::Read() m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition); for (const OplogEntry& OpEntry : OpLogEntries) { - m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); } } } @@ -1536,8 +1625,9 @@ ProjectStore::Oplog::Reset() m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); - m_OpAddressMap.clear(); - m_LatestOpMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); + m_LsnToPayloadOffsetMap.reset(); m_Storage = new OplogStorage(this, m_BasePath); m_Storage->Open(OplogStorage::EMode::Create); m_MetaValid = false; @@ -1628,7 +1718,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, Keys.reserve(OpCount); Mappings.reserve(OpCount); - IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) { + IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { Result.LSNLow = Min(Result.LSNLow, LSN); Result.LSNHigh = Max(Result.LSNHigh, LSN); KeyHashes.push_back(Key); @@ -1774,34 +1864,21 @@ ProjectStore::Oplog::WriteIndexSnapshot() const fs::path IndexPath = m_BasePath / "ops.zidx"; try { - // Write the current state of the location map to a new index state - std::vector<uint32_t> LSNEntries; - std::vector<Oid> Keys; - std::vector<OplogEntryAddress> AddressMapEntries; - std::vector<uint32_t> LatestOpMapEntries; - std::vector<IoHash> ChunkMapEntries; - std::vector<IoHash> MetaMapEntries; - std::vector<uint32_t> FilePathLengths; - std::vector<std::string> FilePaths; - uint64_t IndexLogPosition = 0; + std::vector<Oid> Keys; + std::vector<PayloadIndex> OpPayloadOffsets; + std::vector<IoHash> ChunkMapEntries; + std::vector<IoHash> MetaMapEntries; + std::vector<uint32_t> FilePathLengths; + std::vector<std::string> FilePaths; + uint64_t IndexLogPosition = 0; { IndexLogPosition = m_Storage->LogCount(); - - Keys.reserve(m_LatestOpMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); - - AddressMapEntries.reserve(m_OpAddressMap.size()); - LSNEntries.reserve(m_OpAddressMap.size()); - for (const auto& It : m_OpAddressMap) + Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size()); + OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size()); + for (const auto& Kv : m_OpToPayloadOffsetMap) { - LSNEntries.push_back(It.first); - AddressMapEntries.push_back(It.second); - } - - LatestOpMapEntries.reserve(m_LatestOpMap.size()); - for (const auto& It : m_LatestOpMap) - { - Keys.push_back(It.first); - LatestOpMapEntries.push_back(It.second); + Keys.push_back(Kv.first); + OpPayloadOffsets.push_back(Kv.second); } ChunkMapEntries.reserve(m_ChunkMap.size()); @@ -1841,14 +1918,16 @@ ProjectStore::Oplog::WriteIndexSnapshot() { BasicFileWriter IndexFile(ObjectIndexFile, 65536u); - OplogIndexHeader Header = {.LogPosition = IndexLogPosition, - .LSNCount = gsl::narrow<uint32_t>(LSNEntries.size()), - .KeyCount = gsl::narrow<uint64_t>(Keys.size()), - .OpAddressMapCount = gsl::narrow<uint32_t>(AddressMapEntries.size()), - .LatestOpMapCount = gsl::narrow<uint32_t>(LatestOpMapEntries.size()), - .ChunkMapCount = gsl::narrow<uint64_t>(ChunkMapEntries.size()), - .MetaMapCount = gsl::narrow<uint64_t>(MetaMapEntries.size()), - .FileMapCount = gsl::narrow<uint64_t>(FilePathLengths.size() / 2)}; + OplogIndexHeader Header; + Header.V2 = {.LogPosition = IndexLogPosition, + .KeyCount = gsl::narrow<uint64_t>(Keys.size()), + .OpPayloadIndexCount = gsl::narrow<uint32_t>(OpPayloadOffsets.size()), + .OpPayloadCount = gsl::narrow<uint32_t>(m_OpLogPayloads.size()), + .ChunkMapCount = gsl::narrow<uint32_t>(ChunkMapEntries.size()), + .MetaMapCount = gsl::narrow<uint32_t>(MetaMapEntries.size()), + .FileMapCount = gsl::narrow<uint32_t>(FilePathLengths.size() / 2), + .MaxLSN = m_Storage->MaxLSN(), + .NextOpCoreOffset = m_Storage->GetNextOpOffset()}; Header.Checksum = OplogIndexHeader::ComputeChecksum(Header); @@ -1856,16 +1935,13 @@ ProjectStore::Oplog::WriteIndexSnapshot() IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - IndexFile.Write(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); - Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - IndexFile.Write(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); + IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); - IndexFile.Write(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); + IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); @@ -1883,6 +1959,7 @@ ProjectStore::Oplog::WriteIndexSnapshot() Offset += FilePath.length(); } } + ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) @@ -1893,8 +1970,9 @@ ProjectStore::Oplog::WriteIndexSnapshot() IndexPath, Ec.message())); } - EntryCount = LSNEntries.size(); + EntryCount = m_OpLogPayloads.size(); m_LogFlushPosition = IndexLogPosition; + m_IsLegacySnapshot = false; } catch (const std::exception& Err) { @@ -1935,12 +2013,8 @@ ProjectStore::Oplog::ReadIndexSnapshot() Offset += sizeof(OplogIndexHeader); Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - if ((Header.Magic == OplogIndexHeader::ExpectedMagic) && (Header.Version == OplogIndexHeader::CurrentVersion) && - (Header.Checksum == OplogIndexHeader::ComputeChecksum(Header))) + if (Header.Magic == OplogIndexHeader::ExpectedMagic) { - uint32_t MaxLSN = 0; - OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0}; - uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header); if (Header.Checksum != Checksum) { @@ -1953,117 +2027,229 @@ ProjectStore::Oplog::ReadIndexSnapshot() return; } - if (Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount != Header.KeyCount) + if (Header.Version == OplogIndexHeader::Version1) { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", - m_OuterProjectId, - m_OplogId, - IndexPath, - Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount, - Header.KeyCount); - return; - } + uint32_t MaxLSN = 0; + OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0}; - std::vector<uint32_t> LSNEntries(Header.LSNCount); - ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); - Offset += LSNEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - size_t LSNOffset = 0; + if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount != + Header.V1.KeyCount) + { + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount, + Header.V1.KeyCount); + return; + } - std::vector<Oid> Keys(Header.KeyCount); - ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset += Keys.size() * sizeof(Oid); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - size_t KeyOffset = 0; + std::vector<uint32_t> LSNEntries(Header.V1.LSNCount); + ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); + Offset += LSNEntries.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + size_t LSNOffset = 0; + + std::vector<Oid> Keys(Header.V1.KeyCount); + ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); + Offset += Keys.size() * sizeof(Oid); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + size_t KeyOffset = 0; - { - std::vector<OplogEntryAddress> AddressMapEntries(Header.OpAddressMapCount); - ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); - Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_OpAddressMap.reserve(AddressMapEntries.size()); - for (const OplogEntryAddress& Address : AddressMapEntries) { - m_OpAddressMap.insert_or_assign(LSNEntries[LSNOffset++], Address); - if (Address.Offset > LastOpAddress.Offset) + std::vector<OplogEntryAddress> AddressMapEntries(Header.V1.OpAddressMapCount); + ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); + Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + tsl::robin_map<uint32_t, PayloadIndex> LsnToPayloadOffset; + LsnToPayloadOffset.reserve(AddressMapEntries.size()); + + m_OpLogPayloads.reserve(AddressMapEntries.size()); + for (const OplogEntryAddress& Address : AddressMapEntries) { - LastOpAddress = Address; + const uint32_t LSN = LSNEntries[LSNOffset++]; + LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size())); + + m_OpLogPayloads.push_back({.Lsn = LogSequenceNumber(LSN), .Address = Address}); + if (Address.Offset > LastOpAddress.Offset) + { + LastOpAddress = Address; + } + MaxLSN = Max(MaxLSN, LSN); + } + + { + std::vector<uint32_t> LatestOpMapEntries(Header.V1.LatestOpMapCount); + ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); + Offset += LatestOpMapEntries.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size()); + for (uint32_t Lsn : LatestOpMapEntries) + { + if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end()) + { + m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second); + MaxLSN = Max(MaxLSN, Lsn); + } + } } } - } - { - std::vector<uint32_t> LatestOpMapEntries(Header.LatestOpMapCount); - ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); - Offset += LatestOpMapEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_LatestOpMap.reserve(LatestOpMapEntries.size()); - for (uint32_t LSN : LatestOpMapEntries) + if (m_Mode == EMode::kFull) { - if (!m_OpAddressMap.contains(LSN)) { - throw std::runtime_error(fmt::format("LSN {} is no present in Op address map", LSN)); + std::vector<IoHash> ChunkMapEntries(Header.V1.ChunkMapCount); + ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); + Offset += ChunkMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_ChunkMap.reserve(ChunkMapEntries.size()); + for (const IoHash& ChunkId : ChunkMapEntries) + { + m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + std::vector<IoHash> MetaMapEntries(Header.V1.MetaMapCount); + ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); + Offset += MetaMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_MetaMap.reserve(MetaMapEntries.size()); + for (const IoHash& ChunkId : MetaMapEntries) + { + m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + m_FileMap.reserve(Header.V1.FileMapCount); + + std::vector<uint32_t> FilePathLengths(Header.V1.FileMapCount * 2); + ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); + Offset += FilePathLengths.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + BasicFileBuffer IndexFile(ObjectIndexFile, 65536); + auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { + MemoryView StringData = IndexFile.MakeView(Length, Offset); + if (StringData.GetSize() != Length) + { + throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", + Length, + uint32_t(StringData.GetSize()))); + } + Offset += Length; + return std::string((const char*)StringData.GetData(), Length); + }); + for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) + { + std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); + std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); + m_FileMap.insert_or_assign( + Keys[KeyOffset++], + FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); + } } - m_LatestOpMap.insert_or_assign(Keys[KeyOffset++], LSN); - MaxLSN = Max(MaxLSN, LSN); } + m_LogFlushPosition = Header.V1.LogPosition; + m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress); + EntryCount = Header.V1.LSNCount; + m_IsLegacySnapshot = true; } - if (m_Mode == EMode::kFull) + else if (Header.Version == OplogIndexHeader::Version2) { + std::vector<Oid> Keys(Header.V2.KeyCount); + + ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset); + Offset += Keys.size() * sizeof(Oid); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + std::vector<PayloadIndex> OpPayloadOffsets(Header.V2.OpPayloadIndexCount); + ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset); + Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + m_OpLogPayloads.resize(Header.V2.OpPayloadCount); + ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset); + Offset += m_OpLogPayloads.size() * sizeof(OplogPayload); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + size_t KeyOffset = 0; + + m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount); + for (const PayloadIndex PayloadOffset : OpPayloadOffsets) { - std::vector<IoHash> ChunkMapEntries(Header.ChunkMapCount); - ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset += ChunkMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_ChunkMap.reserve(ChunkMapEntries.size()); - for (const IoHash& ChunkId : ChunkMapEntries) - { - m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); - } + const Oid& Key = Keys[KeyOffset++]; + ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount); + m_OpToPayloadOffsetMap.insert({Key, PayloadOffset}); } + + if (m_Mode == EMode::kFull) { - std::vector<IoHash> MetaMapEntries(Header.MetaMapCount); - ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset += MetaMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - m_MetaMap.reserve(MetaMapEntries.size()); - for (const IoHash& ChunkId : MetaMapEntries) { - m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + std::vector<IoHash> ChunkMapEntries(Header.V2.ChunkMapCount); + ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); + Offset += ChunkMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_ChunkMap.reserve(ChunkMapEntries.size()); + for (const IoHash& ChunkId : ChunkMapEntries) + { + m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } } - } - { - m_FileMap.reserve(Header.FileMapCount); - - std::vector<uint32_t> FilePathLengths(Header.FileMapCount * 2); - ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset += FilePathLengths.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); - - BasicFileBuffer IndexFile(ObjectIndexFile, 65536); - auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { - MemoryView StringData = IndexFile.MakeView(Length, Offset); - if (StringData.GetSize() != Length) - { - throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", - Length, - uint32_t(StringData.GetSize()))); - } - Offset += Length; - return std::string((const char*)StringData.GetData(), Length); - }); - for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) { - std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); - std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); - m_FileMap.insert_or_assign( - Keys[KeyOffset++], - FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); + std::vector<IoHash> MetaMapEntries(Header.V2.MetaMapCount); + ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); + Offset += MetaMapEntries.size() * sizeof(IoHash); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + m_MetaMap.reserve(MetaMapEntries.size()); + for (const IoHash& ChunkId : MetaMapEntries) + { + m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId); + } + } + { + m_FileMap.reserve(Header.V2.FileMapCount); + + std::vector<uint32_t> FilePathLengths(Header.V2.FileMapCount * 2); + ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); + Offset += FilePathLengths.size() * sizeof(uint32_t); + Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + + BasicFileBuffer IndexFile(ObjectIndexFile, 65536); + auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string { + MemoryView StringData = IndexFile.MakeView(Length, Offset); + if (StringData.GetSize() != Length) + { + throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}", + Length, + uint32_t(StringData.GetSize()))); + } + Offset += Length; + return std::string((const char*)StringData.GetData(), Length); + }); + for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();) + { + std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]); + std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]); + m_FileMap.insert_or_assign( + Keys[KeyOffset++], + FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)}); + } } } + m_LogFlushPosition = Header.V2.LogPosition; + m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset); + EntryCount = Header.V2.OpPayloadCount; + m_IsLegacySnapshot = false; + } + else + { + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}", + m_OuterProjectId, + m_OplogId, + IndexPath, + Header.Version); + return; } - m_LogFlushPosition = Header.LogPosition; - m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress); - EntryCount = Header.LSNCount; } else { @@ -2073,8 +2259,8 @@ ProjectStore::Oplog::ReadIndexSnapshot() } catch (const std::exception& Ex) { - m_OpAddressMap.clear(); - m_LatestOpMap.clear(); + m_OpToPayloadOffsetMap.clear(); + m_OpLogPayloads.clear(); m_ChunkMap.clear(); m_MetaMap.clear(); m_FileMap.clear(); @@ -2106,19 +2292,7 @@ ProjectStore::Oplog::GetUnusedSpacePercentLocked() const return 0; } - std::vector<OplogEntryAddress> Addresses; - { - Addresses.reserve(m_LatestOpMap.size()); - for (auto It : m_LatestOpMap) - { - if (auto AddressIt = m_OpAddressMap.find(It.second); AddressIt != m_OpAddressMap.end()) - { - Addresses.push_back(AddressIt->second); - } - } - } - - const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(std::move(Addresses)); + const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads); if (EffectiveBlobsSize < ActualBlobsSize) { @@ -2144,31 +2318,36 @@ ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool Reta Stopwatch Timer; - std::vector<uint32_t> LSNs; - LSNs.reserve(m_LatestOpMap.size()); - for (auto It : m_LatestOpMap) + std::vector<LogSequenceNumber> LatestLSNs; + LatestLSNs.reserve(m_OpLogPayloads.size()); + for (const auto& Kv : m_OpToPayloadOffsetMap) { - LSNs.push_back(It.second); + const OplogPayload& OpPayload = m_OpLogPayloads[Kv.second]; + LatestLSNs.push_back(OpPayload.Lsn); } - tsl::robin_map<uint32_t, OplogEntryAddress> OpAddressMap; // Index LSN -> op data in ops blob file - OidMap<uint32_t> LatestOpMap; // op key -> latest op LSN for key + std::vector<OplogPayload> OpPayloads; + OpPayloads.reserve(LatestLSNs.size()); + + OidMap<PayloadIndex> OpToPayloadOffsetMap; + OpToPayloadOffsetMap.reserve(LatestLSNs.size()); uint64_t PreSize = TotalSize(); m_Storage->Compact( - LSNs, - [&](const Oid& OpKeyHash, uint32_t, uint32_t NewLSN, const OplogEntryAddress& NewAddress) { - LatestOpMap.insert_or_assign(OpKeyHash, NewLSN); - OpAddressMap.insert_or_assign(NewLSN, NewAddress); + LatestLSNs, + [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) { + OpToPayloadOffsetMap.insert({OpKeyHash, PayloadIndex(OpPayloads.size())}); + OpPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress}); }, RetainLSNs, /*DryRun*/ DryRun); if (!DryRun) { - m_OpAddressMap.swap(OpAddressMap); - m_LatestOpMap.swap(LatestOpMap); + m_OpToPayloadOffsetMap.swap(OpToPayloadOffsetMap); + m_OpLogPayloads.swap(OpPayloads); + m_LsnToPayloadOffsetMap.reset(); WriteIndexSnapshot(); } @@ -2498,15 +2677,44 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler, c IterateOplogLocked(std::move(Handler), EntryPaging); } -template<typename ContainerElement> -std::span<ContainerElement> -CreateSpanFromPaging(std::vector<ContainerElement>& Container, const ProjectStore::Oplog::Paging& Paging) +std::vector<ProjectStore::Oplog::PayloadIndex> +ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& EntryPaging, + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap) { - std::span<ContainerElement> Span(Container); - int32_t Size = int32_t(Container.size()); - int32_t Start = std::clamp(Paging.Start, 0, Size); - int32_t End = Paging.Count < 0 ? Size : (Start + std::min<int32_t>(Paging.Count, Size - Start)); - return Span.subspan(Start, End - Start); + std::pair<int32_t, int32_t> StartAndEnd = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging); + if (StartAndEnd.first == StartAndEnd.second) + { + return {}; + } + + const int32_t ReplayCount = StartAndEnd.second - StartAndEnd.first; + + auto Start = m_OpToPayloadOffsetMap.cbegin(); + std::advance(Start, StartAndEnd.first); + + auto End = Start; + std::advance(End, ReplayCount); + + std::vector<PayloadIndex> ReplayOrder; + ReplayOrder.reserve(ReplayCount); + + for (auto It = Start; It != End; It++) + { + const PayloadIndex PayloadOffset = It->second; + if (OutOptionalReverseKeyMap != nullptr) + { + OutOptionalReverseKeyMap->insert_or_assign(PayloadOffset, It->first); + } + ReplayOrder.push_back(PayloadOffset); + } + + std::sort(ReplayOrder.begin(), ReplayOrder.end(), [this](const PayloadIndex Lhs, const PayloadIndex Rhs) { + const OplogEntryAddress& LhsEntry = m_OpLogPayloads[Lhs].Address; + const OplogEntryAddress& RhsEntry = m_OpLogPayloads[Rhs].Address; + return LhsEntry.Offset < RhsEntry.Offset; + }); + + return ReplayOrder; } void @@ -2519,23 +2727,44 @@ ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Hand return; } - std::vector<OplogEntryAddress> Entries; - Entries.reserve(m_LatestOpMap.size()); - - for (const auto& Kv : m_LatestOpMap) + std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr); + if (!ReplayOrder.empty()) { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - Entries.push_back(AddressEntry->second); + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); }); } +} - std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { - return Lhs.Offset < Rhs.Offset; - }); +void +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler) +{ + IterateOplogWithKey(std::move(Handler), Paging{}); +} + +void +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler, + const Paging& EntryPaging) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); - std::span<OplogEntryAddress> EntrySpan = CreateSpanFromPaging(Entries, EntryPaging); - m_Storage->ReplayLogEntries(EntrySpan, [&](CbObjectView Op) { Handler(Op); }); + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder; + + { + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) { + const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; + Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Op); + EntryIndex++; + }); + } + } + } } static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; @@ -2625,80 +2854,10 @@ ProjectStore::Oplog::GetOplogEntryCount() const { return 0; } - return m_LatestOpMap.size(); -} - -void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(uint32_t, const Oid&, CbObjectView)>&& Handler) -{ - IterateOplogWithKey(std::move(Handler), Paging{}); + return m_OpToPayloadOffsetMap.size(); } -void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(uint32_t, const Oid&, CbObjectView)>&& Handler, const Paging& EntryPaging) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - std::vector<OplogEntryAddress> SortedEntries; - std::vector<Oid> SortedKeys; - std::vector<uint32_t> SortedLSNs; - - { - const auto TargetEntryCount = m_LatestOpMap.size(); - - std::vector<size_t> EntryIndexes; - std::vector<OplogEntryAddress> Entries; - std::vector<Oid> Keys; - std::vector<uint32_t> LSNs; - - Entries.reserve(TargetEntryCount); - EntryIndexes.reserve(TargetEntryCount); - Keys.reserve(TargetEntryCount); - LSNs.reserve(TargetEntryCount); - - for (const auto& Kv : m_LatestOpMap) - { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - Entries.push_back(AddressEntry->second); - Keys.push_back(Kv.first); - LSNs.push_back(Kv.second); - EntryIndexes.push_back(EntryIndexes.size()); - } - - std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { - const OplogEntryAddress& LhsEntry = Entries[Lhs]; - const OplogEntryAddress& RhsEntry = Entries[Rhs]; - return LhsEntry.Offset < RhsEntry.Offset; - }); - - SortedEntries.reserve(EntryIndexes.size()); - SortedKeys.reserve(EntryIndexes.size()); - SortedLSNs.reserve(EntryIndexes.size()); - - for (size_t Index : EntryIndexes) - { - SortedEntries.push_back(Entries[Index]); - SortedKeys.push_back(Keys[Index]); - SortedLSNs.push_back(LSNs[Index]); - } - } - - std::span<OplogEntryAddress> EntrySpan = CreateSpanFromPaging(SortedEntries, EntryPaging); - size_t EntryIndex = EntrySpan.empty() ? 0 : static_cast<size_t>(&EntrySpan.front() - &SortedEntries.front()); - m_Storage->ReplayLogEntries(EntrySpan, [&](CbObjectView Op) { - Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op); - EntryIndex++; - }); -} - -std::optional<uint32_t> +ProjectStore::LogSequenceNumber ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); @@ -2706,9 +2865,9 @@ ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) { return {}; } - if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) { - return LatestOp->second; + return m_OpLogPayloads[LatestOp->second].Lsn; } return {}; } @@ -2722,31 +2881,44 @@ ProjectStore::Oplog::GetOpByKey(const Oid& Key) return {}; } - if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end()) { - const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - return m_Storage->GetOp(AddressEntry->second); + return m_Storage->GetOp(m_OpLogPayloads[LatestOp->second].Address); } return {}; } std::optional<CbObject> -ProjectStore::Oplog::GetOpByIndex(uint32_t Index) +ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index) { - RwLock::SharedLockScope _(m_OplogLock); + { + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (m_LsnToPayloadOffsetMap) + { + if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) + { + return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); + } + } + } + + RwLock::ExclusiveLockScope Lock(m_OplogLock); if (!m_Storage) { return {}; } - if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) + RefreshLsnToPayloadOffsetMap(Lock); + if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end()) { - return m_Storage->GetOp(AddressEntryIt->second); + return m_Storage->GetOp(m_OpLogPayloads[It->second].Address); } - return {}; } @@ -2772,14 +2944,14 @@ ProjectStore::Oplog::EnableUpdateCapture() m_OplogLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { - ZEN_ASSERT(!m_CapturedLSNs); + ZEN_ASSERT(!m_CapturedOps); ZEN_ASSERT(!m_CapturedAttachments); - m_CapturedLSNs = std::make_unique<std::vector<uint32_t>>(); + m_CapturedOps = std::make_unique<std::vector<Oid>>(); m_CapturedAttachments = std::make_unique<std::vector<IoHash>>(); } else { - ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedOps); ZEN_ASSERT(m_CapturedAttachments); } m_UpdateCaptureRefCounter++; @@ -2792,34 +2964,36 @@ ProjectStore::Oplog::DisableUpdateCapture() ZEN_MEMSCOPE(GetProjectstoreTag()); m_OplogLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedOps); ZEN_ASSERT(m_CapturedAttachments); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { - m_CapturedLSNs.reset(); + m_CapturedOps.reset(); m_CapturedAttachments.reset(); } }); } void -ProjectStore::Oplog::IterateCapturedLSNsLocked(std::function<bool(const CbObjectView& UpdateOp)>&& Callback) +ProjectStore::Oplog::IterateCapturedOpsLocked( + std::function<bool(const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback) { ZEN_MEMSCOPE(GetProjectstoreTag()); - if (m_CapturedLSNs) + if (m_CapturedOps) { if (!m_Storage) { return; } - for (uint32_t UpdatedLSN : *m_CapturedLSNs) + for (const Oid& OpKey : *m_CapturedOps) { - if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) + if (const auto AddressEntryIt = m_OpToPayloadOffsetMap.find(OpKey); AddressEntryIt != m_OpToPayloadOffsetMap.end()) { - Callback(m_Storage->GetOp(AddressEntryIt->second)); + const OplogPayload& OpPayload = m_OpLogPayloads[AddressEntryIt->second]; + Callback(OpKey, OpPayload.Lsn, m_Storage->GetOp(OpPayload.Address)); } } } @@ -3045,7 +3219,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) return Result; } -uint32_t +ProjectStore::LogSequenceNumber ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry) @@ -3072,12 +3246,18 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); } - m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; + const PayloadIndex PayloadOffset(m_OpLogPayloads.size()); + m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset); + m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress}); + + if (m_LsnToPayloadOffsetMap) + { + m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, PayloadOffset); + } return OpEntry.OpLsn; } -uint32_t +ProjectStore::LogSequenceNumber ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_ASSERT(m_Mode == EMode::kFull); @@ -3085,9 +3265,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); - const CbObject& Core = OpPackage.GetObject(); - const uint32_t EntryId = AppendNewOplogEntry(Core); - if (EntryId == 0xffffffffu) + const CbObject& Core = OpPackage.GetObject(); + const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core); + if (!EntryId) { // The oplog has been deleted so just drop this return EntryId; @@ -3132,7 +3312,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) } } - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); return EntryId; } @@ -3149,7 +3329,7 @@ ProjectStore::Oplog::GetStorage() return Storage; } -uint32_t +ProjectStore::LogSequenceNumber ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { ZEN_ASSERT(m_Mode == EMode::kFull); @@ -3162,7 +3342,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) RefPtr<OplogStorage> Storage = GetStorage(); if (!Storage) { - return 0xffffffffu; + return {}; } OplogEntryMapping Mapping = GetMapping(Core); @@ -3170,18 +3350,18 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) const OplogEntry OpEntry = Storage->AppendOp(OpData); - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); - if (m_CapturedLSNs) + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + const ProjectStore::LogSequenceNumber EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); + if (m_CapturedOps) { - m_CapturedLSNs->push_back(EntryId); + m_CapturedOps->push_back(OpData.KeyHash); } m_MetaValid = false; return EntryId; } -std::vector<uint32_t> +std::vector<ProjectStore::LogSequenceNumber> ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) { ZEN_ASSERT(m_Mode == EMode::kFull); @@ -3194,7 +3374,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) RefPtr<OplogStorage> Storage = GetStorage(); if (!Storage) { - return std::vector<uint32_t>(Cores.size(), 0xffffffffu); + return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{}); } size_t OpCount = Cores.size(); @@ -3212,21 +3392,21 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas); - std::vector<uint32_t> EntryIds; + std::vector<ProjectStore::LogSequenceNumber> EntryIds; EntryIds.resize(OpCount); { { RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - if (m_CapturedLSNs) + if (m_CapturedOps) { - m_CapturedLSNs->reserve(m_CapturedLSNs->size() + OpCount); + m_CapturedOps->reserve(m_CapturedOps->size() + OpCount); } for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++) { EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); - if (m_CapturedLSNs) + if (m_CapturedOps) { - m_CapturedLSNs->push_back(EntryIds[OpIndex]); + m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash); } } } @@ -5917,11 +6097,11 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, for (CbObject& NewOp : NewOps) { - uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); - ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn); + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); - ResponseObj.AddInteger(NewLsn); + ResponseObj.AddInteger(NewLsn.Number); } ResponseObj.EndArray(); @@ -6678,7 +6858,8 @@ public: if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { Ref<ProjectStore::Oplog> Oplog = It->second; - Oplog->IterateCapturedLSNsLocked([&](const CbObjectView& UpdateOp) -> bool { + Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool { + ZEN_UNUSED(Key, LSN); UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); return true; }); @@ -6884,8 +7065,8 @@ public: m_OplogId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), Result.OpCount, - Result.LSNLow, - Result.LSNHigh, + Result.LSNLow.Number, + Result.LSNHigh.Number, Status); }); Ref<ProjectStore::Oplog> Oplog; @@ -8988,7 +9169,7 @@ TEST_CASE("project.store.iterateoplog") } }; auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; }; - auto MarkFound = [&TestOids, &Count](uint32_t /* LSN */, const Oid& /* InId */, CbObjectView Op) { + auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) { for (TestOidData& TestOid : TestOids) { if (Op["key"sv].AsString() == TestOid.Key) |