aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2026-01-09 16:52:08 +0100
committer GitHub Enterprise <[email protected]> 2026-01-09 16:52:08 +0100
commit 4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b (patch)
tree dc278605bd7b1036a24701455ab6df80f7871e30 /src/zenremotestore
parent CprHttpClient cleanup (#703) (diff)
download zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.tar.xz
download zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.zip
various optimizations (#704)
- Improvement: Validate chunk hashes when dechunking files in oplog import
- Improvement: Use stream decompression when dechunking files
- Improvement: When assembling blocks for oplog export, make sure we keep under/at block size limit
- Improvement: Make cancelling of oplog import more responsive
- Improvement: Use decompress to composite to avoid allocating a new memory buffer for uncompressed chunks during oplog import
- Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store
- Improvement: Reduce lock contention when fetching/checking existence of chunks in block store
Diffstat (limited to 'src/zenremotestore')
-rw-r--r-- src/zenremotestore/chunking/chunkedcontent.cpp 2
-rw-r--r-- src/zenremotestore/projectstore/remoteprojectstore.cpp 146
2 files changed, 126 insertions, 22 deletions
diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp
index e8187d348..fda01aa56 100644
--- a/src/zenremotestore/chunking/chunkedcontent.cpp
+++ b/src/zenremotestore/chunking/chunkedcontent.cpp
@@ -108,7 +108,7 @@ namespace {
uint32_t PathIndex,
std::atomic<bool>& AbortFlag)
{
- ZEN_TRACE_CPU("ChunkFolderContent");
+ ZEN_TRACE_CPU("HashOneFile");
const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex];
const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex];
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index b566e5bed..5ba541dd0 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
#include <zenremotestore/chunking/chunkedfile.h>
@@ -266,6 +267,8 @@ namespace remotestore_impl {
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadBlockChunks");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -387,6 +390,8 @@ namespace remotestore_impl {
OptionalContext,
RetriesLeft,
Chunks = Chunks]() {
+ ZEN_TRACE_CPU("DownloadBlock");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -492,7 +497,7 @@ namespace remotestore_impl {
{});
return;
}
- SharedBuffer BlockPayload = Compressed.Decompress();
+ CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
if (!BlockPayload)
{
if (RetriesLeft > 0)
@@ -542,7 +547,7 @@ namespace remotestore_impl {
uint64_t BlockHeaderSize = 0;
bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
+ BlockPayload.Flatten(),
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
CompressedBuffer&& Chunk,
const IoHash& AttachmentRawHash) {
@@ -648,6 +653,8 @@ namespace remotestore_impl {
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadAttachment");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -694,6 +701,8 @@ namespace remotestore_impl {
AttachmentSize,
Bytes = std::move(AttachmentResult.Bytes),
OptionalContext]() {
+ ZEN_TRACE_CPU("WriteAttachment");
+
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -745,6 +754,8 @@ namespace remotestore_impl {
Chunks = std::move(ChunksInBlock),
&AsyncOnBlock,
&RemoteResult]() mutable {
+ ZEN_TRACE_CPU("CreateBlock");
+
auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -917,6 +928,8 @@ namespace remotestore_impl {
&LooseFileAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadAttachment");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1039,6 +1052,8 @@ namespace remotestore_impl {
&BulkBlockAttachmentsToUpload,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadChunk");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore,
AllowChunking,
&RemoteResult,
OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
+
auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2041,6 +2058,15 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(AttachmentHash).second)
{
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) &&
+ (CurrentOpKey != LastOpKey))
+ {
+ NewBlock();
+ }
+ }
+
if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
{
ChunksInBlock.emplace_back(std::make_pair(
@@ -2079,10 +2105,6 @@ BuildContainer(CidStore& ChunkStore,
}
BlockSize += PayloadSize;
- if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey))
- {
- NewBlock();
- }
LastOpKey = CurrentOpKey;
ChunksAssembled++;
}
@@ -2126,6 +2148,14 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(ChunkHash).second)
{
const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size);
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock)
+ {
+ NewBlock();
+ }
+ }
ChunksInBlock.emplace_back(
std::make_pair(ChunkHash,
[Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
@@ -2136,13 +2166,6 @@ BuildContainer(CidStore& ChunkStore,
OodleCompressionLevel::None)};
}));
BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
- if (BuildBlocks)
- {
- if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock)
- {
- NewBlock();
- }
- }
ChunksAssembled++;
}
ChunkedHashes.erase(FindIt);
@@ -2781,12 +2804,26 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView OpEntry : OpsArray)
{
OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
}
}
{
std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
+
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
+
remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
@@ -3206,6 +3243,8 @@ LoadOplog(CidStore& ChunkStore,
IgnoreMissingAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("DechunkAttachment");
+
auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
std::error_code Ec;
if (IsFile(TempFileName, Ec))
@@ -3232,7 +3271,7 @@ LoadOplog(CidStore& ChunkStore,
{
BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
BLAKE3Stream HashingStream;
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
@@ -3255,15 +3294,80 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
+
+ IoHash RawHash;
+ uint64_t RawSize;
+
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
+ if (RawHash != ChunkHash)
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
+
{
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
+ ZEN_TRACE_CPU("DecompressChunk");
+
+ if (!Compressed.DecompressToStream(0,
+ RawSize,
+ [&](uint64_t SourceOffset,
+ uint64_t SourceSize,
+ uint64_t Offset,
+ const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+
+ for (const SharedBuffer& Segment :
+ RangeBuffer.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(),
+ SegmentData.GetSize(),
+ ChunkOffset + Offset);
+ }
+ return true;
+ }))
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
}
+ ChunkOffset += RawSize;
}
BLAKE3 RawHash = HashingStream.GetHash();
ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));