diff options
| author | Dan Engelbrecht <[email protected]> | 2026-01-09 16:52:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-01-09 16:52:08 +0100 |
| commit | 4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b (patch) | |
| tree | dc278605bd7b1036a24701455ab6df80f7871e30 /src | |
| parent | CprHttpClient cleanup (#703) (diff) | |
| download | zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.tar.xz zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.zip | |
various optimizations (#704)
- Improvement: Validate chunk hashes when dechunking files in oplog import
- Improvement: Use stream decompression when dechunking files
- Improvement: When assembling blocks for oplog export, make sure we stay at or under the block size limit
- Improvement: Make cancelling of oplog import more responsive
- Improvement: Use decompress-to-composite (`DecompressToComposite`) to avoid allocating a new memory buffer for uncompressed chunks during oplog import
- Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store
- Improvement: Reduce lock contention when fetching/checking existence of chunks in block store
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/chunking/chunkedcontent.cpp | 2 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 146 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 24 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 10 | ||||
| -rw-r--r-- | src/zenstore/projectstore.cpp | 2 |
5 files changed, 151 insertions, 33 deletions
diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp index e8187d348..fda01aa56 100644 --- a/src/zenremotestore/chunking/chunkedcontent.cpp +++ b/src/zenremotestore/chunking/chunkedcontent.cpp @@ -108,7 +108,7 @@ namespace { uint32_t PathIndex, std::atomic<bool>& AbortFlag) { - ZEN_TRACE_CPU("ChunkFolderContent"); + ZEN_TRACE_CPU("HashOneFile"); const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex]; const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex]; diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index b566e5bed..5ba541dd0 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> +#include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> #include <zenremotestore/chunking/chunkedfile.h> @@ -266,6 +267,8 @@ namespace remotestore_impl { &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadBlockChunks"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -387,6 +390,8 @@ namespace remotestore_impl { OptionalContext, RetriesLeft, Chunks = Chunks]() { + ZEN_TRACE_CPU("DownloadBlock"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -492,7 +497,7 @@ namespace remotestore_impl { {}); return; } - SharedBuffer BlockPayload = Compressed.Decompress(); + CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); if (!BlockPayload) { if (RetriesLeft > 0) @@ -542,7 +547,7 @@ namespace remotestore_impl { uint64_t BlockHeaderSize = 0; bool StoreChunksOK = IterateChunkBlock( - BlockPayload, + BlockPayload.Flatten(), 
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { @@ -648,6 +653,8 @@ namespace remotestore_impl { &Info, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadAttachment"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -694,6 +701,8 @@ namespace remotestore_impl { AttachmentSize, Bytes = std::move(AttachmentResult.Bytes), OptionalContext]() { + ZEN_TRACE_CPU("WriteAttachment"); + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -745,6 +754,8 @@ namespace remotestore_impl { Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + ZEN_TRACE_CPU("CreateBlock"); + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -917,6 +928,8 @@ namespace remotestore_impl { &LooseFileAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadAttachment"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1039,6 +1052,8 @@ namespace remotestore_impl { &BulkBlockAttachmentsToUpload, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadChunk"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore, AllowChunking, &RemoteResult, OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); + auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2041,6 +2058,15 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(AttachmentHash).second) { + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > 
MaxChunksPerBlock) && + (CurrentOpKey != LastOpKey)) + { + NewBlock(); + } + } + if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { ChunksInBlock.emplace_back(std::make_pair( @@ -2079,10 +2105,6 @@ BuildContainer(CidStore& ChunkStore, } BlockSize += PayloadSize; - if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey)) - { - NewBlock(); - } LastOpKey = CurrentOpKey; ChunksAssembled++; } @@ -2126,6 +2148,14 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; + uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size); + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) + { + NewBlock(); + } + } ChunksInBlock.emplace_back( std::make_pair(ChunkHash, [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( @@ -2136,13 +2166,6 @@ BuildContainer(CidStore& ChunkStore, OodleCompressionLevel::None)}; })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; - if (BuildBlocks) - { - if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) - { - NewBlock(); - } - } ChunksAssembled++; } ChunkedHashes.erase(FindIt); @@ -2781,12 +2804,26 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView OpEntry : OpsArray) { OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } } } { std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); 
OnReferencedAttachments(ReferencedAttachments); } + + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); @@ -3206,6 +3243,8 @@ LoadOplog(CidStore& ChunkStore, IgnoreMissingAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("DechunkAttachment"); + auto _ = MakeGuard([&DechunkLatch, &TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) @@ -3232,7 +3271,7 @@ LoadOplog(CidStore& ChunkStore, { BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { @@ -3255,15 +3294,80 @@ LoadOplog(CidStore& ChunkStore, } return; } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) + + IoHash RawHash; + uint64_t RawSize; + + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + if (RawHash != ChunkHash) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Mismatching raw 
hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + } + return; + } + { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); + ZEN_TRACE_CPU("DecompressChunk"); + + if (!Compressed.DecompressToStream(0, + RawSize, + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + + for (const SharedBuffer& Segment : + RangeBuffer.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), + SegmentData.GetSize(), + ChunkOffset + Offset); + } + return true; + })) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + } + return; + } } + ChunkOffset += RawSize; } BLAKE3 RawHash = HashingStream.GetHash(); ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index f97c98e08..0542d1171 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con LargestSize = Max(LargestSize, Size); } - const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u); + const uint64_t MinSize = Max(LargestSize, 512u * 1024u); const uint64_t BufferSize = Min(TotalSize, MinSize); 
std::vector<uint8_t> Buffer(BufferSize); @@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); }); + if (Count > 1) { + if (Buffer.empty()) + { + Buffer.resize(BufferSize); + } MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); for (size_t Index = 0; Index < Count; Index++) { @@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); } WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); + m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + } + else + { + MemoryView SourceBuffer = Datas[Offset]; + WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset); + m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed); } - - m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); uint32_t ChunkOffset = AlignedInsertOffset; std::vector<BlockStoreLocation> Locations(Count); @@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con bool BlockStore::HasChunk(const BlockStoreLocation& Location) const { - ZEN_TRACE_CPU("BlockStore::TryGetChunk"); + ZEN_TRACE_CPU("BlockStore::HasChunk"); RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = BlockIt->second; Block) { InsertLock.ReleaseNow(); @@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = 
BlockIt->second; Block) { + InsertLock.ReleaseNow(); + IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size); if (Chunk.GetSize() == Location.Size) { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index a5de5c448..37a8c36b8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("CasContainer::FindChunk"); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { return IoBuffer(); } - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); return Chunk; @@ -316,10 +317,11 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); return m_BlockStore.HasChunk(Location); } return false; diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index f1001f665..c5b27c1ea 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -3917,7 +3917,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_INFO("scrubbing '{}'", ProjectRootDir); + ZEN_INFO("scrubbing '{}'", m_OplogStoragePath); // Scrubbing needs to check all existing oplogs std::vector<std::string> OpLogs = ScanForOplogs(); |