aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-14 23:12:04 +0100
committerDan Engelbrecht <[email protected]>2026-03-14 23:12:04 +0100
commit73a3944d15e09b437ad22f6899d8bfdcc96db339 (patch)
tree9bf11fec06fcfa71ef006fc3c4f13f4449660fb8
parentbroke out ChunkAttachments helper (diff)
downloadzen-73a3944d15e09b437ad22f6899d8bfdcc96db339.tar.xz
zen-73a3944d15e09b437ad22f6899d8bfdcc96db339.zip
operator on CompositeBuffer for block building
refactor and clean up
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp4
-rw-r--r--src/zenremotestore/chunking/chunkblock.cpp29
-rw-r--r--src/zenremotestore/include/zenremotestore/chunking/chunkblock.h2
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp843
4 files changed, 458 insertions, 420 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 44d52451c..a04063c4c 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -5214,7 +5214,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
{
BlockContent.emplace_back(std::make_pair(
Content.ChunkedContent.ChunkHashes[ChunkIndex],
- [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+ [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> {
CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
ZEN_ASSERT(Chunk);
uint64_t RawSize = Chunk.GetSize();
@@ -5224,7 +5224,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
const OodleCompressionLevel CompressionLevel =
ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
- return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel).GetCompressed()};
}));
}
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp
index 11a38917c..0fe3c09ce 100644
--- a/src/zenremotestore/chunking/chunkblock.cpp
+++ b/src/zenremotestore/chunking/chunkblock.cpp
@@ -352,12 +352,9 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
for (const auto& It : FetchChunks)
{
- std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first);
- ZEN_ASSERT_SLOW(Chunk.second.DecodeRawSize() == Chunk.first);
- ZEN_ASSERT_SLOW(Chunk.second.DecodeRawHash() == It.first);
- ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk.second.Decompress().AsIoBuffer()) == It.first);
- uint64_t ChunkSize = 0;
- std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments();
+ std::pair<uint64_t, CompositeBuffer> Chunk = It.second(It.first);
+ uint64_t ChunkSize = 0;
+ std::span<const SharedBuffer> Segments = Chunk.second.GetSegments();
for (const SharedBuffer& Segment : Segments)
{
ZEN_ASSERT(Segment.IsOwned());
@@ -377,18 +374,6 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
CompressedBuffer CompressedBlock =
CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
-
-#if ZEN_BUILD_DEBUG
- uint64_t VerifyHeaderSize = 0;
- ZEN_ASSERT_SLOW(IterateChunkBlock(
- CompressedBlock.Decompress(),
- [](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) {
- ZEN_ASSERT(Chunk.DecodeRawHash() == AttachmentHash);
- SharedBuffer Decompressed = Chunk.Decompress();
- ZEN_ASSERT(AttachmentHash == IoHash::HashBuffer(Decompressed.AsIoBuffer()));
- },
- VerifyHeaderSize));
-#endif // ZEN_BUILD_DEBUG
return CompressedBlock;
}
@@ -972,8 +957,8 @@ TEST_CASE("chunkblock.block")
for (const auto& It : AttachmentsWithId)
{
Chunks.push_back(
- std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return {Buffer.DecodeRawSize(), Buffer};
+ std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompositeBuffer> {
+ return {Buffer.DecodeRawSize(), Buffer.GetCompressed()};
}));
}
ChunkBlockDescription Block;
@@ -1007,8 +992,8 @@ TEST_CASE("chunkblock.reuseblocks")
for (const auto& It : AttachmentsWithId)
{
Chunks.push_back(
- std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return {Buffer.DecodeRawSize(), Buffer};
+ std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompositeBuffer> {
+ return {Buffer.DecodeRawSize(), Buffer.GetCompressed()};
}));
}
ChunkBlockDescription Block;
diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
index 931bb2097..e3a5f6539 100644
--- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
+++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
@@ -31,7 +31,7 @@ ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockO
std::vector<ChunkBlockDescription> ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas);
CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData);
ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash);
-typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc;
+typedef std::function<std::pair<uint64_t, CompositeBuffer>(const IoHash& RawHash)> FetchChunkFunc;
CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock);
bool IterateChunkBlock(const SharedBuffer& BlockPayload,
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 51dd686bb..39e2f2413 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -253,6 +253,7 @@ namespace remotestore_impl {
const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex];
const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex];
uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex];
+ ZEN_ASSERT(CurrentOpAttachmentsSize <= m_Config.MaxChunkEmbedSize);
std::vector<IoHash> CurrentOpRawHashes;
CurrentOpRawHashes.push_back(FirstAttachmentHash);
@@ -272,6 +273,7 @@ namespace remotestore_impl {
}
const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex];
uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex];
+ ZEN_ASSERT(NextOpAttachmentSize <= m_Config.MaxChunkEmbedSize);
if (CurrentOpAttachmentsSize + NextOpAttachmentSize > m_UsableBlockSize)
{
@@ -362,6 +364,7 @@ namespace remotestore_impl {
ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
}
+ ZEN_ASSERT(AddedChunkSize <= CurrentOpAttachmentsSize);
ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
@@ -410,6 +413,63 @@ namespace remotestore_impl {
const uint64_t m_UsableBlockSize = 0;
};
+ IoBuffer CompressToTempFile(const IoHash& RawHash,
+ const IoBuffer& RawData,
+ const std::filesystem::path& AttachmentPath,
+ OodleCompressor Compressor,
+ OodleCompressionLevel CompressionLevel)
+ {
+ ZEN_ASSERT(!IsFile(AttachmentPath));
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath));
+ }
+
+ if (RawData.GetSize() < 512u * 1024u)
+ {
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), Compressor, CompressionLevel);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash));
+ }
+ CompressedFile.Write(CompressedBlob.GetCompressed(), 0);
+ }
+ else
+ {
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawData)),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ CompressedFile.Write(RangeBuffer, Offset);
+ ZEN_UNUSED(SourceOffset, SourceSize);
+ },
+ Compressor,
+ CompressionLevel);
+ if (!CouldCompress)
+ {
+ // Compressed is larger than source data...
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash));
+ }
+ CompressedFile.SetFileSize(0);
+ CompressedFile.Write(CompressedBlob.GetCompressed(), 0);
+ }
+ }
+ IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ CompressedFile.Close();
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ return TempAttachmentBuffer;
+ }
+
struct FoundAttachment
{
std::filesystem::path RawPath; // If not stored in cid
@@ -734,7 +794,8 @@ namespace remotestore_impl {
}
});
}
- Work.Wait(200, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, IsPaused);
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -747,6 +808,16 @@ namespace remotestore_impl {
PendingWork,
FindChunkSizesTimer.GetElapsedTimeMs());
});
+
+ if (!AbortFlag.load())
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Finding attachments"sv,
+ "",
+ UploadAttachments.size(),
+ 0,
+ FindChunkSizesTimer.GetElapsedTimeMs());
+ }
}
struct ChunkedFile
@@ -818,7 +889,7 @@ namespace remotestore_impl {
});
}
- Work.Wait(200, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted, IsPaused);
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -831,9 +902,190 @@ namespace remotestore_impl {
PendingWork,
ChunkAttachmentsTimer.GetElapsedTimeMs());
});
+
+ if (!AbortFlag.load())
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Chunking attachments"sv,
+ "",
+ AttachmentsToChunk.size(),
+ 0,
+ ChunkAttachmentsTimer.GetElapsedTimeMs());
+ }
return ChunkedFiles;
}
+ void ResolveAttachments(CidStore& ChunkStore,
+ WorkerThreadPool& WorkerPool,
+ uint64_t MaxChunkEmbedSize,
+ const std::filesystem::path& AttachmentTempPath,
+ std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments,
+ std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LargeChunkAttachments,
+ std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher>& LooseUploadAttachments,
+ JobContext* OptionalContext)
+ {
+ if (UploadAttachments.empty())
+ {
+ return;
+ }
+ Stopwatch UploadAttachmentsTimer;
+
+ RwLock ResolveLock;
+
+ std::atomic<bool> AbortFlag(false);
+ std::atomic<bool> PauseFlag(false);
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (auto& It : UploadAttachments)
+ {
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ AbortFlag.store(true);
+ break;
+ }
+ Work.ScheduleWork(
+ WorkerPool,
+ [&ChunkStore,
+ MaxChunkEmbedSize,
+ &AttachmentTempPath,
+ &ResolveLock,
+ &LargeChunkAttachments,
+ &LooseUploadAttachments,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ OptionalContext](std::atomic<bool>& AbortFlag) {
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ AbortFlag.store(true);
+ }
+ if (AbortFlag)
+ {
+ return;
+ }
+ ZEN_ASSERT(UploadAttachment->Size != 0);
+ if (!UploadAttachment->RawPath.empty())
+ {
+ if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
+ {
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+
+ TGetAttachmentBufferFunc FetchFunc = [RawPath = UploadAttachment->RawPath,
+ AttachmentTempPath,
+ UploadAttachment](const IoHash& RawHash) -> CompositeBuffer {
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath);
+ if (!RawData)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath));
+ }
+
+ std::filesystem::path AttachmentPath = AttachmentTempPath;
+ AttachmentPath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
+ RawData,
+ AttachmentPath,
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ AttachmentPath,
+ NiceBytes(UploadAttachment->Size),
+ NiceBytes(TempAttachmentBuffer.GetSize()));
+ return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer)));
+ };
+
+ RwLock::ExclusiveLockScope _(ResolveLock);
+ LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc));
+ }
+ else
+ {
+ // Compress inline - check compressed size to see if it should go into a block or not
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath);
+ if (!RawData)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read source file for blob {} from '{}'", RawHash, UploadAttachment->RawPath));
+ }
+
+ std::filesystem::path TempFilePath = AttachmentTempPath;
+ TempFilePath.append(RawHash.ToHexString());
+
+ IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
+ RawData,
+ TempFilePath,
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+
+ uint64_t CompressedSize = TempAttachmentBuffer.GetSize();
+
+ ZEN_INFO("Saved temp attachment to '{}', {} ({})",
+ TempFilePath,
+ NiceBytes(UploadAttachment->Size),
+ NiceBytes(CompressedSize));
+
+ if (CompressedSize > MaxChunkEmbedSize)
+ {
+ TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
+ return CompositeBuffer(SharedBuffer(std::move(Data)));
+ };
+
+ RwLock::ExclusiveLockScope _(ResolveLock);
+ LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc));
+ }
+ else
+ {
+ UploadAttachment->Size = CompressedSize;
+
+ std::pair<uint64_t, IoBuffer> LooseAttachment(RawData.GetSize(), std::move(TempAttachmentBuffer));
+
+ RwLock::ExclusiveLockScope _(ResolveLock);
+ LooseUploadAttachments.insert_or_assign(RawHash, std::move(LooseAttachment));
+ }
+ }
+ }
+ else
+ {
+ if (UploadAttachment->Size > MaxChunkEmbedSize)
+ {
+ TGetAttachmentBufferFunc FetchFunc = [&ChunkStore](const IoHash& RawHash) {
+ return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash))));
+ };
+ RwLock::ExclusiveLockScope _(ResolveLock);
+ LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc));
+ }
+ }
+ });
+ }
+
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ AbortFlag.store(true);
+ }
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{}{} remaining...", AbortFlag.load() ? "Aborting, " : "", PendingWork),
+ UploadAttachments.size(),
+ PendingWork,
+ UploadAttachmentsTimer.GetElapsedTimeMs());
+ });
+
+ if (!AbortFlag.load())
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ "",
+ UploadAttachments.size(),
+ 0,
+ UploadAttachmentsTimer.GetElapsedTimeMs());
+ }
+ }
+
RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
{
using namespace std::literals;
@@ -941,70 +1193,6 @@ namespace remotestore_impl {
JobContext* m_OptionalContext;
};
- IoBuffer CompressToTempFile(const IoHash& RawHash,
- const IoBuffer& RawData,
- const std::filesystem::path& AttachmentPath,
- OodleCompressor Compressor,
- OodleCompressionLevel CompressionLevel)
- {
- ZEN_ASSERT(!IsFile(AttachmentPath));
- BasicFile CompressedFile;
- std::error_code Ec;
- CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath));
- }
-
- bool CouldCompress = CompressedBuffer::CompressToStream(
- CompositeBuffer(SharedBuffer(RawData)),
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- CompressedFile.Write(RangeBuffer, Offset);
- ZEN_UNUSED(SourceOffset, SourceSize);
- // StreamRawBytes += SourceSize;
- // StreamCompressedBytes += RangeBuffer.GetSize();
- },
- Compressor,
- CompressionLevel);
- if (CouldCompress)
- {
-#if ZEN_BUILD_DEBUG
- uint64_t CompressedSize = CompressedFile.FileSize();
- // void* FileHandle = CompressedFile.Handle();
- IoBuffer TempPayload = IoBuffer(IoBuffer::BorrowedFile, CompressedFile.Handle(), 0, CompressedSize);
- // ZEN_ASSERT(TempPayload);
- // TempPayload.SetDeleteOnClose(true);
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), VerifyRawHash, VerifyRawSize);
- ZEN_ASSERT(Compressed);
- ZEN_ASSERT(RawHash == VerifyRawHash);
- ZEN_ASSERT(RawData.GetSize() == VerifyRawSize);
- // CompressedFile.Attach(FileHandle);
-#endif // ZEN_BUILD_DEBUG
- }
- else
- {
- // Compressed is larger than source data...
- CompressedBuffer CompressedBlob =
- CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
- if (!CompressedBlob)
- {
- throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash));
- }
- CompressedFile.SetFileSize(0);
- CompressedFile.Write(CompressedBlob.GetCompressed(), 0);
- }
- IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
- CompressedFile.Close();
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
- TempAttachmentBuffer.SetDeleteOnClose(true);
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
- return TempAttachmentBuffer;
- }
-
void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
@@ -2265,7 +2453,7 @@ namespace remotestore_impl {
{
auto It = BulkBlockAttachmentsToUpload.find(Chunk);
ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompressedBuffer ChunkPayload = It->second(It->first).second;
+ CompositeBuffer ChunkPayload = It->second(It->first).second;
if (!ChunkPayload)
{
RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
@@ -2274,8 +2462,8 @@ namespace remotestore_impl {
ChunkBuffers.clear();
break;
}
- ChunksSize += ChunkPayload.GetCompressedSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer()));
}
RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
if (Result.ErrorCode)
@@ -2583,268 +2771,33 @@ BuildContainer(CidStore& ChunkStore,
}
}
- RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LargeChunkAttachments;
std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
- Stopwatch ResolveAttachmentsProgressTimer;
- Latch ResolveAttachmentsLatch(1);
- for (auto& It : UploadAttachments)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- ResolveAttachmentsLatch.AddCount(1);
-
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &ResolveAttachmentsLatch,
- &ResolveLock,
- &ChunkedHashes,
- &LargeChunkHashes,
- &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- OptionalContext]() {
- ZEN_TRACE_CPU("PrepareChunk");
-
- auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
-
- try
- {
- ZEN_ASSERT(UploadAttachment->Size != 0);
- if (!UploadAttachment->RawPath.empty())
- {
- if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(
- RawHash,
- [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment](
- const IoHash& RawHash) -> CompositeBuffer {
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath);
- if (!RawData)
- {
- throw std::runtime_error(
- fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath));
- }
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
-
- IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
- RawData,
- AttachmentPath,
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
- TempAttachmentBuffer.SetDeleteOnClose(true);
-
-#if ZEN_BUILD_DEBUG
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- ZEN_ASSERT_SLOW(
- CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize));
- ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
-#endif // ZEN_BUILD_DEBUG
-
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(UploadAttachment->Size),
- NiceBytes(TempAttachmentBuffer.GetSize()));
- return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer)));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- // Compress inline - check compressed size to see if it should go into a block or not
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath);
- if (!RawData)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to read attachment {}", UploadAttachment->RawPath),
- "");
- return;
- }
-
- std::filesystem::path TempFilePath = AttachmentTempPath;
- TempFilePath.append(RawHash.ToHexString());
-
- try
- {
- IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
- RawData,
- TempFilePath,
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
- TempAttachmentBuffer.SetDeleteOnClose(true);
-#if ZEN_BUILD_DEBUG
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- ZEN_ASSERT_SLOW(
- CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize));
- ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer))
- .CompressedBuffer::Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
-#endif // ZEN_BUILD_DEBUG
-
- uint64_t CompressedSize = TempAttachmentBuffer.GetSize();
-
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- TempFilePath,
- NiceBytes(UploadAttachment->Size),
- NiceBytes(CompressedSize));
-
- if (CompressedSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
- return CompositeBuffer(SharedBuffer(std::move(Data)));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = CompressedSize;
- {
- IoHash VerifyRawHash2;
- uint64_t VerifyRawSize2;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
- VerifyRawHash2,
- VerifyRawSize2);
- ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash);
- ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize());
- }
-
- ResolveLock.WithExclusiveLock([RawHash,
- RawSize = RawData.GetSize(),
- &LooseUploadAttachments,
- Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
- }
- catch (const std::system_error& SysEx)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}",
- RawHash,
- TempFilePath,
- SysEx.code().value(),
- SysEx.what()),
- "");
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}",
- RawHash,
- TempFilePath,
- Ex.what()),
- "");
- }
- }
- }
- else
- {
- if (UploadAttachment->Size > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) {
- return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash))));
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- ResolveAttachmentsLatch.CountDown();
-
- {
- while (!ResolveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!ResolveAttachmentsLatch.Wait(1000))
- {
- Remaining = ResolveAttachmentsLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- "Aborted"sv,
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- "Aborted"sv,
- UploadAttachments.size(),
- 0,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- return {};
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
- if (UploadAttachments.size() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- ""sv,
- UploadAttachments.size(),
- 0,
- ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
- }
- }
+ ResolveAttachments(ChunkStore,
+ WorkerPool,
+ MaxChunkEmbedSize,
+ AttachmentTempPath,
+ UploadAttachments,
+ LargeChunkAttachments,
+ LooseUploadAttachments,
+ OptionalContext);
if (remotestore_impl::IsCancelled(OptionalContext))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return {};
}
- for (const auto& It : LargeChunkHashes)
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+
+ for (auto& It : LargeChunkAttachments)
{
- UploadAttachments.erase(It);
+ UploadAttachments.erase(It.first);
+ LargeChunkHashes.insert(It.first);
+ OnLargeAttachment(It.first, std::move(It.second));
}
RwLock BlocksLock;
@@ -3022,38 +2975,23 @@ BuildContainer(CidStore& ChunkStore,
{
uint64_t RawSize = It->second.first;
IoBuffer Payload = It->second.second;
-
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize);
- ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
-
- LooseUploadAttachments.erase(It);
- return [RawSize = RawSize,
- Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+ return [RawSize = RawSize, Payload = Payload](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> {
ZEN_UNUSED(ChunkHash);
- // CompressedBuffer Compressed =
- // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer());
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize);
- ZEN_ASSERT_SLOW(Compressed.Decompress());
- ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash);
- ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
- return {RawSize, Compressed};
+ return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))};
};
}
else
{
- ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash));
- return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
+ return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompositeBuffer> {
IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
if (!Chunk)
{
throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
}
+
+ // These are small chunks - make memory resident
+ Chunk = IoBufferBuilder::ReadFromFileMaybe(Chunk);
+
IoHash ValidateRawHash;
uint64_t RawSize = 0;
CompressedBuffer Compressed =
@@ -3066,7 +3004,7 @@ BuildContainer(CidStore& ChunkStore,
{
throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
}
- return {RawSize, Compressed};
+ return {RawSize, Compressed.GetCompressed()};
};
}
};
@@ -3118,7 +3056,9 @@ BuildContainer(CidStore& ChunkStore,
return Compressed.GetCompressed();
});
- ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); });
+ // ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() {
+ LargeChunkHashes.insert(ChunkHash);
+ // });
}
else
{
@@ -3145,11 +3085,12 @@ BuildContainer(CidStore& ChunkStore,
ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize());
return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
- const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ const IoHash&) -> std::pair<uint64_t, CompositeBuffer> {
return {Size,
CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
OodleCompressor::Mermaid,
- OodleCompressionLevel::None)};
+ OodleCompressionLevel::None)
+ .GetCompressed()};
};
}
else
@@ -6915,7 +6856,7 @@ TEST_CASE("project.store.blockcomposer.path_b_fits_pending")
Oid Op1 = MakeTestOid(1);
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)};
- std::vector<uint64_t> Sizes = {200, 300};
+ std::vector<uint64_t> Sizes = {60, 80}; // each <= MaxChunkEmbedSize (100); sum=140 << UsableSize (1000)
std::vector<Oid> Keys = {Op1, Op1};
std::vector<std::vector<IoHash>> Blocks;
@@ -6958,19 +6899,22 @@ TEST_CASE("project.store.blockcomposer.path_c_75pct_flush")
// Path C: the pending block is >75% full by bytes when a new op arrives that does not fit.
// The pending block is flushed first; the new op chunk is then placed via Path B, and
// emitted by the final flush.
+ //
+ // UsableSize=100 so that 75% threshold=75. MaxChunkEmbedSize is fixed at 100 by MakeTestConfig,
+ // so Op1=80 bytes exceeds the 75% threshold within a single chunk.
using namespace projectstore_testutils;
- constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes
+ constexpr uint64_t UsableSize = 100; // 75% threshold = 75 bytes
constexpr uint64_t MaxChunks = 4;
remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
Oid Op1 = MakeTestOid(1);
Oid Op2 = MakeTestOid(2);
- // Op1: 800 bytes -> Path B, pending = {800 bytes, 1 chunk} (800 > 750)
- // Op2: 300 bytes -> does not fit (800+300=1100 > 1000) and 800 > 750 -> Path C flush,
- // then Path B, pending = {300 bytes} -> final flush
+ // Op1: 80 bytes -> Path B, pending = {80 bytes, 1 chunk} (80 > 75)
+ // Op2: 30 bytes -> does not fit (80+30=110 > 100) and 80 > 75 -> Path C flush,
+ // then Path B, pending = {30 bytes} -> final flush
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)};
- std::vector<uint64_t> Sizes = {800, 300};
+ std::vector<uint64_t> Sizes = {80, 30};
std::vector<Oid> Keys = {Op1, Op2};
std::vector<std::vector<IoHash>> Blocks;
@@ -7063,7 +7007,7 @@ TEST_CASE("project.store.blockcomposer.final_flush")
Oid Op2 = MakeTestOid(2);
Oid Op3 = MakeTestOid(3);
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)};
- std::vector<uint64_t> Sizes = {100, 200, 150};
+ std::vector<uint64_t> Sizes = {60, 80, 70}; // each <= MaxChunkEmbedSize (100); sum=210 << UsableSize (1000)
std::vector<Oid> Keys = {Op1, Op2, Op3};
std::vector<std::vector<IoHash>> Blocks;
@@ -7084,19 +7028,21 @@ TEST_CASE("project.store.blockcomposer.path_b_b_c")
// by the final flush.
using namespace projectstore_testutils;
- constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes
+ // UsableSize=200 so that 75% threshold=150. MaxChunkEmbedSize is fixed at 100 by MakeTestConfig,
+ // so two ops of 90 bytes each accumulate 180 bytes in pending, exceeding the threshold.
+ constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes
constexpr uint64_t MaxChunks = 8;
remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
Oid Op1 = MakeTestOid(1);
Oid Op2 = MakeTestOid(2);
Oid Op3 = MakeTestOid(3);
- // Op1: 400 bytes -> Path B, pending = {400 bytes, 1 chunk}
- // Op2: 400 bytes -> Path B, pending = {800 bytes, 2 chunks} (800 > 750)
- // Op3: 300 bytes -> does not fit (1100 > 1000) and 800 > 750 -> Path C flush -> block 1
- // then Path B, pending = {300 bytes} -> final flush -> block 2
+ // Op1: 90 bytes -> Path B, pending = {90 bytes, 1 chunk} (90 <= 150)
+ // Op2: 90 bytes -> Path B, pending = {180 bytes, 2 chunks} (180 > 150)
+ // Op3: 60 bytes -> does not fit (180+60=240 > 200) and 180 > 150 -> Path C flush -> block 1
+ // then Path B, pending = {60 bytes} -> final flush -> block 2
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)};
- std::vector<uint64_t> Sizes = {400, 400, 300};
+ std::vector<uint64_t> Sizes = {90, 90, 60};
std::vector<Oid> Keys = {Op1, Op2, Op3};
std::vector<std::vector<IoHash>> Blocks;
@@ -7110,12 +7056,11 @@ TEST_CASE("project.store.blockcomposer.path_b_b_c")
CHECK(Blocks[1][0] == MakeTestHash(3));
}
-TEST_CASE("project.store.blockcomposer.path_d_b")
+TEST_CASE("project.store.blockcomposer.path_a_b_final_flush")
{
- // Multi-step: Path D -> Path B
- // Path D fills and flushes the pending block with a partial set of the current op chunks.
- // The remainder of that op then fits into the freshly emptied pending block via Path B,
- // and is emitted by the final flush.
+ // Multi-step: Path A -> Path B -> final flush
+ // The first op exactly fills a block (count-saturated, Path A). The second op fits into
+ // the now-empty pending block via Path B and is emitted by the final flush.
using namespace projectstore_testutils;
constexpr uint64_t UsableSize = 1000;
@@ -7124,47 +7069,86 @@ TEST_CASE("project.store.blockcomposer.path_d_b")
Oid Op1 = MakeTestOid(1);
Oid Op2 = MakeTestOid(2);
- // Op1: 3 x 100 bytes -> Path B, pending = {3 chunks, 300 bytes}
- // Op2: 2 x 100 bytes -> count 3+2=5 > 4; bytes 300+200=500 <= 1000; 300 <= 750 -> Path D
- // D adds op2[0] to pending (4 chunks, count capacity reached), flushes -> block 1
- // op2[1] remaining -> Path B (pending empty) -> final flush -> block 2
- std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5)};
- std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100};
- std::vector<Oid> Keys = {Op1, Op1, Op1, Op2, Op2};
+ // Op1: 4 x 100 bytes -> MaxChunksPerBlock reached -> CurrentOpFillFullBlock=true -> Path A
+ // Op2: 2 x 100 bytes -> Path B (pending empty) -> final flush
+ std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)};
+ std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100, 100};
+ std::vector<Oid> Keys = {Op1, Op1, Op1, Op1, Op2, Op2};
std::vector<std::vector<IoHash>> Blocks;
Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+ // Block 1: Path A standalone (all 4 Op1 chunks). Block 2: final flush of Op2 (2 chunks).
REQUIRE(Blocks.size() == 2);
CHECK(Blocks[0].size() == 4);
+ CHECK(Blocks[0][0] == MakeTestHash(1));
CHECK(Blocks[0][3] == MakeTestHash(4));
- CHECK(Blocks[1].size() == 1);
+ CHECK(Blocks[1].size() == 2);
CHECK(Blocks[1][0] == MakeTestHash(5));
+ CHECK(Blocks[1][1] == MakeTestHash(6));
}
-TEST_CASE("project.store.blockcomposer.path_c_b")
+TEST_CASE("project.store.blockcomposer.empty_input")
{
- // Multi-step: Path C -> Path B
- // Path C flushes the pending block. The op chunk that triggered Path C is then placed
- // via Path B into the freshly emptied pending block, and emitted by the final flush.
+ // Compose called with zero attachments emits no blocks and does not invoke the final flush.
using namespace projectstore_testutils;
- constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes
+ constexpr uint64_t UsableSize = 1000;
+ constexpr uint64_t MaxChunks = 4;
+ remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
+
+ std::vector<std::vector<IoHash>> Blocks;
+ Composer.Compose({}, {}, {}, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+
+ CHECK(Blocks.empty());
+}
+
+TEST_CASE("project.store.blockcomposer.single_attachment")
+{
+ // A single chunk from a single op: the gather inner loop never runs.
+ // Path B places it into the empty pending block; the final flush emits it.
+ using namespace projectstore_testutils;
+
+ constexpr uint64_t UsableSize = 1000;
+ constexpr uint64_t MaxChunks = 4;
+ remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
+
+ Oid Op1 = MakeTestOid(1);
+ std::vector<IoHash> Hashes = {MakeTestHash(1)};
+ std::vector<uint64_t> Sizes = {60};
+ std::vector<Oid> Keys = {Op1};
+
+ std::vector<std::vector<IoHash>> Blocks;
+ Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+
+ REQUIRE(Blocks.size() == 1);
+ CHECK(Blocks[0].size() == 1);
+ CHECK(Blocks[0][0] == MakeTestHash(1));
+}
+
+TEST_CASE("project.store.blockcomposer.path_a_size_saturation")
+{
+ // Path A triggered by size overflow in the gather phase (not count).
+ // Op1 has 2 chunks of 60 bytes; adding the second would exceed UsableSize=100.
+ // The gather loop sets CurrentOpFillFullBlock=true after collecting only the first chunk,
+ // which Path A emits as a standalone block. The second chunk is then processed in the next
+ // outer-loop iteration and placed via Path B, emitted by the final flush.
+ using namespace projectstore_testutils;
+
+ constexpr uint64_t UsableSize = 100; // MaxChunkEmbedSize=100; two 60-byte chunks overflow
constexpr uint64_t MaxChunks = 4;
remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
Oid Op1 = MakeTestOid(1);
- Oid Op2 = MakeTestOid(2);
- // Op1: 800 bytes -> Path B, pending = {800 bytes, 1 chunk}
- // Op2: 300 bytes -> does not fit (1100 > 1000) and 800 > 750 -> Path C flush -> block 1
- // then Path B, pending = {300 bytes} -> final flush -> block 2
+ // chunk0=60, chunk1=60: 60+60=120 > UsableSize=100 -> size overflow after gathering chunk0
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)};
- std::vector<uint64_t> Sizes = {800, 300};
- std::vector<Oid> Keys = {Op1, Op2};
+ std::vector<uint64_t> Sizes = {60, 60};
+ std::vector<Oid> Keys = {Op1, Op1};
std::vector<std::vector<IoHash>> Blocks;
Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+ // Block 1: Path A standalone (chunk0 only). Block 2: final flush of chunk1.
REQUIRE(Blocks.size() == 2);
CHECK(Blocks[0].size() == 1);
CHECK(Blocks[0][0] == MakeTestHash(1));
@@ -7172,11 +7156,78 @@ TEST_CASE("project.store.blockcomposer.path_c_b")
CHECK(Blocks[1][0] == MakeTestHash(2));
}
-TEST_CASE("project.store.blockcomposer.path_a_b_final_flush")
+TEST_CASE("project.store.blockcomposer.path_b_exact_size_fill")
{
- // Multi-step: Path A -> Path B -> final flush
- // The first op exactly fills a block (count-saturated, Path A). The second op fits into
- // the now-empty pending block via Path B and is emitted by the final flush.
+ // Path B immediate flush triggered by exact byte fill (PendingBlockSize == UsableBlockSize),
+ // as opposed to the count-fill case covered by path_b_exact_count_fill.
+ // Op1 adds 60 bytes; Op2 adds the remaining 40 bytes to exactly reach UsableSize=100.
+ // Path B flushes immediately after Op2 merges; no separate final flush is needed.
+ using namespace projectstore_testutils;
+
+ constexpr uint64_t UsableSize = 100;
+ constexpr uint64_t MaxChunks = 4;
+ remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
+
+ Oid Op1 = MakeTestOid(1);
+ Oid Op2 = MakeTestOid(2);
+ // Op1: 60 bytes -> Path B, pending = {60 bytes, 1 chunk}
+ // Op2: 40 bytes -> 60+40=100 == UsableSize -> Path B, immediate size-exact flush
+ std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)};
+ std::vector<uint64_t> Sizes = {60, 40};
+ std::vector<Oid> Keys = {Op1, Op2};
+
+ std::vector<std::vector<IoHash>> Blocks;
+ Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+
+ REQUIRE(Blocks.size() == 1);
+ CHECK(Blocks[0].size() == 2);
+ CHECK(Blocks[0][0] == MakeTestHash(1));
+ CHECK(Blocks[0][1] == MakeTestHash(2));
+}
+
+TEST_CASE("project.store.blockcomposer.path_d_size_limited_greedy")
+{
+ // Path D where the greedy fill loop stops because the next chunk would exceed
+ // UsableBlockSize by bytes (not because count reaches MaxChunksPerBlock).
+ //
+ // UsableSize=200, MaxChunks=8 (high count cap to ensure size is the binding constraint).
+ // Op1 puts 90 bytes in pending. Op2 has 3x60-byte chunks (total 180); they don't fit
+ // (90+180=270>200) and pending is <=75% full (90<=150) -> Path D.
+ // Greedy loop adds Op2[0] (90+60=150<=200), then Op2[1] would overflow (150+60=210>200)
+ // -> breaks on size. Pending [Op1,Op2[0]] is flushed as block 1.
+ // Op2[1] and Op2[2] (120 bytes total) fit in the empty pending via Path B -> final flush
+ // -> block 2.
+ using namespace projectstore_testutils;
+
+ constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes
+ constexpr uint64_t MaxChunks = 8;
+ remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks));
+
+ Oid Op1 = MakeTestOid(1);
+ Oid Op2 = MakeTestOid(2);
+ std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4)};
+ std::vector<uint64_t> Sizes = {90, 60, 60, 60};
+ std::vector<Oid> Keys = {Op1, Op2, Op2, Op2};
+
+ std::vector<std::vector<IoHash>> Blocks;
+ Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
+
+ // Block 1: [Op1, Op2[0]] (Path D size-limited flush). Block 2: [Op2[1], Op2[2]] (final flush).
+ REQUIRE(Blocks.size() == 2);
+ CHECK(Blocks[0].size() == 2);
+ CHECK(Blocks[0][0] == MakeTestHash(1));
+ CHECK(Blocks[0][1] == MakeTestHash(2));
+ CHECK(Blocks[1].size() == 2);
+ CHECK(Blocks[1][0] == MakeTestHash(3));
+ CHECK(Blocks[1][1] == MakeTestHash(4));
+}
+
+TEST_CASE("project.store.blockcomposer.path_a_pending_untouched")
+{
+ // Path A leaves the pending block untouched. Op1 accumulates 2 chunks in pending via
+ // Path B. Op2 then count-saturates the gather phase (4 chunks == MaxChunksPerBlock),
+ // triggering Path A which emits Op2 as a standalone block without disturbing pending.
+ // The final flush then emits Op1's 2 chunks still held in pending.
using namespace projectstore_testutils;
constexpr uint64_t UsableSize = 1000;
@@ -7185,23 +7236,25 @@ TEST_CASE("project.store.blockcomposer.path_a_b_final_flush")
Oid Op1 = MakeTestOid(1);
Oid Op2 = MakeTestOid(2);
- // Op1: 4 x 100 bytes -> MaxChunksPerBlock reached -> CurrentOpFillFullBlock=true -> Path A
- // Op2: 2 x 100 bytes -> Path B (pending empty) -> final flush
+ // Op1: 2 x 60 bytes -> Path B, pending = {2 chunks, 120 bytes}
+ // Op2: 4 x 100 bytes -> count reaches MaxChunks=4 -> CurrentOpFillFullBlock=true -> Path A
+ // Path A emits Op2 standalone as block 1; pending (Op1's chunks) is left untouched.
+ // Final flush emits pending -> block 2.
std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)};
- std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100, 100};
- std::vector<Oid> Keys = {Op1, Op1, Op1, Op1, Op2, Op2};
+ std::vector<uint64_t> Sizes = {60, 60, 100, 100, 100, 100};
+ std::vector<Oid> Keys = {Op1, Op1, Op2, Op2, Op2, Op2};
std::vector<std::vector<IoHash>> Blocks;
Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); });
- // Block 1: Path A standalone (all 4 Op1 chunks). Block 2: final flush of Op2 (2 chunks).
+ // Block 1: Path A standalone (Op2's 4 chunks). Block 2: final flush of Op1's 2 pending chunks.
REQUIRE(Blocks.size() == 2);
CHECK(Blocks[0].size() == 4);
- CHECK(Blocks[0][0] == MakeTestHash(1));
- CHECK(Blocks[0][3] == MakeTestHash(4));
+ CHECK(Blocks[0][0] == MakeTestHash(3));
+ CHECK(Blocks[0][3] == MakeTestHash(6));
CHECK(Blocks[1].size() == 2);
- CHECK(Blocks[1][0] == MakeTestHash(5));
- CHECK(Blocks[1][1] == MakeTestHash(6));
+ CHECK(Blocks[1][0] == MakeTestHash(1));
+ CHECK(Blocks[1][1] == MakeTestHash(2));
}
TEST_SUITE_END();