diff options
| author | Dan Engelbrecht <[email protected]> | 2026-02-11 13:58:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-02-11 13:58:08 +0100 |
| commit | 885612b5562b52cb15f0dbb6d046500ea8090779 (patch) | |
| tree | ddb903ea486df0353251ee6efe023107511db3c7 /src/zenstore | |
| parent | 5.7.20 (diff) | |
| download | zen-885612b5562b52cb15f0dbb6d046500ea8090779.tar.xz zen-885612b5562b52cb15f0dbb6d046500ea8090779.zip | |
reduce lock time for project store gc precache and gc validate (#750)
* add oplog snapshot function to allow reduction of held oplog locks
* release project lock when precaching each oplog
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/include/zenstore/projectstore.h | 10 | ||||
| -rw-r--r-- | src/zenstore/projectstore.cpp | 297 |
2 files changed, 261 insertions, 46 deletions
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h index 09c3096ad..33ef996db 100644 --- a/src/zenstore/include/zenstore/projectstore.h +++ b/src/zenstore/include/zenstore/projectstore.h @@ -238,6 +238,16 @@ public: std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool); + struct OplogSnapshot + { + std::vector<CbObjectView> Ops; + std::vector<Oid> Keys; + std::vector<LogSequenceNumber> LSNs; + std::vector<IoBuffer> PayloadBuffers; + }; + + OplogSnapshot GetSnapshotLocked(); + private: struct FileMapEntry { diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index e6c8d624a..1ab2b317a 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -22,6 +22,8 @@ #include "referencemetadata.h" +#include <numeric> + ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> #include <xxh3.h> @@ -861,9 +863,9 @@ struct ProjectStore::OplogStorage : public RefCounted } } - void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, - const std::span<const Oplog::PayloadIndex> Order, - std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler) + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, IoBuffer&& Buffer)>&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); @@ -886,13 +888,13 @@ struct ProjectStore::OplogStorage : public RefCounted if (OpBufferView.GetSize() == Entry.Address.Size) { IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); - Handler(Entry.Lsn, Buffer); + Handler(Entry.Lsn, std::move(Buffer)); } else { IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - Handler(Entry.Lsn, OpBuffer); + Handler(Entry.Lsn, std::move(OpBuffer)); } } } @@ -1645,18 +1647,47 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, Keys.reserve(OpCount); Mappings.reserve(OpCount); - IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { - Result.LSNLow = Min(Result.LSNLow, LSN); - Result.LSNHigh = Max(Result.LSNHigh, LSN); - KeyHashes.push_back(Key); - Keys.emplace_back(std::string(OpView["key"sv].AsString())); + { + Stopwatch SnapshotTimer; + RwLock::SharedLockScope OplogLock(m_OplogLock); + ProjectStore::Oplog::OplogSnapshot Snapshot = GetSnapshotLocked(); + OplogLock.ReleaseNow(); - std::vector<IoHash> OpAttachments; - OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); - Attachments.emplace_back(std::move(OpAttachments)); + uint64_t AllocatedSize = std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), Snapshot.Ops.end(), uint64_t(0), [](uint64_t Current, const CbObjectView& Object) { + return Current + Object.GetSize(); + }); - Mappings.push_back(GetMapping(OpView)); - }); + ZEN_INFO("Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' in {}", + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_OuterProjectId, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& OpView = Snapshot.Ops[Index]; + LogSequenceNumber LSN = Snapshot.LSNs[Index]; + const Oid& Key = Snapshot.Keys[Index]; + + Result.LSNLow = Min(Result.LSNLow, LSN); + Result.LSNHigh = Max(Result.LSNHigh, LSN); + KeyHashes.push_back(Key); + Keys.emplace_back(std::string(OpView["key"sv].AsString())); + + std::vector<IoHash> OpAttachments; + OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); + Attachments.emplace_back(std::move(OpAttachments)); + + Mappings.push_back(GetMapping(OpView)); + } + } Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); @@ -2644,6 +2675,104 @@ ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& Entry return ReplayOrder; } +ProjectStore::Oplog::OplogSnapshot +ProjectStore::Oplog::GetSnapshotLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::GetSnapshotLocked"); + if (!m_Storage) + { + return {}; + } + + uint64_t WriteOffset = 0; + OplogSnapshot Snapshot; + + const uint64_t PayloadBufferPageSize = 64u * 1024u; + + size_t OpCount = GetOplogEntryCount(); + Snapshot.Ops.reserve(OpCount); + Snapshot.Keys.reserve(OpCount); + Snapshot.LSNs.reserve(OpCount); + + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(Paging{}, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber LSN, IoBuffer&& Buffer) { + const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; + + Snapshot.Keys.push_back(ReverseKeyMap.at(PayloadOffset)); + Snapshot.LSNs.push_back(LSN); + + uint64_t Size = Buffer.GetSize(); + if (Buffer.IsOwned()) + { + CbObjectView CopyView(Buffer.GetData()); + if (CopyView.GetSize() == Size) + { + Snapshot.Ops.emplace_back(std::move(CopyView)); + } + else + { + Snapshot.Ops.emplace_back(CbObjectView{}); + } + if (Snapshot.PayloadBuffers.empty()) + { + Snapshot.PayloadBuffers.emplace_back(std::move(Buffer)); + WriteOffset = Snapshot.PayloadBuffers.back().Size(); + } + else + { + Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(Buffer)); + } + } + else + { + uint64_t AvailableSize = Snapshot.PayloadBuffers.empty() ? 0 : Snapshot.PayloadBuffers.back().GetSize() - WriteOffset; + MutableMemoryView WriteBuffer; + if (Size > AvailableSize) + { + if (Size >= PayloadBufferPageSize && !Snapshot.PayloadBuffers.empty()) + { + // Insert the large payload before the current payload buffer so we can continue to use that + IoBuffer PayloadBuffer(Size); + WriteBuffer = PayloadBuffer.GetMutableView(); + Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(PayloadBuffer)); + } + else + { + IoBuffer PayloadBuffer(Max(Size, PayloadBufferPageSize)); + WriteBuffer = PayloadBuffer.GetMutableView().Mid(0, Size); + Snapshot.PayloadBuffers.emplace_back(std::move(PayloadBuffer)); + WriteOffset = Size; + } + } + else + { + WriteBuffer = Snapshot.PayloadBuffers.back().GetMutableView().Mid(WriteOffset, Size); + WriteOffset += Size; + } + WriteBuffer.CopyFrom(Buffer.GetView()); + CbObjectView CopyView(WriteBuffer.GetData()); + + if (CopyView.GetSize() == Size) + { + Snapshot.Ops.emplace_back(std::move(CopyView)); + } + else + { + Snapshot.Ops.emplace_back(CbObjectView{}); + } + } + + EntryIndex++; + }); + } + return Snapshot; +} + void ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) { @@ -2710,7 +2839,7 @@ ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber if (!ReplayOrder.empty()) { uint32_t EntryIndex = 0; - m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) { + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, IoBuffer&& Buffer) { const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer); EntryIndex++; @@ -6035,39 +6164,41 @@ public: { Ref<ProjectStore::Oplog> Oplog; - RwLock::SharedLockScope __(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { - Oplog = It->second; - Oplog->EnableUpdateCapture(); - m_OplogHasUpdateCapture = true; - } - else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) - { - Stopwatch OplogTimer; - Oplog = new ProjectStore::Oplog(m_Project->Log(), - m_Project->Identifier, - m_OplogId, - m_Project->m_CidStore, - m_OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kBasicReadOnly); - Oplog->Read(); - if (Ctx.Settings.Verbose) + RwLock::SharedLockScope __(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", - m_OplogBasePath, - m_Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + Oplog = It->second; + Oplog->EnableUpdateCapture(); + m_OplogHasUpdateCapture = true; + } + else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) + { + Stopwatch OplogTimer; + Oplog = new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kBasicReadOnly); + Oplog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", + m_OplogBasePath, + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + } + else + { + return; } - } - else - { - return; } - RwLock::SharedLockScope ___(Oplog->m_OplogLock); + RwLock::SharedLockScope OplogLock(Oplog->m_OplogLock); if (Ctx.IsCancelledFlag) { return; @@ -6083,7 +6214,45 @@ public: } } - Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); + if (Ctx.Settings.StoreProjectAttachmentMetaData) + { + Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true); + } + else + { + Stopwatch SnapshotTimer; + ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked(); + OplogLock.ReleaseNow(); + + uint64_t AllocatedSize = + std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), + Snapshot.Ops.end(), + uint64_t(0), + [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); }); + + ZEN_INFO( + "GCV2: projectstore [PRECACHE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' " + "in {}", + m_OplogBasePath, + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& Op = Snapshot.Ops[Index]; + Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + } + } + m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); } FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); @@ -6147,7 +6316,43 @@ public: OplogTimer.Reset(); - Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); + if (Ctx.Settings.StoreProjectAttachmentMetaData) + { + Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true); + } + else + { + Stopwatch SnapshotTimer; + ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked(); + + uint64_t AllocatedSize = + std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), + Snapshot.Ops.end(), + uint64_t(0), + [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); }); + + ZEN_INFO( + "GCV2: projectstore [LOCKSTATE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog " + "'{}/{}' in {}", + m_OplogBasePath, + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& Op = Snapshot.Ops[Index]; + Op.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); + } + } } if (Ctx.Settings.Verbose) { |