aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-02-11 13:58:08 +0100
committerGitHub Enterprise <[email protected]>2026-02-11 13:58:08 +0100
commit885612b5562b52cb15f0dbb6d046500ea8090779 (patch)
treeddb903ea486df0353251ee6efe023107511db3c7 /src/zenstore
parent5.7.20 (diff)
downloadzen-885612b5562b52cb15f0dbb6d046500ea8090779.tar.xz
zen-885612b5562b52cb15f0dbb6d046500ea8090779.zip
reduce lock time for project store gc precache and gc validate (#750)
* add oplog snapshot function to allow reduction of held oplog locks
* release project lock when precaching each oplog
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/include/zenstore/projectstore.h10
-rw-r--r--src/zenstore/projectstore.cpp297
2 files changed, 261 insertions, 46 deletions
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h
index 09c3096ad..33ef996db 100644
--- a/src/zenstore/include/zenstore/projectstore.h
+++ b/src/zenstore/include/zenstore/projectstore.h
@@ -238,6 +238,16 @@ public:
std::atomic_bool& IsCancelledFlag,
WorkerThreadPool* OptionalWorkerPool);
+ struct OplogSnapshot // Detached copy of oplog contents, as produced by GetSnapshotLocked().
+ {
+ std::vector<CbObjectView> Ops; // One view per op; an empty view marks a payload whose size check failed.
+ std::vector<Oid> Keys; // Key of the op at the same index in Ops.
+ std::vector<LogSequenceNumber> LSNs; // LSN of the op at the same index in Ops.
+ std::vector<IoBuffer> PayloadBuffers; // Memory the Ops views point into; must outlive any use of Ops. Not index-aligned with Ops.
+ };
+
+ OplogSnapshot GetSnapshotLocked();
+
private:
struct FileMapEntry
{
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index e6c8d624a..1ab2b317a 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -22,6 +22,8 @@
#include "referencemetadata.h"
+#include <numeric>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
#include <xxh3.h>
@@ -861,9 +863,9 @@ struct ProjectStore::OplogStorage : public RefCounted
}
}
- void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
- const std::span<const Oplog::PayloadIndex> Order,
- std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler)
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, IoBuffer&& Buffer)>&& Handler)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
@@ -886,13 +888,13 @@ struct ProjectStore::OplogStorage : public RefCounted
if (OpBufferView.GetSize() == Entry.Address.Size)
{
IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
- Handler(Entry.Lsn, Buffer);
+ Handler(Entry.Lsn, std::move(Buffer));
}
else
{
IoBuffer OpBuffer(Entry.Address.Size);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- Handler(Entry.Lsn, OpBuffer);
+ Handler(Entry.Lsn, std::move(OpBuffer));
}
}
}
@@ -1645,18 +1647,47 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
Keys.reserve(OpCount);
Mappings.reserve(OpCount);
- IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) {
- Result.LSNLow = Min(Result.LSNLow, LSN);
- Result.LSNHigh = Max(Result.LSNHigh, LSN);
- KeyHashes.push_back(Key);
- Keys.emplace_back(std::string(OpView["key"sv].AsString()));
+ {
+ Stopwatch SnapshotTimer;
+ RwLock::SharedLockScope OplogLock(m_OplogLock);
+ ProjectStore::Oplog::OplogSnapshot Snapshot = GetSnapshotLocked();
+ OplogLock.ReleaseNow();
- std::vector<IoHash> OpAttachments;
- OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); });
- Attachments.emplace_back(std::move(OpAttachments));
+ uint64_t AllocatedSize = std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(), Snapshot.Ops.end(), uint64_t(0), [](uint64_t Current, const CbObjectView& Object) {
+ return Current + Object.GetSize();
+ });
- Mappings.push_back(GetMapping(OpView));
- });
+ ZEN_INFO("Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' in {}",
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_OuterProjectId,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& OpView = Snapshot.Ops[Index];
+ LogSequenceNumber LSN = Snapshot.LSNs[Index];
+ const Oid& Key = Snapshot.Keys[Index];
+
+ Result.LSNLow = Min(Result.LSNLow, LSN);
+ Result.LSNHigh = Max(Result.LSNHigh, LSN);
+ KeyHashes.push_back(Key);
+ Keys.emplace_back(std::string(OpView["key"sv].AsString()));
+
+ std::vector<IoHash> OpAttachments;
+ OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); });
+ Attachments.emplace_back(std::move(OpAttachments));
+
+ Mappings.push_back(GetMapping(OpView));
+ }
+ }
Result.OpCount = gsl::narrow<uint32_t>(Keys.size());
@@ -2644,6 +2675,104 @@ ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& Entry
return ReplayOrder;
}
+ProjectStore::Oplog::OplogSnapshot // Builds an in-memory snapshot (view, key, LSN per op) of the ops returned by GetSortedOpPayloadRangeLocked for a default Paging.
+ProjectStore::Oplog::GetSnapshotLocked() // Callers in this change take a shared m_OplogLock first and may release it once the snapshot is returned.
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::Oplog::GetSnapshotLocked");
+ if (!m_Storage) // Nothing to replay from: return an empty snapshot.
+ {
+ return {};
+ }
+
+ uint64_t WriteOffset = 0; // Bytes already used in PayloadBuffers.back(), the page currently being packed.
+ OplogSnapshot Snapshot;
+
+ const uint64_t PayloadBufferPageSize = 64u * 1024u; // Minimum allocation size of a page used to pack copied payloads.
+
+ size_t OpCount = GetOplogEntryCount(); // Used only as a capacity hint for the reserves below.
+ Snapshot.Ops.reserve(OpCount);
+ Snapshot.Keys.reserve(OpCount);
+ Snapshot.LSNs.reserve(OpCount);
+
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; // Passed to GetSortedOpPayloadRangeLocked; used below to look up the key for each payload index.
+ std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(Paging{}, &ReverseKeyMap);
+ if (!ReplayOrder.empty())
+ {
+ uint32_t EntryIndex = 0; // Position in ReplayOrder; assumes the handler is invoked once per entry in ReplayOrder order — TODO confirm.
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber LSN, IoBuffer&& Buffer) {
+ const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+
+ Snapshot.Keys.push_back(ReverseKeyMap.at(PayloadOffset)); // Keys, LSNs and Ops each get exactly one element per callback, keeping them index-aligned.
+ Snapshot.LSNs.push_back(LSN);
+
+ uint64_t Size = Buffer.GetSize();
+ if (Buffer.IsOwned()) // Owned buffer: store the buffer itself instead of copying its bytes.
+ {
+ CbObjectView CopyView(Buffer.GetData()); // View over the buffer's data; assumes moving the IoBuffer below keeps that address valid — TODO confirm.
+ if (CopyView.GetSize() == Size) // Accept the view only if its self-reported size matches the payload size.
+ {
+ Snapshot.Ops.emplace_back(std::move(CopyView));
+ }
+ else
+ {
+ Snapshot.Ops.emplace_back(CbObjectView{}); // Size mismatch: store an empty view so indices stay aligned.
+ }
+ if (Snapshot.PayloadBuffers.empty())
+ {
+ Snapshot.PayloadBuffers.emplace_back(std::move(Buffer));
+ WriteOffset = Snapshot.PayloadBuffers.back().Size(); // Mark this buffer as fully used so nothing is packed into it (assuming Size() == GetSize()).
+ }
+ else
+ {
+ Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(Buffer)); // Insert before the last element so the page being packed stays at back().
+ }
+ }
+ else // Non-owned buffer: copy the bytes into memory held by the snapshot.
+ {
+ uint64_t AvailableSize = Snapshot.PayloadBuffers.empty() ? 0 : Snapshot.PayloadBuffers.back().GetSize() - WriteOffset; // Space left in the current page (0 when there is no page yet).
+ MutableMemoryView WriteBuffer;
+ if (Size > AvailableSize) // Payload does not fit in the current page.
+ {
+ if (Size >= PayloadBufferPageSize && !Snapshot.PayloadBuffers.empty())
+ {
+ // Insert the large payload before the current payload buffer so we can continue to use that
+ IoBuffer PayloadBuffer(Size);
+ WriteBuffer = PayloadBuffer.GetMutableView();
+ Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(PayloadBuffer));
+ }
+ else
+ {
+ IoBuffer PayloadBuffer(Max(Size, PayloadBufferPageSize)); // Start a new page; any unused tail of the previous page is no longer written to.
+ WriteBuffer = PayloadBuffer.GetMutableView().Mid(0, Size);
+ Snapshot.PayloadBuffers.emplace_back(std::move(PayloadBuffer));
+ WriteOffset = Size; // This payload occupies the start of the new page.
+ }
+ }
+ else // NOTE(review): if Size == 0 while PayloadBuffers is empty, back() below is called on an empty vector — confirm zero-sized payloads cannot occur.
+ {
+ WriteBuffer = Snapshot.PayloadBuffers.back().GetMutableView().Mid(WriteOffset, Size); // Fits: use the next Size bytes of the current page.
+ WriteOffset += Size;
+ }
+ WriteBuffer.CopyFrom(Buffer.GetView()); // Copy the payload bytes into the chosen destination.
+ CbObjectView CopyView(WriteBuffer.GetData()); // View over the copied bytes.
+
+ if (CopyView.GetSize() == Size) // Same size validation as in the owned-buffer branch.
+ {
+ Snapshot.Ops.emplace_back(std::move(CopyView));
+ }
+ else
+ {
+ Snapshot.Ops.emplace_back(CbObjectView{}); // Size mismatch: store an empty view so indices stay aligned.
+ }
+ }
+
+ EntryIndex++;
+ });
+ }
+ return Snapshot;
+}
+
void
ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging)
{
@@ -2710,7 +2839,7 @@ ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber
if (!ReplayOrder.empty())
{
uint32_t EntryIndex = 0;
- m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) {
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, IoBuffer&& Buffer) {
const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer);
EntryIndex++;
@@ -6035,39 +6164,41 @@ public:
{
Ref<ProjectStore::Oplog> Oplog;
- RwLock::SharedLockScope __(m_Project->m_ProjectLock);
- if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
{
- Oplog = It->second;
- Oplog->EnableUpdateCapture();
- m_OplogHasUpdateCapture = true;
- }
- else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
- {
- Stopwatch OplogTimer;
- Oplog = new ProjectStore::Oplog(m_Project->Log(),
- m_Project->Identifier,
- m_OplogId,
- m_Project->m_CidStore,
- m_OplogBasePath,
- std::filesystem::path{},
- ProjectStore::Oplog::EMode::kBasicReadOnly);
- Oplog->Read();
- if (Ctx.Settings.Verbose)
+ RwLock::SharedLockScope __(m_Project->m_ProjectLock);
+ if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
{
- ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}",
- m_OplogBasePath,
- m_Project->Identifier,
- m_OplogId,
- NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ Oplog = It->second;
+ Oplog->EnableUpdateCapture();
+ m_OplogHasUpdateCapture = true;
+ }
+ else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
+ {
+ Stopwatch OplogTimer;
+ Oplog = new ProjectStore::Oplog(m_Project->Log(),
+ m_Project->Identifier,
+ m_OplogId,
+ m_Project->m_CidStore,
+ m_OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kBasicReadOnly);
+ Oplog->Read();
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}",
+ m_OplogBasePath,
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ }
+ }
+ else
+ {
+ return;
}
- }
- else
- {
- return;
}
- RwLock::SharedLockScope ___(Oplog->m_OplogLock);
+ RwLock::SharedLockScope OplogLock(Oplog->m_OplogLock);
if (Ctx.IsCancelledFlag)
{
return;
@@ -6083,7 +6214,45 @@ public:
}
}
- Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData);
+ if (Ctx.Settings.StoreProjectAttachmentMetaData)
+ {
+ Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true);
+ }
+ else
+ {
+ Stopwatch SnapshotTimer;
+ ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked();
+ OplogLock.ReleaseNow();
+
+ uint64_t AllocatedSize =
+ std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(),
+ Snapshot.Ops.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); });
+
+ ZEN_INFO(
+ "GCV2: projectstore [PRECACHE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' "
+ "in {}",
+ m_OplogBasePath,
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& Op = Snapshot.Ops[Index];
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ }
+ }
+
m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId);
}
FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References);
@@ -6147,7 +6316,43 @@ public:
OplogTimer.Reset();
- Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData);
+ if (Ctx.Settings.StoreProjectAttachmentMetaData) // Metadata path: attachments are gathered through GetAttachmentsLocked rather than an oplog snapshot.
+ {
+ Oplog->GetAttachmentsLocked(m_AddedReferences, /*StoreMetaDataOnDisk*/ true); // Collect into m_AddedReferences, the same destination as the call this replaces and as the snapshot branch below (was m_References).
+ }
+ else
+ {
+ Stopwatch SnapshotTimer;
+ ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked();
+
+ uint64_t AllocatedSize =
+ std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(),
+ Snapshot.Ops.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); });
+
+ ZEN_INFO(
+ "GCV2: projectstore [LOCKSTATE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog "
+ "'{}/{}' in {}",
+ m_OplogBasePath,
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& Op = Snapshot.Ops[Index];
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); });
+ }
+ }
}
if (Ctx.Settings.Verbose)
{