diff options
| author | Dan Engelbrecht <[email protected]> | 2026-01-09 16:52:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-01-09 16:52:08 +0100 |
| commit | 4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b (patch) | |
| tree | dc278605bd7b1036a24701455ab6df80f7871e30 /src/zenremotestore/projectstore/remoteprojectstore.cpp | |
| parent | CprHttpClient cleanup (#703) (diff) | |
| download | zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.tar.xz zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.zip | |
various optimizations (#704)
- Improvement: Validate chunk hashes when dechunking files in oplog import
- Improvement: Use stream decompression when dechunking files
- Improvement: When assembling blocks for oplog export, make sure we keep under/at block size limit
- Improvement: Make cancelling of oplog import more responsive
- Improvement: Use decompress to composite to avoid allocating a new memory buffer for uncompressed chunks during oplog import
- Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store
- Improvement: Reduce lock contention when fetching/checking existence of chunks in block store
Diffstat (limited to 'src/zenremotestore/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 146 |
1 files changed, 125 insertions, 21 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index b566e5bed..5ba541dd0 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> +#include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> #include <zenremotestore/chunking/chunkedfile.h> @@ -266,6 +267,8 @@ namespace remotestore_impl { &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadBlockChunks"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -387,6 +390,8 @@ namespace remotestore_impl { OptionalContext, RetriesLeft, Chunks = Chunks]() { + ZEN_TRACE_CPU("DownloadBlock"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -492,7 +497,7 @@ namespace remotestore_impl { {}); return; } - SharedBuffer BlockPayload = Compressed.Decompress(); + CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); if (!BlockPayload) { if (RetriesLeft > 0) @@ -542,7 +547,7 @@ namespace remotestore_impl { uint64_t BlockHeaderSize = 0; bool StoreChunksOK = IterateChunkBlock( - BlockPayload, + BlockPayload.Flatten(), [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { @@ -648,6 +653,8 @@ namespace remotestore_impl { &Info, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadAttachment"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -694,6 +701,8 @@ namespace remotestore_impl { AttachmentSize, Bytes = std::move(AttachmentResult.Bytes), OptionalContext]() { + ZEN_TRACE_CPU("WriteAttachment"); + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -745,6 +754,8 @@ namespace remotestore_impl { Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + ZEN_TRACE_CPU("CreateBlock"); + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -917,6 +928,8 @@ namespace remotestore_impl { &LooseFileAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadAttachment"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1039,6 +1052,8 @@ namespace remotestore_impl { &BulkBlockAttachmentsToUpload, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadChunk"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore, AllowChunking, &RemoteResult, OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); + auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -2041,6 +2058,15 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(AttachmentHash).second) { + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) && + (CurrentOpKey != LastOpKey)) + { + NewBlock(); + } + } + if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { ChunksInBlock.emplace_back(std::make_pair( @@ -2079,10 +2105,6 @@ BuildContainer(CidStore& ChunkStore, } BlockSize += PayloadSize; - if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey)) - { - NewBlock(); - } LastOpKey = CurrentOpKey; ChunksAssembled++; } @@ -2126,6 +2148,14 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; + uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size); + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) + { + NewBlock(); + } + } ChunksInBlock.emplace_back( std::make_pair(ChunkHash, [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( @@ -2136,13 +2166,6 @@ BuildContainer(CidStore& ChunkStore, OodleCompressionLevel::None)}; })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; - if (BuildBlocks) - { - if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) - { - NewBlock(); - } - } ChunksAssembled++; } ChunkedHashes.erase(FindIt); @@ -2781,12 +2804,26 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView OpEntry : OpsArray) { OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } } } { std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } + + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); @@ -3206,6 +3243,8 @@ LoadOplog(CidStore& ChunkStore, IgnoreMissingAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("DechunkAttachment"); + auto _ = MakeGuard([&DechunkLatch, &TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) @@ -3232,7 +3271,7 @@ LoadOplog(CidStore& ChunkStore, { BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { @@ -3255,15 +3294,80 @@ LoadOplog(CidStore& ChunkStore, } return; } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) + + IoHash RawHash; + uint64_t RawSize; + + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + if (RawHash != ChunkHash) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + } + return; + } + { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); + ZEN_TRACE_CPU("DecompressChunk"); + + if (!Compressed.DecompressToStream(0, + RawSize, + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + + for (const SharedBuffer& Segment : + RangeBuffer.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), + SegmentData.GetSize(), + ChunkOffset + Offset); + } + return true; + })) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + } + return; + } } + ChunkOffset += RawSize; } BLAKE3 RawHash = HashingStream.GetHash(); ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); |