diff options
| author | Liam Mitchell <[email protected]> | 2026-03-09 18:25:30 -0700 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2026-03-09 18:25:30 -0700 |
| commit | 57c1683b2935c834250b73eb506319ed67946160 (patch) | |
| tree | 1fc8f237010b26e65659b731fe6f6eae30422f5c /src/zenstore | |
| parent | Allow external OidcToken executable to be specified unless disabled via comma... (diff) | |
| parent | reduce lock time for project store gc precache and gc validate (#750) (diff) | |
| download | zen-57c1683b2935c834250b73eb506319ed67946160.tar.xz zen-57c1683b2935c834250b73eb506319ed67946160.zip | |
Merge branch 'main' into lm/oidctoken-exe-path
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 28 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 142 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 19 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 2 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 37 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/caslog.h | 4 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/projectstore.h | 10 | ||||
| -rw-r--r-- | src/zenstore/projectstore.cpp | 301 |
8 files changed, 402 insertions, 141 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index f97c98e08..3ea91ead6 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -374,7 +374,7 @@ BlockStoreFile::GetMetaPath() const //////////////////////////////////////////////////////// -constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024; +constexpr uint64_t DefaultIterateSmallChunkWindowSize = 512u * 1024u; BlockStore::BlockStore() { @@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con LargestSize = Max(LargestSize, Size); } - const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u); + const uint64_t MinSize = Max(LargestSize, 512u * 1024u); const uint64_t BufferSize = Min(TotalSize, MinSize); std::vector<uint8_t> Buffer(BufferSize); @@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); }); + if (Count > 1) { + if (Buffer.empty()) + { + Buffer.resize(BufferSize); + } MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); for (size_t Index = 0; Index < Count; Index++) { @@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); } WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); + m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + } + else + { + MemoryView SourceBuffer = Datas[Offset]; + WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset); + m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed); } - - m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); uint32_t ChunkOffset = AlignedInsertOffset; std::vector<BlockStoreLocation> Locations(Count); @@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con bool BlockStore::HasChunk(const BlockStoreLocation& Location) const { - ZEN_TRACE_CPU("BlockStore::TryGetChunk"); + ZEN_TRACE_CPU("BlockStore::HasChunk"); RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = BlockIt->second; Block) { InsertLock.ReleaseNow(); @@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = BlockIt->second; Block) { + InsertLock.ReleaseNow(); + IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size); if (Chunk.GetSize() == Location.Size) { @@ -941,7 +953,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit); - const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 512u); + const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 256u); IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index b2e045632..ead7e4f3a 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -2410,74 +2410,95 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) try { - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; + std::vector<DiskLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + std::vector<DiskLocation> StandaloneLocations; + std::vector<IoHash> StandaloneIndexToKeysHash; - RwLock::SharedLockScope _(m_IndexLock); + { + RwLock::SharedLockScope _(m_IndexLock); - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + const size_t InitialCount = m_Index.size() / 4; + ChunkLocations.reserve(InitialCount); + ChunkIndexToChunkHash.reserve(InitialCount); + StandaloneLocations.reserve(InitialCount); + StandaloneIndexToKeysHash.reserve(InitialCount); - // Do a pass over the index and verify any standalone file values straight away - // all other storage classes are gathered and verified in bulk in order to enable - // more efficient I/O scheduling + for (auto& Kv : m_Index) + { + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; - for (auto& Kv : m_Index) - { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; + Ctx.ThrowIfDeadlineExpired(); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneLocations.push_back(Loc); + StandaloneIndexToKeysHash.push_back(HashKey); + } + else + { + ChunkLocations.push_back(Loc); + ChunkIndexToChunkHash.push_back(HashKey); + } + } + } + + for (size_t StandaloneKeyIndex = 0; StandaloneKeyIndex < StandaloneIndexToKeysHash.size(); StandaloneKeyIndex++) + { Ctx.ThrowIfDeadlineExpired(); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ChunkCount.fetch_add(1); - VerifiedChunkBytes.fetch_add(Loc.Size()); + const IoHash& HashKey = StandaloneIndexToKeysHash[StandaloneKeyIndex]; + const DiskLocation& Loc = StandaloneLocations[StandaloneKeyIndex]; - if (Loc.GetContentType() == ZenContentType::kBinary) - { - // Blob cache value, not much we can do about data integrity checking - // here since there's no hash available - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); + ChunkCount.fetch_add(1); + VerifiedChunkBytes.fetch_add(Loc.Size()); - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + if (Loc.GetContentType() == ZenContentType::kBinary) + { + // Blob cache value, not much we can do about data integrity checking + // here since there's no hash available + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); - std::error_code Ec; - uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec); - if (Ec) - { - ReportBadKey(HashKey); - } - if (size != Loc.Size()) - { - ReportBadKey(HashKey); - } - continue; + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t Size = FileSizeFromPath(DataFilePath.ToPath(), Ec); + if (Ec) + { + ReportBadKey(HashKey); } - else + ValueLock.ReleaseNow(); + + if (Size != Loc.Size()) { - // Structured cache value - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); - if (!Buffer) + // Make sure we verify that size hasn't changed behind our back... + RwLock::SharedLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { - ReportBadKey(HashKey); - continue; - } - if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer))) - { - ReportBadKey(HashKey); - continue; + const BucketPayload& Payload = m_Payloads[It->second]; + const DiskLocation& CurrentLoc = Payload.Location; + if (Size != CurrentLoc.Size()) + { + ReportBadKey(HashKey); + } } } } else { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; + // Structured cache value + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) + { + ReportBadKey(HashKey); + } + else if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer))) + { + ReportBadKey(HashKey); + } } } @@ -2502,8 +2523,9 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ReportBadKey(Hash); return true; } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); + + const DiskLocation& Loc = ChunkLocations[ChunkIndex]; + ZenContentType ContentType = Loc.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { @@ -2525,8 +2547,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ReportBadKey(Hash); return true; } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); + const DiskLocation& Loc = ChunkLocations[ChunkIndex]; + ZenContentType ContentType = Loc.GetContentType(); Buffer.SetContentType(ContentType); if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { @@ -2536,8 +2558,16 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) return true; }; - m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); + std::vector<BlockStoreLocation> ChunkBlockLocations; + ChunkBlockLocations.reserve(ChunkLocations.size()); + + for (const DiskLocation& Loc : ChunkLocations) + { + ChunkBlockLocations.push_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); + } + + m_BlockStore.IterateChunks(ChunkBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { + return m_BlockStore.IterateBlock(ChunkBlockLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); }); } catch (ScrubDeadlineExpiredException&) diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 660c66b9a..94abcf547 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -594,16 +594,16 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb { FoundLocalInvalid = true; } - else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default); - Error != CbValidateError::None) + else if (CbObjectView RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); + RecordObject.GetSize() != Request.RecordCacheValue.GetSize()) { ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'", - ToString(Error)); + "Object size does not match payload size"); FoundLocalInvalid = true; } else { - Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); + Request.RecordObject = std::move(RecordObject); ParseValues(Request); Request.Complete = true; @@ -1710,16 +1710,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { - if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default); - Error != CbValidateError::None) + if (CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + RecordObject.GetSize() != Record.CacheValue.GetSize()) { - ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'", - ToString(Error)); + ZEN_WARN("GetLocalCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'", + "Object size does not match payload size"); } else { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); Record.Values.reserve(ValuesArray.Num()); for (CbFieldView ValueField : ValuesArray) { diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index 0f6e2ba9d..47b6e63cc 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -59,7 +59,7 @@ protected: CidStoreConfiguration m_Config; }; -ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); +std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); void CAS_forcelink(); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index a5de5c448..5d8f95c9e 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("CasContainer::FindChunk"); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { return IoBuffer(); } - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); return Chunk; @@ -316,10 +317,11 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); return m_BlockStore.HasChunk(Location); } return false; @@ -545,11 +547,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) if (Ctx.IsSkipCas()) { - ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath); + ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory); return; } - ZEN_INFO("scrubbing '{}'", m_BlocksBasePath); + ZEN_INFO("scrubbing '{}'", m_RootDirectory); RwLock BadKeysLock; std::vector<IoHash> BadKeys; @@ -563,20 +565,21 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) try { - RwLock::SharedLockScope _(m_LocationMapLock); - - uint64_t TotalChunkCount = m_LocationMap.size(); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); { - for (const auto& Entry : m_LocationMap) + uint64_t TotalChunkCount = m_LocationMap.size(); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + RwLock::SharedLockScope _(m_LocationMapLock); { - const IoHash& ChunkHash = Entry.first; - const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + for (const auto& Entry : m_LocationMap) + { + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(ChunkHash); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(ChunkHash); + } } } diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h index 3d95c9c90..f3dd32fb1 100644 --- a/src/zenstore/include/zenstore/caslog.h +++ b/src/zenstore/include/zenstore/caslog.h @@ -41,8 +41,8 @@ private: static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'}; - ZENCORE_API uint32_t ComputeChecksum(); - void Finalize() { Checksum = ComputeChecksum(); } + uint32_t ComputeChecksum(); + void Finalize() { Checksum = ComputeChecksum(); } }; static_assert(sizeof(FileHeader) == 64); diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h index 09c3096ad..33ef996db 100644 --- a/src/zenstore/include/zenstore/projectstore.h +++ b/src/zenstore/include/zenstore/projectstore.h @@ -238,6 +238,16 @@ public: std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool); + struct OplogSnapshot + { + std::vector<CbObjectView> Ops; + std::vector<Oid> Keys; + std::vector<LogSequenceNumber> LSNs; + std::vector<IoBuffer> PayloadBuffers; + }; + + OplogSnapshot GetSnapshotLocked(); + private: struct FileMapEntry { diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index f1001f665..1ab2b317a 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -22,6 +22,8 @@ #include "referencemetadata.h" +#include <numeric> + ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> #include <xxh3.h> @@ -861,9 +863,9 @@ struct ProjectStore::OplogStorage : public RefCounted } } - void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, - const std::span<const Oplog::PayloadIndex> Order, - std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler) + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, IoBuffer&& Buffer)>&& Handler) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); @@ -886,13 +888,13 @@ struct ProjectStore::OplogStorage : public RefCounted if (OpBufferView.GetSize() == Entry.Address.Size) { IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); - Handler(Entry.Lsn, Buffer); + Handler(Entry.Lsn, std::move(Buffer)); } else { IoBuffer OpBuffer(Entry.Address.Size); OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - Handler(Entry.Lsn, OpBuffer); + Handler(Entry.Lsn, std::move(OpBuffer)); } } } @@ -1645,18 +1647,47 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, Keys.reserve(OpCount); Mappings.reserve(OpCount); - IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) { - Result.LSNLow = Min(Result.LSNLow, LSN); - Result.LSNHigh = Max(Result.LSNHigh, LSN); - KeyHashes.push_back(Key); - Keys.emplace_back(std::string(OpView["key"sv].AsString())); + { + Stopwatch SnapshotTimer; + RwLock::SharedLockScope OplogLock(m_OplogLock); + ProjectStore::Oplog::OplogSnapshot Snapshot = GetSnapshotLocked(); + OplogLock.ReleaseNow(); - std::vector<IoHash> OpAttachments; - OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); - Attachments.emplace_back(std::move(OpAttachments)); + uint64_t AllocatedSize = std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), Snapshot.Ops.end(), uint64_t(0), [](uint64_t Current, const CbObjectView& Object) { + return Current + Object.GetSize(); + }); - Mappings.push_back(GetMapping(OpView)); - }); + ZEN_INFO("Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' in {}", + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_OuterProjectId, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& OpView = Snapshot.Ops[Index]; + LogSequenceNumber LSN = Snapshot.LSNs[Index]; + const Oid& Key = Snapshot.Keys[Index]; + + Result.LSNLow = Min(Result.LSNLow, LSN); + Result.LSNHigh = Max(Result.LSNHigh, LSN); + KeyHashes.push_back(Key); + Keys.emplace_back(std::string(OpView["key"sv].AsString())); + + std::vector<IoHash> OpAttachments; + OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); }); + Attachments.emplace_back(std::move(OpAttachments)); + + Mappings.push_back(GetMapping(OpView)); + } + } Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); @@ -2644,6 +2675,104 @@ ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& Entry return ReplayOrder; } +ProjectStore::Oplog::OplogSnapshot +ProjectStore::Oplog::GetSnapshotLocked() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::Oplog::GetSnapshotLocked"); + if (!m_Storage) + { + return {}; + } + + uint64_t WriteOffset = 0; + OplogSnapshot Snapshot; + + const uint64_t PayloadBufferPageSize = 64u * 1024u; + + size_t OpCount = GetOplogEntryCount(); + Snapshot.Ops.reserve(OpCount); + Snapshot.Keys.reserve(OpCount); + Snapshot.LSNs.reserve(OpCount); + + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(Paging{}, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber LSN, IoBuffer&& Buffer) { + const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; + + Snapshot.Keys.push_back(ReverseKeyMap.at(PayloadOffset)); + Snapshot.LSNs.push_back(LSN); + + uint64_t Size = Buffer.GetSize(); + if (Buffer.IsOwned()) + { + CbObjectView CopyView(Buffer.GetData()); + if (CopyView.GetSize() == Size) + { + Snapshot.Ops.emplace_back(std::move(CopyView)); + } + else + { + Snapshot.Ops.emplace_back(CbObjectView{}); + } + if (Snapshot.PayloadBuffers.empty()) + { + Snapshot.PayloadBuffers.emplace_back(std::move(Buffer)); + WriteOffset = Snapshot.PayloadBuffers.back().Size(); + } + else + { + Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(Buffer)); + } + } + else + { + uint64_t AvailableSize = Snapshot.PayloadBuffers.empty() ? 0 : Snapshot.PayloadBuffers.back().GetSize() - WriteOffset; + MutableMemoryView WriteBuffer; + if (Size > AvailableSize) + { + if (Size >= PayloadBufferPageSize && !Snapshot.PayloadBuffers.empty()) + { + // Insert the large payload before the current payload buffer so we can continue to use that + IoBuffer PayloadBuffer(Size); + WriteBuffer = PayloadBuffer.GetMutableView(); + Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(PayloadBuffer)); + } + else + { + IoBuffer PayloadBuffer(Max(Size, PayloadBufferPageSize)); + WriteBuffer = PayloadBuffer.GetMutableView().Mid(0, Size); + Snapshot.PayloadBuffers.emplace_back(std::move(PayloadBuffer)); + WriteOffset = Size; + } + } + else + { + WriteBuffer = Snapshot.PayloadBuffers.back().GetMutableView().Mid(WriteOffset, Size); + WriteOffset += Size; + } + WriteBuffer.CopyFrom(Buffer.GetView()); + CbObjectView CopyView(WriteBuffer.GetData()); + + if (CopyView.GetSize() == Size) + { + Snapshot.Ops.emplace_back(std::move(CopyView)); + } + else + { + Snapshot.Ops.emplace_back(CbObjectView{}); + } + } + + EntryIndex++; + }); + } + return Snapshot; +} + void ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging) { @@ -2710,7 +2839,7 @@ ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber if (!ReplayOrder.empty()) { uint32_t EntryIndex = 0; - m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) { + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, IoBuffer&& Buffer) { const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex]; Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer); EntryIndex++; @@ -3917,7 +4046,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_INFO("scrubbing '{}'", ProjectRootDir); + ZEN_INFO("scrubbing '{}'", m_OplogStoragePath); // Scrubbing needs to check all existing oplogs std::vector<std::string> OpLogs = ScanForOplogs(); @@ -3932,6 +4061,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { for (const std::string& OpLogId : OpLogs) { + Ctx.ThrowIfDeadlineExpired(); Ref<ProjectStore::Oplog> OpLog; { if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end()) @@ -4358,6 +4488,7 @@ ProjectStore::ScrubStorage(ScrubContext& Ctx) } for (const Ref<Project>& Project : Projects) { + Ctx.ThrowIfDeadlineExpired(); Project->Scrub(Ctx); } } @@ -6033,39 +6164,41 @@ public: { Ref<ProjectStore::Oplog> Oplog; - RwLock::SharedLockScope __(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { - Oplog = It->second; - Oplog->EnableUpdateCapture(); - m_OplogHasUpdateCapture = true; - } - else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) - { - Stopwatch OplogTimer; - Oplog = new ProjectStore::Oplog(m_Project->Log(), - m_Project->Identifier, - m_OplogId, - m_Project->m_CidStore, - m_OplogBasePath, - std::filesystem::path{}, - ProjectStore::Oplog::EMode::kBasicReadOnly); - Oplog->Read(); - if (Ctx.Settings.Verbose) + RwLock::SharedLockScope __(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", - m_OplogBasePath, - m_Project->Identifier, - m_OplogId, - NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + Oplog = It->second; + Oplog->EnableUpdateCapture(); + m_OplogHasUpdateCapture = true; + } + else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) + { + Stopwatch OplogTimer; + Oplog = new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kBasicReadOnly); + Oplog->Read(); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}", + m_OplogBasePath, + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs())); + } + } + else + { + return; } - } - else - { - return; } - RwLock::SharedLockScope ___(Oplog->m_OplogLock); + RwLock::SharedLockScope OplogLock(Oplog->m_OplogLock); if (Ctx.IsCancelledFlag) { return; @@ -6081,7 +6214,45 @@ public: } } - Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); + if (Ctx.Settings.StoreProjectAttachmentMetaData) + { + Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true); + } + else + { + Stopwatch SnapshotTimer; + ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked(); + OplogLock.ReleaseNow(); + + uint64_t AllocatedSize = + std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), + Snapshot.Ops.end(), + uint64_t(0), + [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); }); + + ZEN_INFO( + "GCV2: projectstore [PRECACHE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' " + "in {}", + m_OplogBasePath, + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& Op = Snapshot.Ops[Index]; + Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + } + } + m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); } FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); @@ -6145,7 +6316,43 @@ public: OplogTimer.Reset(); - Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); + if (Ctx.Settings.StoreProjectAttachmentMetaData) + { + Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true); + } + else + { + Stopwatch SnapshotTimer; + ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked(); + + uint64_t AllocatedSize = + std::accumulate(Snapshot.PayloadBuffers.begin(), + Snapshot.PayloadBuffers.end(), + uint64_t(0), + [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); }); + uint64_t UsedSize = + std::accumulate(Snapshot.Ops.begin(), + Snapshot.Ops.end(), + uint64_t(0), + [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); }); + + ZEN_INFO( + "GCV2: projectstore [LOCKSTATE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog " + "'{}/{}' in {}", + m_OplogBasePath, + Snapshot.Ops.size(), + NiceBytes(UsedSize), + NiceBytes(AllocatedSize), + m_Project->Identifier, + m_OplogId, + NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs())); + + for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++) + { + CbObjectView& Op = Snapshot.Ops[Index]; + Op.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); + } + } } if (Ctx.Settings.Verbose) { |