aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-09-15 17:08:44 +0200
committerGitHub Enterprise <[email protected]>2025-09-15 17:08:44 +0200
commitabb1413494537381f2716d69ec459f485f923f71 (patch)
tree8dbb797886df2b3a5bfefcec8579f2985fd2c252 /src/zenserver/projectstore/projectstore.cpp
parentrevise exception vs error (#495) (diff)
downloadzen-abb1413494537381f2716d69ec459f485f923f71.tar.xz
zen-abb1413494537381f2716d69ec459f485f923f71.zip
new in-memory storage strategy for oplogs (#490)
- Improvement: Revised project oplog in-memory representation which reduces load times and memory usage
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp1015
1 files changed, 598 insertions, 417 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 7cb115110..97175da23 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -388,31 +388,68 @@ namespace {
: fmt::format("{}: {}", Result.Reason, Result.Text)};
}
+ std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging)
+ {
+ int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize);
+ int32_t End = EntryPaging.Count < 0 ? TotalSize : (Start + std::min<int32_t>(EntryPaging.Count, TotalSize - Start));
+ return {Start, End};
+ }
+
#pragma pack(push)
#pragma pack(1)
struct OplogIndexHeader
{
+ OplogIndexHeader() {}
+
+ struct BodyV1
+ {
+ uint64_t LogPosition = 0;
+ uint32_t LSNCount = 0;
+ uint64_t KeyCount = 0;
+ uint32_t OpAddressMapCount = 0;
+ uint32_t LatestOpMapCount = 0;
+ uint64_t ChunkMapCount = 0;
+ uint64_t MetaMapCount = 0;
+ uint64_t FileMapCount = 0;
+ };
+
+ struct BodyV2
+ {
+ uint64_t LogPosition = 0;
+ uint64_t KeyCount = 0;
+ uint32_t OpPayloadIndexCount = 0;
+ uint32_t OpPayloadCount = 0;
+ uint32_t ChunkMapCount = 0;
+ uint32_t MetaMapCount = 0;
+ uint32_t FileMapCount = 0;
+ uint32_t MaxLSN = 0;
+ uint32_t NextOpCoreOffset = 0; // note: Multiple of oplog data alignment!
+ uint32_t Reserved[2] = {0, 0};
+ };
+
static constexpr uint32_t ExpectedMagic = 0x7569647a; // 'zidx';
- static constexpr uint32_t CurrentVersion = 1;
+ static constexpr uint32_t Version1 = 1;
+ static constexpr uint32_t Version2 = 2;
+ static constexpr uint32_t CurrentVersion = Version2;
static constexpr uint64_t DataAlignment = 8;
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint64_t LogPosition = 0;
- uint32_t LSNCount = 0;
- uint64_t KeyCount = 0;
- uint32_t OpAddressMapCount = 0;
- uint32_t LatestOpMapCount = 0;
- uint64_t ChunkMapCount = 0;
- uint64_t MetaMapCount = 0;
- uint64_t FileMapCount = 0;
- uint32_t Checksum = 0;
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+
+ union
+ {
+ BodyV1 V1;
+ BodyV2 V2;
+ };
+
+ uint32_t Checksum = 0;
static uint32_t ComputeChecksum(const OplogIndexHeader& Header)
{
return XXH32(&Header.Magic, sizeof(OplogIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
}
};
+
#pragma pack(pop)
static_assert(sizeof(OplogIndexHeader) == 64);
@@ -545,6 +582,14 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset = RoundUp((NextOpFileOffset.Offset * m_OpsAlign) + NextOpFileOffset.Size, m_OpsAlign);
}
+ void SetMaxLSNAndNextOpOffset(uint32_t MaxLSN, uint32_t NextOpOffset)
+ {
+ m_MaxLsn.store(MaxLSN);
+ m_NextOpsOffset = NextOpOffset * m_OpsAlign;
+ }
+
+ uint32_t GetNextOpOffset() const { return gsl::narrow<uint32_t>(m_NextOpsOffset / m_OpsAlign); }
+
enum EMode
{
Create,
@@ -603,35 +648,37 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreSize, OpFileOffset);
- if (OpBufferView.GetSize() == LogEntry.OpCoreSize)
+ const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign;
+ const MemoryView OpBufferView = OpBlobsBuffer.MakeView(LogEntry.OpCoreAddress.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == LogEntry.OpCoreAddress.Size)
{
return IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
}
else
{
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ IoBuffer OpBuffer(LogEntry.OpCoreAddress.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreAddress.Size, OpFileOffset);
return OpBuffer;
}
}
- uint64_t GetEffectiveBlobsSize(std::span<const OplogEntryAddress> Addresses) const
+ uint64_t GetEffectiveBlobsSize(std::span<const Oplog::OplogPayload> Addresses) const
{
uint64_t EffectiveSize = 0;
- for (const OplogEntryAddress& Address : Addresses)
+ for (const Oplog::OplogPayload& OpPayload : Addresses)
{
- EffectiveSize += RoundUp(Address.Size, m_OpsAlign);
+ EffectiveSize += RoundUp(OpPayload.Address.Size, m_OpsAlign);
}
return EffectiveSize;
}
- void Compact(
- std::span<const uint32_t> LSNs,
- std::function<void(const Oid& OpKeyHash, uint32_t OldLSN, uint32_t NewLSN, const OplogEntryAddress& NewAddress)>&& Callback,
- bool RetainLSNs,
- bool DryRun)
+ void Compact(std::span<const ProjectStore::LogSequenceNumber> LSNs,
+ std::function<void(const Oid& OpKeyHash,
+ ProjectStore::LogSequenceNumber OldLSN,
+ ProjectStore::LogSequenceNumber NewLSN,
+ const OplogEntryAddress& NewAddress)>&& Callback,
+ bool RetainLSNs,
+ bool DryRun)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::OplogStorage::Compact");
@@ -665,9 +712,9 @@ struct ProjectStore::OplogStorage : public RefCounted
std::vector<OplogEntry> Ops;
Ops.reserve(LSNs.size());
- tsl::robin_map<uint32_t, size_t> LSNToIndex;
+ ProjectStore::LsnMap<size_t> LSNToIndex;
LSNToIndex.reserve(LSNs.size());
- for (uint32_t LSN : LSNs)
+ for (ProjectStore::LogSequenceNumber LSN : LSNs)
{
LSNToIndex[LSN] = (size_t)-1;
}
@@ -693,10 +740,10 @@ struct ProjectStore::OplogStorage : public RefCounted
SkipEntryCount);
std::sort(Ops.begin(), Ops.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
- return Lhs.OpCoreOffset < Rhs.OpCoreOffset;
+ return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset;
});
- std::vector<uint32_t> OldLSNs;
+ std::vector<ProjectStore::LogSequenceNumber> OldLSNs;
OldLSNs.reserve(Ops.size());
uint64_t OpWriteOffset = 0;
@@ -712,15 +759,15 @@ struct ProjectStore::OplogStorage : public RefCounted
IoBuffer OpBuffer = GetOpBuffer(OldBlobsBuffer, LogEntry);
if (RetainLSNs)
{
- MaxLSN = Max(MaxLSN, LogEntry.OpLsn);
+ MaxLSN = Max(MaxLSN, LogEntry.OpLsn.Number);
}
else
{
- LogEntry.OpLsn = ++MaxLSN;
+ LogEntry.OpLsn = ProjectStore::LogSequenceNumber(++MaxLSN);
}
- LogEntry.OpCoreOffset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign);
- NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreSize, OpWriteOffset);
- OpWriteOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign);
+ LogEntry.OpCoreAddress.Offset = gsl::narrow<uint32_t>(OpWriteOffset / m_OpsAlign);
+ NewOpBlobsBuffer.Write(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size, OpWriteOffset);
+ OpWriteOffset = RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
}
Oplog.Append(Ops);
}
@@ -768,10 +815,7 @@ struct ProjectStore::OplogStorage : public RefCounted
for (size_t Index = 0; Index < Ops.size(); Index++)
{
const OplogEntry& LogEntry = Ops[Index];
- Callback(LogEntry.OpKeyHash,
- OldLSNs[Index],
- LogEntry.OpLsn,
- OplogEntryAddress{.Offset = LogEntry.OpCoreOffset, .Size = LogEntry.OpCoreSize});
+ Callback(LogEntry.OpKeyHash, OldLSNs[Index], LogEntry.OpLsn, LogEntry.OpCoreAddress);
}
ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.",
@@ -837,21 +881,21 @@ struct ProjectStore::OplogStorage : public RefCounted
return;
}
}
- else if (LogEntry.OpCoreSize == 0)
+ else if (LogEntry.OpCoreAddress.Size == 0)
{
ZEN_SCOPED_WARN("skipping zero size op {}", LogEntry.OpKeyHash);
++OutInvalidEntries;
return;
}
- else if (LogEntry.OpLsn == 0)
+ else if (LogEntry.OpLsn.Number == 0)
{
ZEN_SCOPED_WARN("skipping zero lsn op {}", LogEntry.OpKeyHash);
++OutInvalidEntries;
return;
}
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- if ((OpFileOffset + LogEntry.OpCoreSize) > OpsBlockSize)
+ const uint64_t OpFileOffset = LogEntry.OpCoreAddress.Offset * m_OpsAlign;
+ if ((OpFileOffset + LogEntry.OpCoreAddress.Size) > OpsBlockSize)
{
ZEN_SCOPED_WARN("skipping out of bounds op {}", LogEntry.OpKeyHash);
++OutInvalidEntries;
@@ -905,7 +949,7 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t NextOpFileOffset = m_NextOpsOffset;
std::sort(OpLogEntries.begin(), OpLogEntries.end(), [&](const OplogEntry& Lhs, const OplogEntry& Rhs) {
- return Lhs.OpCoreOffset < Rhs.OpCoreOffset;
+ return Lhs.OpCoreAddress.Offset < Rhs.OpCoreAddress.Offset;
});
for (const OplogEntry& LogEntry : OpLogEntries)
@@ -921,7 +965,7 @@ struct ProjectStore::OplogStorage : public RefCounted
// Verify checksum, ignore op data if incorrect
const uint32_t ExpectedOpCoreHash = LogEntry.OpCoreHash;
- const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreSize) & 0xffffFFFF);
+ const uint32_t OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.GetData(), LogEntry.OpCoreAddress.Size) & 0xffffFFFF);
if (OpCoreHash != ExpectedOpCoreHash)
{
@@ -944,9 +988,10 @@ struct ProjectStore::OplogStorage : public RefCounted
else
{
Handler(CbObjectView(OpBuffer.GetData()), LogEntry);
- MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn);
- const uint64_t EntryNextOpFileOffset = RoundUp((LogEntry.OpCoreOffset * m_OpsAlign) + LogEntry.OpCoreSize, m_OpsAlign);
- NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
+ MaxOpLsn = Max(MaxOpLsn, LogEntry.OpLsn.Number);
+ const uint64_t EntryNextOpFileOffset =
+ RoundUp((LogEntry.OpCoreAddress.Offset * m_OpsAlign) + LogEntry.OpCoreAddress.Size, m_OpsAlign);
+ NextOpFileOffset = Max(NextOpFileOffset, EntryNextOpFileOffset);
}
}
}
@@ -965,25 +1010,29 @@ struct ProjectStore::OplogStorage : public RefCounted
InvalidEntries);
}
- void ReplayLogEntries(const std::span<OplogEntryAddress> Entries, std::function<void(CbObjectView)>&& Handler)
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, CbObjectView)>&& Handler)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
- for (const OplogEntryAddress& Entry : Entries)
+ for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
{
- const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Size, OpFileOffset);
- if (OpBufferView.GetSize() == Entry.Size)
+ const Oplog::OplogPayload& Entry = Entries[EntryOffset];
+
+ const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
{
- Handler(CbObjectView(OpBufferView.GetData()));
+ Handler(Entry.Lsn, CbObjectView(OpBufferView.GetData()));
continue;
}
- IoBuffer OpBuffer(Entry.Size);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
- Handler(CbObjectView(OpBuffer.Data()));
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ Handler(Entry.Lsn, CbObjectView(OpBuffer.Data()));
}
}
@@ -1034,8 +1083,8 @@ struct ProjectStore::OplogStorage : public RefCounted
RwLock::ExclusiveLockScope Lock(m_RwLock);
const uint64_t WriteOffset = m_NextOpsOffset;
- const uint32_t OpLsn = ++m_MaxLsn;
- if (OpLsn == std::numeric_limits<uint32_t>::max())
+ const LogSequenceNumber OpLsn(++m_MaxLsn);
+ if (!OpLsn.Number)
{
ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
@@ -1045,11 +1094,10 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
- OplogEntry Entry = {.OpLsn = OpLsn,
- .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
- .OpCoreSize = uint32_t(WriteSize),
- .OpCoreHash = OpData.OpCoreHash,
- .OpKeyHash = OpData.KeyHash};
+ OplogEntry Entry = {.OpLsn = OpLsn,
+ .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), .Size = uint32_t(WriteSize)},
+ .OpCoreHash = OpData.OpCoreHash,
+ .OpKeyHash = OpData.KeyHash};
m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
m_Oplog.Append(Entry);
@@ -1064,7 +1112,7 @@ struct ProjectStore::OplogStorage : public RefCounted
size_t OpCount = Ops.size();
std::vector<std::pair<uint64_t, uint64_t>> OffsetAndSizes;
- std::vector<uint32_t> OpLsns;
+ std::vector<LogSequenceNumber> OpLsns;
OffsetAndSizes.resize(OpCount);
OpLsns.resize(OpCount);
@@ -1083,8 +1131,8 @@ struct ProjectStore::OplogStorage : public RefCounted
for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
{
OffsetAndSizes[OpIndex].first = WriteOffset - WriteStart;
- OpLsns[OpIndex] = ++m_MaxLsn;
- if (OpLsns[OpIndex] == std::numeric_limits<uint32_t>::max())
+ OpLsns[OpIndex] = LogSequenceNumber(++m_MaxLsn);
+ if (!OpLsns[OpIndex])
{
ZEN_ERROR("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId());
throw std::runtime_error(fmt::format("Oplog count has exceeded available range for oplog {}", m_OwnerOplog->OplogId()));
@@ -1103,11 +1151,12 @@ struct ProjectStore::OplogStorage : public RefCounted
{
MutableMemoryView WriteBufferView = WriteBuffer.GetMutableView().RightChop(OffsetAndSizes[OpIndex].first);
WriteBufferView.CopyFrom(Ops[OpIndex].Buffer);
- Entries[OpIndex] = {.OpLsn = OpLsns[OpIndex],
- .OpCoreOffset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign),
- .OpCoreSize = uint32_t(OffsetAndSizes[OpIndex].second),
- .OpCoreHash = Ops[OpIndex].OpCoreHash,
- .OpKeyHash = Ops[OpIndex].KeyHash};
+ Entries[OpIndex] = {
+ .OpLsn = OpLsns[OpIndex],
+ .OpCoreAddress = {.Offset = gsl::narrow_cast<uint32_t>((WriteStart + OffsetAndSizes[OpIndex].first) / m_OpsAlign),
+ .Size = uint32_t(OffsetAndSizes[OpIndex].second)},
+ .OpCoreHash = Ops[OpIndex].OpCoreHash,
+ .OpKeyHash = Ops[OpIndex].KeyHash};
}
m_OpBlobs.Write(WriteBuffer.GetData(), WriteBuffer.GetSize(), WriteStart);
@@ -1229,7 +1278,7 @@ ProjectStore::Oplog::Flush()
}
uint64_t LogCount = m_Storage->LogCount();
- if (m_LogFlushPosition != LogCount)
+ if (m_LogFlushPosition != LogCount || m_IsLegacySnapshot)
{
WriteIndexSnapshot();
}
@@ -1237,69 +1286,101 @@ ProjectStore::Oplog::Flush()
}
void
+ProjectStore::Oplog::RefreshLsnToPayloadOffsetMap(RwLock::ExclusiveLockScope&)
+{
+ if (!m_LsnToPayloadOffsetMap)
+ {
+ m_LsnToPayloadOffsetMap = std::make_unique<LsnMap<PayloadIndex>>();
+ m_LsnToPayloadOffsetMap->reserve(m_OpLogPayloads.size());
+ for (uint32_t PayloadOffset = 0; PayloadOffset < m_OpLogPayloads.size(); PayloadOffset++)
+ {
+ const OplogPayload& Payload = m_OpLogPayloads[PayloadOffset];
+ if (Payload.Lsn.Number != 0 && Payload.Address.Size != 0)
+ {
+ m_LsnToPayloadOffsetMap->insert_or_assign(Payload.Lsn, PayloadIndex(PayloadOffset));
+ }
+ }
+ }
+}
+
+void
ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_ASSERT(m_Mode == EMode::kFull);
- std::vector<Oid> BadEntryKeys;
+ std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries;
using namespace std::literals;
- IterateOplogWithKey([&](uint32_t Lsn, const Oid& Key, CbObjectView Op) {
- ZEN_UNUSED(Lsn);
-
- std::vector<IoHash> Cids;
- Op.IterateAttachments([&](CbFieldView Visitor) { Cids.emplace_back(Visitor.AsAttachment()); });
-
+ IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) {
{
const Oid KeyHash = ComputeOpKey(Op);
-
- ZEN_ASSERT_FORMAT(KeyHash == Key, "oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
+ if (KeyHash != Key)
+ {
+ BadEntries.push_back({Lsn, Key});
+ ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
+ return;
+ }
}
- for (const IoHash& Cid : Cids)
- {
- if (!m_CidStore.ContainsChunk(Cid))
+ // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk?
+
+ Op.IterateAttachments([&](CbFieldView Visitor) {
+ const IoHash Cid = Visitor.AsAttachment();
+ if (Ctx.IsBadCid(Cid))
{
- // oplog entry references a CAS chunk which is not
- // present
- BadEntryKeys.push_back(Key);
+ // oplog entry references a CAS chunk which has been flagged as bad
+ BadEntries.push_back({Lsn, Key});
return;
}
- if (Ctx.IsBadCid(Cid))
+ if (!m_CidStore.ContainsChunk(Cid))
{
- // oplog entry references a CAS chunk which has been
- // flagged as bad
- BadEntryKeys.push_back(Key);
+ // oplog entry references a CAS chunk which is not present
+ BadEntries.push_back({Lsn, Key});
return;
}
- }
+ });
});
- if (!BadEntryKeys.empty())
+ if (!BadEntries.empty())
{
if (Ctx.RunRecovery())
{
ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index",
m_OuterProjectId,
m_OplogId,
- BadEntryKeys.size(),
+ BadEntries.size(),
m_BasePath);
// Actually perform some clean-up
- RwLock::ExclusiveLockScope _(m_OplogLock);
- for (const auto& Key : BadEntryKeys)
+ RwLock::ExclusiveLockScope Lock(m_OplogLock);
+
+ RefreshLsnToPayloadOffsetMap(Lock);
+
+ for (const auto& BadEntry : BadEntries)
{
- if (auto It = m_LatestOpMap.find(Key); It != m_LatestOpMap.end())
+ const LogSequenceNumber BadLsn = BadEntry.first;
+ const Oid& BadKey = BadEntry.second;
+ if (auto It = m_OpToPayloadOffsetMap.find(BadKey); It != m_OpToPayloadOffsetMap.end())
+ {
+ OplogPayload& Payload = m_OpLogPayloads[It->second];
+ if (Payload.Lsn == BadLsn)
+ {
+ m_OpToPayloadOffsetMap.erase(It);
+ m_Storage->AppendTombstone(BadKey);
+ }
+ }
+ if (auto LsnIt = m_LsnToPayloadOffsetMap->find(BadLsn); LsnIt != m_LsnToPayloadOffsetMap->end())
{
- m_OpAddressMap.erase(It->second);
- m_LatestOpMap.erase(It);
+ const PayloadIndex LsnPayloadOffset = LsnIt->second;
+ OplogPayload& LsnPayload = m_OpLogPayloads[LsnPayloadOffset];
+ LsnPayload = {.Lsn = LogSequenceNumber(0), .Address = {.Offset = 0, .Size = 0}};
}
- m_Storage->AppendTombstone(Key);
+ m_LsnToPayloadOffsetMap->erase(BadLsn);
}
- if (!BadEntryKeys.empty())
+ if (!BadEntries.empty())
{
m_MetaValid = false;
}
@@ -1309,7 +1390,7 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed",
m_OuterProjectId,
m_OplogId,
- BadEntryKeys.size(),
+ BadEntries.size(),
m_BasePath);
}
}
@@ -1355,8 +1436,9 @@ ProjectStore::Oplog::ResetState()
m_ChunkMap.clear();
m_MetaMap.clear();
m_FileMap.clear();
- m_OpAddressMap.clear();
- m_LatestOpMap.clear();
+ m_OpToPayloadOffsetMap.clear();
+ m_OpLogPayloads.clear();
+ m_LsnToPayloadOffsetMap.reset();
m_Storage = {};
}
@@ -1367,14 +1449,15 @@ ProjectStore::Oplog::PrepareForDelete(std::filesystem::path& OutRemoveDirectory)
RwLock::ExclusiveLockScope _(m_OplogLock);
m_UpdateCaptureRefCounter = 0;
- m_CapturedLSNs.reset();
+ m_CapturedOps.reset();
m_CapturedAttachments.reset();
m_PendingPrepOpAttachments.clear();
m_ChunkMap.clear();
m_MetaMap.clear();
m_FileMap.clear();
- m_OpAddressMap.clear();
- m_LatestOpMap.clear();
+ m_OpToPayloadOffsetMap.clear();
+ m_OpLogPayloads.clear();
+ m_LsnToPayloadOffsetMap.reset();
m_Storage = {};
if (PrepareDirectoryDelete(m_BasePath, OutRemoveDirectory))
{
@@ -1430,6 +1513,8 @@ ProjectStore::Oplog::Read()
RemoveFile(m_MetaPath, DummyEc);
}
+ ZEN_ASSERT(!m_LsnToPayloadOffsetMap);
+
ReadIndexSnapshot();
if (m_Mode == EMode::kFull)
@@ -1459,12 +1544,14 @@ ProjectStore::Oplog::Read()
m_MetaMap.insert_or_assign(Meta.Id, Meta.Hash);
}
- m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
+ const PayloadIndex PayloadOffset(m_OpLogPayloads.size());
+
+ m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset);
+ m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress});
},
m_LogFlushPosition);
- if (m_Storage->LogCount() != m_LogFlushPosition)
+ if (m_Storage->LogCount() != m_LogFlushPosition || m_IsLegacySnapshot)
{
WriteIndexSnapshot();
}
@@ -1476,8 +1563,10 @@ ProjectStore::Oplog::Read()
m_Storage->ReadOplogEntriesFromLog(OpLogEntries, InvalidEntries, m_LogFlushPosition);
for (const OplogEntry& OpEntry : OpLogEntries)
{
- m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
+ const PayloadIndex PayloadOffset(m_OpLogPayloads.size());
+
+ m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset);
+ m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress});
}
}
}
@@ -1536,8 +1625,9 @@ ProjectStore::Oplog::Reset()
m_ChunkMap.clear();
m_MetaMap.clear();
m_FileMap.clear();
- m_OpAddressMap.clear();
- m_LatestOpMap.clear();
+ m_OpToPayloadOffsetMap.clear();
+ m_OpLogPayloads.clear();
+ m_LsnToPayloadOffsetMap.reset();
m_Storage = new OplogStorage(this, m_BasePath);
m_Storage->Open(OplogStorage::EMode::Create);
m_MetaValid = false;
@@ -1628,7 +1718,7 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
Keys.reserve(OpCount);
Mappings.reserve(OpCount);
- IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) {
+ IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) {
Result.LSNLow = Min(Result.LSNLow, LSN);
Result.LSNHigh = Max(Result.LSNHigh, LSN);
KeyHashes.push_back(Key);
@@ -1774,34 +1864,21 @@ ProjectStore::Oplog::WriteIndexSnapshot()
const fs::path IndexPath = m_BasePath / "ops.zidx";
try
{
- // Write the current state of the location map to a new index state
- std::vector<uint32_t> LSNEntries;
- std::vector<Oid> Keys;
- std::vector<OplogEntryAddress> AddressMapEntries;
- std::vector<uint32_t> LatestOpMapEntries;
- std::vector<IoHash> ChunkMapEntries;
- std::vector<IoHash> MetaMapEntries;
- std::vector<uint32_t> FilePathLengths;
- std::vector<std::string> FilePaths;
- uint64_t IndexLogPosition = 0;
+ std::vector<Oid> Keys;
+ std::vector<PayloadIndex> OpPayloadOffsets;
+ std::vector<IoHash> ChunkMapEntries;
+ std::vector<IoHash> MetaMapEntries;
+ std::vector<uint32_t> FilePathLengths;
+ std::vector<std::string> FilePaths;
+ uint64_t IndexLogPosition = 0;
{
IndexLogPosition = m_Storage->LogCount();
-
- Keys.reserve(m_LatestOpMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size());
-
- AddressMapEntries.reserve(m_OpAddressMap.size());
- LSNEntries.reserve(m_OpAddressMap.size());
- for (const auto& It : m_OpAddressMap)
+ Keys.reserve(m_OpToPayloadOffsetMap.size() + m_ChunkMap.size() + m_MetaMap.size() + m_FileMap.size());
+ OpPayloadOffsets.reserve(m_OpToPayloadOffsetMap.size());
+ for (const auto& Kv : m_OpToPayloadOffsetMap)
{
- LSNEntries.push_back(It.first);
- AddressMapEntries.push_back(It.second);
- }
-
- LatestOpMapEntries.reserve(m_LatestOpMap.size());
- for (const auto& It : m_LatestOpMap)
- {
- Keys.push_back(It.first);
- LatestOpMapEntries.push_back(It.second);
+ Keys.push_back(Kv.first);
+ OpPayloadOffsets.push_back(Kv.second);
}
ChunkMapEntries.reserve(m_ChunkMap.size());
@@ -1841,14 +1918,16 @@ ProjectStore::Oplog::WriteIndexSnapshot()
{
BasicFileWriter IndexFile(ObjectIndexFile, 65536u);
- OplogIndexHeader Header = {.LogPosition = IndexLogPosition,
- .LSNCount = gsl::narrow<uint32_t>(LSNEntries.size()),
- .KeyCount = gsl::narrow<uint64_t>(Keys.size()),
- .OpAddressMapCount = gsl::narrow<uint32_t>(AddressMapEntries.size()),
- .LatestOpMapCount = gsl::narrow<uint32_t>(LatestOpMapEntries.size()),
- .ChunkMapCount = gsl::narrow<uint64_t>(ChunkMapEntries.size()),
- .MetaMapCount = gsl::narrow<uint64_t>(MetaMapEntries.size()),
- .FileMapCount = gsl::narrow<uint64_t>(FilePathLengths.size() / 2)};
+ OplogIndexHeader Header;
+ Header.V2 = {.LogPosition = IndexLogPosition,
+ .KeyCount = gsl::narrow<uint64_t>(Keys.size()),
+ .OpPayloadIndexCount = gsl::narrow<uint32_t>(OpPayloadOffsets.size()),
+ .OpPayloadCount = gsl::narrow<uint32_t>(m_OpLogPayloads.size()),
+ .ChunkMapCount = gsl::narrow<uint32_t>(ChunkMapEntries.size()),
+ .MetaMapCount = gsl::narrow<uint32_t>(MetaMapEntries.size()),
+ .FileMapCount = gsl::narrow<uint32_t>(FilePathLengths.size() / 2),
+ .MaxLSN = m_Storage->MaxLSN(),
+ .NextOpCoreOffset = m_Storage->GetNextOpOffset()};
Header.Checksum = OplogIndexHeader::ComputeChecksum(Header);
@@ -1856,16 +1935,13 @@ ProjectStore::Oplog::WriteIndexSnapshot()
IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset);
Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
- IndexFile.Write(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
- Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
-
IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset);
Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
- IndexFile.Write(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
+ IndexFile.Write(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset);
Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
- IndexFile.Write(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
+ IndexFile.Write(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset);
Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment);
IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
@@ -1883,6 +1959,7 @@ ProjectStore::Oplog::WriteIndexSnapshot()
Offset += FilePath.length();
}
}
+
ObjectIndexFile.Flush();
ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
if (Ec)
@@ -1893,8 +1970,9 @@ ProjectStore::Oplog::WriteIndexSnapshot()
IndexPath,
Ec.message()));
}
- EntryCount = LSNEntries.size();
+ EntryCount = m_OpLogPayloads.size();
m_LogFlushPosition = IndexLogPosition;
+ m_IsLegacySnapshot = false;
}
catch (const std::exception& Err)
{
@@ -1935,12 +2013,8 @@ ProjectStore::Oplog::ReadIndexSnapshot()
Offset += sizeof(OplogIndexHeader);
Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- if ((Header.Magic == OplogIndexHeader::ExpectedMagic) && (Header.Version == OplogIndexHeader::CurrentVersion) &&
- (Header.Checksum == OplogIndexHeader::ComputeChecksum(Header)))
+ if (Header.Magic == OplogIndexHeader::ExpectedMagic)
{
- uint32_t MaxLSN = 0;
- OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0};
-
uint32_t Checksum = OplogIndexHeader::ComputeChecksum(Header);
if (Header.Checksum != Checksum)
{
@@ -1953,117 +2027,229 @@ ProjectStore::Oplog::ReadIndexSnapshot()
return;
}
- if (Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount != Header.KeyCount)
+ if (Header.Version == OplogIndexHeader::Version1)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}",
- m_OuterProjectId,
- m_OplogId,
- IndexPath,
- Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount,
- Header.KeyCount);
- return;
- }
+ uint32_t MaxLSN = 0;
+ OplogEntryAddress LastOpAddress{.Offset = 0, .Size = 0};
- std::vector<uint32_t> LSNEntries(Header.LSNCount);
- ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
- Offset += LSNEntries.size() * sizeof(uint32_t);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- size_t LSNOffset = 0;
+ if (Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount !=
+ Header.V1.KeyCount)
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Header.V1.LatestOpMapCount + Header.V1.ChunkMapCount + Header.V1.MetaMapCount + Header.V1.FileMapCount,
+ Header.V1.KeyCount);
+ return;
+ }
- std::vector<Oid> Keys(Header.KeyCount);
- ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
- Offset += Keys.size() * sizeof(Oid);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- size_t KeyOffset = 0;
+ std::vector<uint32_t> LSNEntries(Header.V1.LSNCount);
+ ObjectIndexFile.Read(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LSNEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t LSNOffset = 0;
+
+ std::vector<Oid> Keys(Header.V1.KeyCount);
+ ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ size_t KeyOffset = 0;
- {
- std::vector<OplogEntryAddress> AddressMapEntries(Header.OpAddressMapCount);
- ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
- Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- m_OpAddressMap.reserve(AddressMapEntries.size());
- for (const OplogEntryAddress& Address : AddressMapEntries)
{
- m_OpAddressMap.insert_or_assign(LSNEntries[LSNOffset++], Address);
- if (Address.Offset > LastOpAddress.Offset)
+ std::vector<OplogEntryAddress> AddressMapEntries(Header.V1.OpAddressMapCount);
+ ObjectIndexFile.Read(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset);
+ Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ tsl::robin_map<uint32_t, PayloadIndex> LsnToPayloadOffset;
+ LsnToPayloadOffset.reserve(AddressMapEntries.size());
+
+ m_OpLogPayloads.reserve(AddressMapEntries.size());
+ for (const OplogEntryAddress& Address : AddressMapEntries)
{
- LastOpAddress = Address;
+ const uint32_t LSN = LSNEntries[LSNOffset++];
+ LsnToPayloadOffset.insert_or_assign(LSN, PayloadIndex(m_OpLogPayloads.size()));
+
+ m_OpLogPayloads.push_back({.Lsn = LogSequenceNumber(LSN), .Address = Address});
+ if (Address.Offset > LastOpAddress.Offset)
+ {
+ LastOpAddress = Address;
+ }
+ MaxLSN = Max(MaxLSN, LSN);
+ }
+
+ {
+ std::vector<uint32_t> LatestOpMapEntries(Header.V1.LatestOpMapCount);
+ ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
+ Offset += LatestOpMapEntries.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_OpToPayloadOffsetMap.reserve(LatestOpMapEntries.size());
+ for (uint32_t Lsn : LatestOpMapEntries)
+ {
+ if (auto It = LsnToPayloadOffset.find(Lsn); It != LsnToPayloadOffset.end())
+ {
+ m_OpToPayloadOffsetMap.insert_or_assign(Keys[KeyOffset++], It->second);
+ MaxLSN = Max(MaxLSN, Lsn);
+ }
+ }
}
}
- }
- {
- std::vector<uint32_t> LatestOpMapEntries(Header.LatestOpMapCount);
- ObjectIndexFile.Read(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset);
- Offset += LatestOpMapEntries.size() * sizeof(uint32_t);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- m_LatestOpMap.reserve(LatestOpMapEntries.size());
- for (uint32_t LSN : LatestOpMapEntries)
+ if (m_Mode == EMode::kFull)
{
- if (!m_OpAddressMap.contains(LSN))
{
- throw std::runtime_error(fmt::format("LSN {} is no present in Op address map", LSN));
+ std::vector<IoHash> ChunkMapEntries(Header.V1.ChunkMapCount);
+ ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_ChunkMap.reserve(ChunkMapEntries.size());
+ for (const IoHash& ChunkId : ChunkMapEntries)
+ {
+ m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ std::vector<IoHash> MetaMapEntries(Header.V1.MetaMapCount);
+ ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_MetaMap.reserve(MetaMapEntries.size());
+ for (const IoHash& ChunkId : MetaMapEntries)
+ {
+ m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ m_FileMap.reserve(Header.V1.FileMapCount);
+
+ std::vector<uint32_t> FilePathLengths(Header.V1.FileMapCount * 2);
+ ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
+ auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string {
+ MemoryView StringData = IndexFile.MakeView(Length, Offset);
+ if (StringData.GetSize() != Length)
+ {
+ throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}",
+ Length,
+ uint32_t(StringData.GetSize())));
+ }
+ Offset += Length;
+ return std::string((const char*)StringData.GetData(), Length);
+ });
+ for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
+ {
+ std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ m_FileMap.insert_or_assign(
+ Keys[KeyOffset++],
+ FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)});
+ }
}
- m_LatestOpMap.insert_or_assign(Keys[KeyOffset++], LSN);
- MaxLSN = Max(MaxLSN, LSN);
}
+ m_LogFlushPosition = Header.V1.LogPosition;
+ m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress);
+ EntryCount = Header.V1.LSNCount;
+ m_IsLegacySnapshot = true;
}
- if (m_Mode == EMode::kFull)
+ else if (Header.Version == OplogIndexHeader::Version2)
{
+ std::vector<Oid> Keys(Header.V2.KeyCount);
+
+ ObjectIndexFile.Read(Keys.data(), Keys.size() * sizeof(Oid), Offset);
+ Offset += Keys.size() * sizeof(Oid);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ std::vector<PayloadIndex> OpPayloadOffsets(Header.V2.OpPayloadIndexCount);
+ ObjectIndexFile.Read(OpPayloadOffsets.data(), OpPayloadOffsets.size() * sizeof(PayloadIndex), Offset);
+ Offset += OpPayloadOffsets.size() * sizeof(PayloadIndex);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ m_OpLogPayloads.resize(Header.V2.OpPayloadCount);
+ ObjectIndexFile.Read(m_OpLogPayloads.data(), m_OpLogPayloads.size() * sizeof(OplogPayload), Offset);
+ Offset += m_OpLogPayloads.size() * sizeof(OplogPayload);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ size_t KeyOffset = 0;
+
+ m_OpToPayloadOffsetMap.reserve(Header.V2.OpPayloadIndexCount);
+ for (const PayloadIndex PayloadOffset : OpPayloadOffsets)
{
- std::vector<IoHash> ChunkMapEntries(Header.ChunkMapCount);
- ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
- Offset += ChunkMapEntries.size() * sizeof(IoHash);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- m_ChunkMap.reserve(ChunkMapEntries.size());
- for (const IoHash& ChunkId : ChunkMapEntries)
- {
- m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
- }
+ const Oid& Key = Keys[KeyOffset++];
+ ZEN_ASSERT_SLOW(PayloadOffset.Index < Header.V2.OpPayloadCount);
+ m_OpToPayloadOffsetMap.insert({Key, PayloadOffset});
}
+
+ if (m_Mode == EMode::kFull)
{
- std::vector<IoHash> MetaMapEntries(Header.MetaMapCount);
- ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
- Offset += MetaMapEntries.size() * sizeof(IoHash);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
- m_MetaMap.reserve(MetaMapEntries.size());
- for (const IoHash& ChunkId : MetaMapEntries)
{
- m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ std::vector<IoHash> ChunkMapEntries(Header.V2.ChunkMapCount);
+ ObjectIndexFile.Read(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += ChunkMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_ChunkMap.reserve(ChunkMapEntries.size());
+ for (const IoHash& ChunkId : ChunkMapEntries)
+ {
+ m_ChunkMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
}
- }
- {
- m_FileMap.reserve(Header.FileMapCount);
-
- std::vector<uint32_t> FilePathLengths(Header.FileMapCount * 2);
- ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
- Offset += FilePathLengths.size() * sizeof(uint32_t);
- Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
-
- BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
- auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string {
- MemoryView StringData = IndexFile.MakeView(Length, Offset);
- if (StringData.GetSize() != Length)
- {
- throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}",
- Length,
- uint32_t(StringData.GetSize())));
- }
- Offset += Length;
- return std::string((const char*)StringData.GetData(), Length);
- });
- for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
{
- std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
- std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
- m_FileMap.insert_or_assign(
- Keys[KeyOffset++],
- FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)});
+ std::vector<IoHash> MetaMapEntries(Header.V2.MetaMapCount);
+ ObjectIndexFile.Read(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset);
+ Offset += MetaMapEntries.size() * sizeof(IoHash);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+ m_MetaMap.reserve(MetaMapEntries.size());
+ for (const IoHash& ChunkId : MetaMapEntries)
+ {
+ m_MetaMap.insert_or_assign(Keys[KeyOffset++], ChunkId);
+ }
+ }
+ {
+ m_FileMap.reserve(Header.V2.FileMapCount);
+
+ std::vector<uint32_t> FilePathLengths(Header.V2.FileMapCount * 2);
+ ObjectIndexFile.Read(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset);
+ Offset += FilePathLengths.size() * sizeof(uint32_t);
+ Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment);
+
+ BasicFileBuffer IndexFile(ObjectIndexFile, 65536);
+ auto ReadString([&IndexFile, &Offset](uint32_t Length) -> std::string {
+ MemoryView StringData = IndexFile.MakeView(Length, Offset);
+ if (StringData.GetSize() != Length)
+ {
+ throw std::runtime_error(fmt::format("Invalid format. Expected to read {} bytes but got {}",
+ Length,
+ uint32_t(StringData.GetSize())));
+ }
+ Offset += Length;
+ return std::string((const char*)StringData.GetData(), Length);
+ });
+ for (uint64_t FileLengthOffset = 0; FileLengthOffset < FilePathLengths.size();)
+ {
+ std::string ServerPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ std::string ClientPath = ReadString(FilePathLengths[FileLengthOffset++]);
+ m_FileMap.insert_or_assign(
+ Keys[KeyOffset++],
+ FileMapEntry{.ServerPath = std::move(ServerPath), .ClientPath = std::move(ClientPath)});
+ }
}
}
+ m_LogFlushPosition = Header.V2.LogPosition;
+ m_Storage->SetMaxLSNAndNextOpOffset(Header.V2.MaxLSN, Header.V2.NextOpCoreOffset);
+ EntryCount = Header.V2.OpPayloadCount;
+ m_IsLegacySnapshot = false;
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Unsupported version number {}",
+ m_OuterProjectId,
+ m_OplogId,
+ IndexPath,
+ Header.Version);
+ return;
}
- m_LogFlushPosition = Header.LogPosition;
- m_Storage->SetMaxLSNAndNextWriteAddress(MaxLSN, LastOpAddress);
- EntryCount = Header.LSNCount;
}
else
{
@@ -2073,8 +2259,8 @@ ProjectStore::Oplog::ReadIndexSnapshot()
}
catch (const std::exception& Ex)
{
- m_OpAddressMap.clear();
- m_LatestOpMap.clear();
+ m_OpToPayloadOffsetMap.clear();
+ m_OpLogPayloads.clear();
m_ChunkMap.clear();
m_MetaMap.clear();
m_FileMap.clear();
@@ -2106,19 +2292,7 @@ ProjectStore::Oplog::GetUnusedSpacePercentLocked() const
return 0;
}
- std::vector<OplogEntryAddress> Addresses;
- {
- Addresses.reserve(m_LatestOpMap.size());
- for (auto It : m_LatestOpMap)
- {
- if (auto AddressIt = m_OpAddressMap.find(It.second); AddressIt != m_OpAddressMap.end())
- {
- Addresses.push_back(AddressIt->second);
- }
- }
- }
-
- const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(std::move(Addresses));
+ const uint64_t EffectiveBlobsSize = m_Storage->GetEffectiveBlobsSize(m_OpLogPayloads);
if (EffectiveBlobsSize < ActualBlobsSize)
{
@@ -2144,31 +2318,36 @@ ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool Reta
Stopwatch Timer;
- std::vector<uint32_t> LSNs;
- LSNs.reserve(m_LatestOpMap.size());
- for (auto It : m_LatestOpMap)
+ std::vector<LogSequenceNumber> LatestLSNs;
+ LatestLSNs.reserve(m_OpLogPayloads.size());
+ for (const auto& Kv : m_OpToPayloadOffsetMap)
{
- LSNs.push_back(It.second);
+ const OplogPayload& OpPayload = m_OpLogPayloads[Kv.second];
+ LatestLSNs.push_back(OpPayload.Lsn);
}
- tsl::robin_map<uint32_t, OplogEntryAddress> OpAddressMap; // Index LSN -> op data in ops blob file
- OidMap<uint32_t> LatestOpMap; // op key -> latest op LSN for key
+ std::vector<OplogPayload> OpPayloads;
+ OpPayloads.reserve(LatestLSNs.size());
+
+ OidMap<PayloadIndex> OpToPayloadOffsetMap;
+ OpToPayloadOffsetMap.reserve(LatestLSNs.size());
uint64_t PreSize = TotalSize();
m_Storage->Compact(
- LSNs,
- [&](const Oid& OpKeyHash, uint32_t, uint32_t NewLSN, const OplogEntryAddress& NewAddress) {
- LatestOpMap.insert_or_assign(OpKeyHash, NewLSN);
- OpAddressMap.insert_or_assign(NewLSN, NewAddress);
+ LatestLSNs,
+ [&](const Oid& OpKeyHash, LogSequenceNumber, LogSequenceNumber NewLsn, const OplogEntryAddress& NewAddress) {
+ OpToPayloadOffsetMap.insert({OpKeyHash, PayloadIndex(OpPayloads.size())});
+ OpPayloads.push_back({.Lsn = NewLsn, .Address = NewAddress});
},
RetainLSNs,
/*DryRun*/ DryRun);
if (!DryRun)
{
- m_OpAddressMap.swap(OpAddressMap);
- m_LatestOpMap.swap(LatestOpMap);
+ m_OpToPayloadOffsetMap.swap(OpToPayloadOffsetMap);
+ m_OpLogPayloads.swap(OpPayloads);
+ m_LsnToPayloadOffsetMap.reset();
WriteIndexSnapshot();
}
@@ -2498,15 +2677,44 @@ ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler, c
IterateOplogLocked(std::move(Handler), EntryPaging);
}
-template<typename ContainerElement>
-std::span<ContainerElement>
-CreateSpanFromPaging(std::vector<ContainerElement>& Container, const ProjectStore::Oplog::Paging& Paging)
+std::vector<ProjectStore::Oplog::PayloadIndex>
+ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& EntryPaging,
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher>* OutOptionalReverseKeyMap)
{
- std::span<ContainerElement> Span(Container);
- int32_t Size = int32_t(Container.size());
- int32_t Start = std::clamp(Paging.Start, 0, Size);
- int32_t End = Paging.Count < 0 ? Size : (Start + std::min<int32_t>(Paging.Count, Size - Start));
- return Span.subspan(Start, End - Start);
+ std::pair<int32_t, int32_t> StartAndEnd = GetPagedRange(int32_t(m_OpToPayloadOffsetMap.size()), EntryPaging);
+ if (StartAndEnd.first == StartAndEnd.second)
+ {
+ return {};
+ }
+
+ const int32_t ReplayCount = StartAndEnd.second - StartAndEnd.first;
+
+ auto Start = m_OpToPayloadOffsetMap.cbegin();
+ std::advance(Start, StartAndEnd.first);
+
+ auto End = Start;
+ std::advance(End, ReplayCount);
+
+ std::vector<PayloadIndex> ReplayOrder;
+ ReplayOrder.reserve(ReplayCount);
+
+ for (auto It = Start; It != End; It++)
+ {
+ const PayloadIndex PayloadOffset = It->second;
+ if (OutOptionalReverseKeyMap != nullptr)
+ {
+ OutOptionalReverseKeyMap->insert_or_assign(PayloadOffset, It->first);
+ }
+ ReplayOrder.push_back(PayloadOffset);
+ }
+
+ std::sort(ReplayOrder.begin(), ReplayOrder.end(), [this](const PayloadIndex Lhs, const PayloadIndex Rhs) {
+ const OplogEntryAddress& LhsEntry = m_OpLogPayloads[Lhs].Address;
+ const OplogEntryAddress& RhsEntry = m_OpLogPayloads[Rhs].Address;
+ return LhsEntry.Offset < RhsEntry.Offset;
+ });
+
+ return ReplayOrder;
}
void
@@ -2519,23 +2727,44 @@ ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Hand
return;
}
- std::vector<OplogEntryAddress> Entries;
- Entries.reserve(m_LatestOpMap.size());
-
- for (const auto& Kv : m_LatestOpMap)
+ std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, nullptr);
+ if (!ReplayOrder.empty())
{
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- Entries.push_back(AddressEntry->second);
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber, CbObjectView Op) { Handler(Op); });
}
+}
- std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
- return Lhs.Offset < Rhs.Offset;
- });
+void
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler)
+{
+ IterateOplogWithKey(std::move(Handler), Paging{});
+}
+
+void
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Handler,
+ const Paging& EntryPaging)
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
- std::span<OplogEntryAddress> EntrySpan = CreateSpanFromPaging(Entries, EntryPaging);
- m_Storage->ReplayLogEntries(EntrySpan, [&](CbObjectView Op) { Handler(Op); });
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap;
+ std::vector<PayloadIndex> ReplayOrder;
+
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (m_Storage)
+ {
+ ReplayOrder = GetSortedOpPayloadRangeLocked(EntryPaging, &ReverseKeyMap);
+ if (!ReplayOrder.empty())
+ {
+ uint32_t EntryIndex = 0;
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, CbObjectView Op) {
+ const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+ Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Op);
+ EntryIndex++;
+ });
+ }
+ }
+ }
}
static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta';
@@ -2625,80 +2854,10 @@ ProjectStore::Oplog::GetOplogEntryCount() const
{
return 0;
}
- return m_LatestOpMap.size();
-}
-
-void
-ProjectStore::Oplog::IterateOplogWithKey(std::function<void(uint32_t, const Oid&, CbObjectView)>&& Handler)
-{
- IterateOplogWithKey(std::move(Handler), Paging{});
+ return m_OpToPayloadOffsetMap.size();
}
-void
-ProjectStore::Oplog::IterateOplogWithKey(std::function<void(uint32_t, const Oid&, CbObjectView)>&& Handler, const Paging& EntryPaging)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return;
- }
-
- std::vector<OplogEntryAddress> SortedEntries;
- std::vector<Oid> SortedKeys;
- std::vector<uint32_t> SortedLSNs;
-
- {
- const auto TargetEntryCount = m_LatestOpMap.size();
-
- std::vector<size_t> EntryIndexes;
- std::vector<OplogEntryAddress> Entries;
- std::vector<Oid> Keys;
- std::vector<uint32_t> LSNs;
-
- Entries.reserve(TargetEntryCount);
- EntryIndexes.reserve(TargetEntryCount);
- Keys.reserve(TargetEntryCount);
- LSNs.reserve(TargetEntryCount);
-
- for (const auto& Kv : m_LatestOpMap)
- {
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- Entries.push_back(AddressEntry->second);
- Keys.push_back(Kv.first);
- LSNs.push_back(Kv.second);
- EntryIndexes.push_back(EntryIndexes.size());
- }
-
- std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
- const OplogEntryAddress& LhsEntry = Entries[Lhs];
- const OplogEntryAddress& RhsEntry = Entries[Rhs];
- return LhsEntry.Offset < RhsEntry.Offset;
- });
-
- SortedEntries.reserve(EntryIndexes.size());
- SortedKeys.reserve(EntryIndexes.size());
- SortedLSNs.reserve(EntryIndexes.size());
-
- for (size_t Index : EntryIndexes)
- {
- SortedEntries.push_back(Entries[Index]);
- SortedKeys.push_back(Keys[Index]);
- SortedLSNs.push_back(LSNs[Index]);
- }
- }
-
- std::span<OplogEntryAddress> EntrySpan = CreateSpanFromPaging(SortedEntries, EntryPaging);
- size_t EntryIndex = EntrySpan.empty() ? 0 : static_cast<size_t>(&EntrySpan.front() - &SortedEntries.front());
- m_Storage->ReplayLogEntries(EntrySpan, [&](CbObjectView Op) {
- Handler(SortedLSNs[EntryIndex], SortedKeys[EntryIndex], Op);
- EntryIndex++;
- });
-}
-
-std::optional<uint32_t>
+ProjectStore::LogSequenceNumber
ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
{
RwLock::SharedLockScope _(m_OplogLock);
@@ -2706,9 +2865,9 @@ ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
{
return {};
}
- if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end())
{
- return LatestOp->second;
+ return m_OpLogPayloads[LatestOp->second].Lsn;
}
return {};
}
@@ -2722,31 +2881,44 @@ ProjectStore::Oplog::GetOpByKey(const Oid& Key)
return {};
}
- if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ if (const auto LatestOp = m_OpToPayloadOffsetMap.find(Key); LatestOp != m_OpToPayloadOffsetMap.end())
{
- const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- return m_Storage->GetOp(AddressEntry->second);
+ return m_Storage->GetOp(m_OpLogPayloads[LatestOp->second].Address);
}
return {};
}
std::optional<CbObject>
-ProjectStore::Oplog::GetOpByIndex(uint32_t Index)
+ProjectStore::Oplog::GetOpByIndex(ProjectStore::LogSequenceNumber Index)
{
- RwLock::SharedLockScope _(m_OplogLock);
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
+
+ if (m_LsnToPayloadOffsetMap)
+ {
+ if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end())
+ {
+ return m_Storage->GetOp(m_OpLogPayloads[It->second].Address);
+ }
+ }
+ }
+
+ RwLock::ExclusiveLockScope Lock(m_OplogLock);
if (!m_Storage)
{
return {};
}
- if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end())
+ RefreshLsnToPayloadOffsetMap(Lock);
+ if (auto It = m_LsnToPayloadOffsetMap->find(Index); It != m_LsnToPayloadOffsetMap->end())
{
- return m_Storage->GetOp(AddressEntryIt->second);
+ return m_Storage->GetOp(m_OpLogPayloads[It->second].Address);
}
-
return {};
}
@@ -2772,14 +2944,14 @@ ProjectStore::Oplog::EnableUpdateCapture()
m_OplogLock.WithExclusiveLock([&]() {
if (m_UpdateCaptureRefCounter == 0)
{
- ZEN_ASSERT(!m_CapturedLSNs);
+ ZEN_ASSERT(!m_CapturedOps);
ZEN_ASSERT(!m_CapturedAttachments);
- m_CapturedLSNs = std::make_unique<std::vector<uint32_t>>();
+ m_CapturedOps = std::make_unique<std::vector<Oid>>();
m_CapturedAttachments = std::make_unique<std::vector<IoHash>>();
}
else
{
- ZEN_ASSERT(m_CapturedLSNs);
+ ZEN_ASSERT(m_CapturedOps);
ZEN_ASSERT(m_CapturedAttachments);
}
m_UpdateCaptureRefCounter++;
@@ -2792,34 +2964,36 @@ ProjectStore::Oplog::DisableUpdateCapture()
ZEN_MEMSCOPE(GetProjectstoreTag());
m_OplogLock.WithExclusiveLock([&]() {
- ZEN_ASSERT(m_CapturedLSNs);
+ ZEN_ASSERT(m_CapturedOps);
ZEN_ASSERT(m_CapturedAttachments);
ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
m_UpdateCaptureRefCounter--;
if (m_UpdateCaptureRefCounter == 0)
{
- m_CapturedLSNs.reset();
+ m_CapturedOps.reset();
m_CapturedAttachments.reset();
}
});
}
void
-ProjectStore::Oplog::IterateCapturedLSNsLocked(std::function<bool(const CbObjectView& UpdateOp)>&& Callback)
+ProjectStore::Oplog::IterateCapturedOpsLocked(
+ std::function<bool(const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp)>&& Callback)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- if (m_CapturedLSNs)
+ if (m_CapturedOps)
{
if (!m_Storage)
{
return;
}
- for (uint32_t UpdatedLSN : *m_CapturedLSNs)
+ for (const Oid& OpKey : *m_CapturedOps)
{
- if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end())
+ if (const auto AddressEntryIt = m_OpToPayloadOffsetMap.find(OpKey); AddressEntryIt != m_OpToPayloadOffsetMap.end())
{
- Callback(m_Storage->GetOp(AddressEntryIt->second));
+ const OplogPayload& OpPayload = m_OpLogPayloads[AddressEntryIt->second];
+ Callback(OpKey, OpPayload.Lsn, m_Storage->GetOp(OpPayload.Address));
}
}
}
@@ -3045,7 +3219,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core)
return Result;
}
-uint32_t
+ProjectStore::LogSequenceNumber
ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
const OplogEntryMapping& OpMapping,
const OplogEntry& OpEntry)
@@ -3072,12 +3246,18 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
AddMetaMapping(OplogLock, Meta.Id, Meta.Hash);
}
- m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
+ const PayloadIndex PayloadOffset(m_OpLogPayloads.size());
+ m_OpToPayloadOffsetMap.insert_or_assign(OpEntry.OpKeyHash, PayloadOffset);
+ m_OpLogPayloads.push_back({.Lsn = OpEntry.OpLsn, .Address = OpEntry.OpCoreAddress});
+
+ if (m_LsnToPayloadOffsetMap)
+ {
+ m_LsnToPayloadOffsetMap->insert_or_assign(OpEntry.OpLsn, PayloadOffset);
+ }
return OpEntry.OpLsn;
}
-uint32_t
+ProjectStore::LogSequenceNumber
ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
{
ZEN_ASSERT(m_Mode == EMode::kFull);
@@ -3085,9 +3265,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry");
- const CbObject& Core = OpPackage.GetObject();
- const uint32_t EntryId = AppendNewOplogEntry(Core);
- if (EntryId == 0xffffffffu)
+ const CbObject& Core = OpPackage.GetObject();
+ const ProjectStore::LogSequenceNumber EntryId = AppendNewOplogEntry(Core);
+ if (!EntryId)
{
// The oplog has been deleted so just drop this
return EntryId;
@@ -3132,7 +3312,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
}
}
- ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
+ ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId.Number, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
return EntryId;
}
@@ -3149,7 +3329,7 @@ ProjectStore::Oplog::GetStorage()
return Storage;
}
-uint32_t
+ProjectStore::LogSequenceNumber
ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
{
ZEN_ASSERT(m_Mode == EMode::kFull);
@@ -3162,7 +3342,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
RefPtr<OplogStorage> Storage = GetStorage();
if (!Storage)
{
- return 0xffffffffu;
+ return {};
}
OplogEntryMapping Mapping = GetMapping(Core);
@@ -3170,18 +3350,18 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
const OplogEntry OpEntry = Storage->AppendOp(OpData);
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
- if (m_CapturedLSNs)
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ const ProjectStore::LogSequenceNumber EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
+ if (m_CapturedOps)
{
- m_CapturedLSNs->push_back(EntryId);
+ m_CapturedOps->push_back(OpData.KeyHash);
}
m_MetaValid = false;
return EntryId;
}
-std::vector<uint32_t>
+std::vector<ProjectStore::LogSequenceNumber>
ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
{
ZEN_ASSERT(m_Mode == EMode::kFull);
@@ -3194,7 +3374,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
RefPtr<OplogStorage> Storage = GetStorage();
if (!Storage)
{
- return std::vector<uint32_t>(Cores.size(), 0xffffffffu);
+ return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{});
}
size_t OpCount = Cores.size();
@@ -3212,21 +3392,21 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
std::vector<OplogEntry> OpEntries = Storage->AppendOps(OpDatas);
- std::vector<uint32_t> EntryIds;
+ std::vector<ProjectStore::LogSequenceNumber> EntryIds;
EntryIds.resize(OpCount);
{
{
RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- if (m_CapturedLSNs)
+ if (m_CapturedOps)
{
- m_CapturedLSNs->reserve(m_CapturedLSNs->size() + OpCount);
+ m_CapturedOps->reserve(m_CapturedOps->size() + OpCount);
}
for (size_t OpIndex = 0; OpIndex < OpCount; OpIndex++)
{
EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
- if (m_CapturedLSNs)
+ if (m_CapturedOps)
{
- m_CapturedLSNs->push_back(EntryIds[OpIndex]);
+ m_CapturedOps->push_back(OpDatas[OpIndex].KeyHash);
}
}
}
@@ -5917,11 +6097,11 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
for (CbObject& NewOp : NewOps)
{
- uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
- ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn);
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
- ResponseObj.AddInteger(NewLsn);
+ ResponseObj.AddInteger(NewLsn.Number);
}
ResponseObj.EndArray();
@@ -6678,7 +6858,8 @@ public:
if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
{
Ref<ProjectStore::Oplog> Oplog = It->second;
- Oplog->IterateCapturedLSNsLocked([&](const CbObjectView& UpdateOp) -> bool {
+ Oplog->IterateCapturedOpsLocked([&](const Oid& Key, ProjectStore::LogSequenceNumber LSN, const CbObjectView& UpdateOp) -> bool {
+ ZEN_UNUSED(Key, LSN);
UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); });
return true;
});
@@ -6884,8 +7065,8 @@ public:
m_OplogId,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
Result.OpCount,
- Result.LSNLow,
- Result.LSNHigh,
+ Result.LSNLow.Number,
+ Result.LSNHigh.Number,
Status);
});
Ref<ProjectStore::Oplog> Oplog;
@@ -8988,7 +9169,7 @@ TEST_CASE("project.store.iterateoplog")
}
};
auto IncrementCount = [&Count](CbObjectView /* Op */) { ++Count; };
- auto MarkFound = [&TestOids, &Count](uint32_t /* LSN */, const Oid& /* InId */, CbObjectView Op) {
+ auto MarkFound = [&TestOids, &Count](ProjectStore::LogSequenceNumber /* LSN */, const Oid& /* InId */, CbObjectView Op) {
for (TestOidData& TestOid : TestOids)
{
if (Op["key"sv].AsString() == TestOid.Key)