aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-01-09 16:52:08 +0100
committerGitHub Enterprise <[email protected]>2026-01-09 16:52:08 +0100
commit4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b (patch)
treedc278605bd7b1036a24701455ab6df80f7871e30 /src
parentCprHttpClient cleanup (#703) (diff)
downloadzen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.tar.xz
zen-4b25a0926ce5cc4336a58165ddfbb11e7fe97f6b.zip
various optimizations (#704)
- Improvement: Validate chunk hashes when dechunking files in oplog import - Improvement: Use stream decompression when dechunking files - Improvement: When assembling blocks for oplog export, make sure we keep under/at block size limit - Improvement: Make cancelling of oplog import more responsive - Improvement: Use decompress to composite to avoid allocating a new memory buffer for uncompressed chunks during oplog import - Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store - Improvement: Reduce lock contention when fetching/checking existence of chunks in block store
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/chunking/chunkedcontent.cpp2
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp146
-rw-r--r--src/zenstore/blockstore.cpp24
-rw-r--r--src/zenstore/compactcas.cpp10
-rw-r--r--src/zenstore/projectstore.cpp2
5 files changed, 151 insertions, 33 deletions
diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp
index e8187d348..fda01aa56 100644
--- a/src/zenremotestore/chunking/chunkedcontent.cpp
+++ b/src/zenremotestore/chunking/chunkedcontent.cpp
@@ -108,7 +108,7 @@ namespace {
uint32_t PathIndex,
std::atomic<bool>& AbortFlag)
{
- ZEN_TRACE_CPU("ChunkFolderContent");
+ ZEN_TRACE_CPU("HashOneFile");
const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex];
const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex];
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index b566e5bed..5ba541dd0 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
#include <zenremotestore/chunking/chunkedfile.h>
@@ -266,6 +267,8 @@ namespace remotestore_impl {
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadBlockChunks");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -387,6 +390,8 @@ namespace remotestore_impl {
OptionalContext,
RetriesLeft,
Chunks = Chunks]() {
+ ZEN_TRACE_CPU("DownloadBlock");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -492,7 +497,7 @@ namespace remotestore_impl {
{});
return;
}
- SharedBuffer BlockPayload = Compressed.Decompress();
+ CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
if (!BlockPayload)
{
if (RetriesLeft > 0)
@@ -542,7 +547,7 @@ namespace remotestore_impl {
uint64_t BlockHeaderSize = 0;
bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
+ BlockPayload.Flatten(),
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
CompressedBuffer&& Chunk,
const IoHash& AttachmentRawHash) {
@@ -648,6 +653,8 @@ namespace remotestore_impl {
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadAttachment");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -694,6 +701,8 @@ namespace remotestore_impl {
AttachmentSize,
Bytes = std::move(AttachmentResult.Bytes),
OptionalContext]() {
+ ZEN_TRACE_CPU("WriteAttachment");
+
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -745,6 +754,8 @@ namespace remotestore_impl {
Chunks = std::move(ChunksInBlock),
&AsyncOnBlock,
&RemoteResult]() mutable {
+ ZEN_TRACE_CPU("CreateBlock");
+
auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -917,6 +928,8 @@ namespace remotestore_impl {
&LooseFileAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadAttachment");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1039,6 +1052,8 @@ namespace remotestore_impl {
&BulkBlockAttachmentsToUpload,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadChunk");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore,
AllowChunking,
&RemoteResult,
OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
+
auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -2041,6 +2058,15 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(AttachmentHash).second)
{
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) &&
+ (CurrentOpKey != LastOpKey))
+ {
+ NewBlock();
+ }
+ }
+
if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
{
ChunksInBlock.emplace_back(std::make_pair(
@@ -2079,10 +2105,6 @@ BuildContainer(CidStore& ChunkStore,
}
BlockSize += PayloadSize;
- if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey))
- {
- NewBlock();
- }
LastOpKey = CurrentOpKey;
ChunksAssembled++;
}
@@ -2126,6 +2148,14 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(ChunkHash).second)
{
const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size);
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock)
+ {
+ NewBlock();
+ }
+ }
ChunksInBlock.emplace_back(
std::make_pair(ChunkHash,
[Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
@@ -2136,13 +2166,6 @@ BuildContainer(CidStore& ChunkStore,
OodleCompressionLevel::None)};
}));
BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
- if (BuildBlocks)
- {
- if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock)
- {
- NewBlock();
- }
- }
ChunksAssembled++;
}
ChunkedHashes.erase(FindIt);
@@ -2781,12 +2804,26 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView OpEntry : OpsArray)
{
OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
}
}
{
std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
+
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
+
remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
@@ -3206,6 +3243,8 @@ LoadOplog(CidStore& ChunkStore,
IgnoreMissingAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("DechunkAttachment");
+
auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
std::error_code Ec;
if (IsFile(TempFileName, Ec))
@@ -3232,7 +3271,7 @@ LoadOplog(CidStore& ChunkStore,
{
BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
BLAKE3Stream HashingStream;
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
@@ -3255,15 +3294,80 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
+
+ IoHash RawHash;
+ uint64_t RawSize;
+
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
+ if (RawHash != ChunkHash)
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
+
{
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
+ ZEN_TRACE_CPU("DecompressChunk");
+
+ if (!Compressed.DecompressToStream(0,
+ RawSize,
+ [&](uint64_t SourceOffset,
+ uint64_t SourceSize,
+ uint64_t Offset,
+ const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+
+ for (const SharedBuffer& Segment :
+ RangeBuffer.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(),
+ SegmentData.GetSize(),
+ ChunkOffset + Offset);
+ }
+ return true;
+ }))
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
}
+ ChunkOffset += RawSize;
}
BLAKE3 RawHash = HashingStream.GetHash();
ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index f97c98e08..0542d1171 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
LargestSize = Max(LargestSize, Size);
}
- const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t MinSize = Max(LargestSize, 512u * 1024u);
const uint64_t BufferSize = Min(TotalSize, MinSize);
std::vector<uint8_t> Buffer(BufferSize);
@@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); });
+ if (Count > 1)
{
+ if (Buffer.empty())
+ {
+ Buffer.resize(BufferSize);
+ }
MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
for (size_t Index = 0; Index < Count; Index++)
{
@@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
}
WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ MemoryView SourceBuffer = Datas[Offset];
+ WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset);
+ m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed);
}
-
- m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
uint32_t ChunkOffset = AlignedInsertOffset;
std::vector<BlockStoreLocation> Locations(Count);
@@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
bool
BlockStore::HasChunk(const BlockStoreLocation& Location) const
{
- ZEN_TRACE_CPU("BlockStore::TryGetChunk");
+ ZEN_TRACE_CPU("BlockStore::HasChunk");
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
InsertLock.ReleaseNow();
@@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
+ InsertLock.ReleaseNow();
+
IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size);
if (Chunk.GetSize() == Location.Size)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index a5de5c448..37a8c36b8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_TRACE_CPU("CasContainer::FindChunk");
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
auto KeyIt = m_LocationMap.find(ChunkHash);
if (KeyIt == m_LocationMap.end())
{
return IoBuffer();
}
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
return Chunk;
@@ -316,10 +317,11 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
return m_BlockStore.HasChunk(Location);
}
return false;
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index f1001f665..c5b27c1ea 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -3917,7 +3917,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+ ZEN_INFO("scrubbing '{}'", m_OplogStoragePath);
// Scrubbing needs to check all existing oplogs
std::vector<std::string> OpLogs = ScanForOplogs();