aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-12 12:48:33 +0100
committerDan Engelbrecht <[email protected]>2026-03-14 14:19:07 +0100
commitbe6187b1299c338dfbea3409e5c54cf72be384e7 (patch)
treebeeb5fce4ecf3c039dbcddb88a841a9b34d0c762 /src
parentadd buildid updates to oplog and builds test scripts (#838) (diff)
downloadzen-be6187b1299c338dfbea3409e5c54cf72be384e7.tar.xz
zen-be6187b1299c338dfbea3409e5c54cf72be384e7.zip
wip
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/chunking/chunkblock.cpp20
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h2
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp1345
3 files changed, 925 insertions, 442 deletions
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp
index cca32c17d..afc1cb01c 100644
--- a/src/zenremotestore/chunking/chunkblock.cpp
+++ b/src/zenremotestore/chunking/chunkblock.cpp
@@ -352,9 +352,12 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
for (const auto& It : FetchChunks)
{
- std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first);
- uint64_t ChunkSize = 0;
- std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments();
+ std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first);
+ ZEN_ASSERT_SLOW(Chunk.second.DecodeRawSize() == Chunk.first);
+ ZEN_ASSERT_SLOW(Chunk.second.DecodeRawHash() == It.first);
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk.second.Decompress().AsIoBuffer()) == It.first);
+ uint64_t ChunkSize = 0;
+ std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments();
for (const SharedBuffer& Segment : Segments)
{
ZEN_ASSERT(Segment.IsOwned());
@@ -374,6 +377,17 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
CompressedBuffer CompressedBlock =
CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
+
+ uint64_t VerifyHeaderSize = 0;
+ ZEN_ASSERT_SLOW(IterateChunkBlock(
+ CompressedBlock.Decompress(),
+ [](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) {
+ ZEN_ASSERT(Chunk.DecodeRawHash() == AttachmentHash);
+ SharedBuffer Decompressed = Chunk.Decompress();
+ ZEN_ASSERT(AttachmentHash == IoHash::HashBuffer(Decompressed.AsIoBuffer()));
+ },
+ VerifyHeaderSize));
+
return CompressedBlock;
}
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 084d975a2..72a6f2969 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -150,7 +150,7 @@ struct RemoteStoreOptions
size_t ChunkFileSizeLimit = DefaultChunkFileSizeLimit;
};
-typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
+typedef std::function<CompositeBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index c44b06305..09ab0662c 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -277,6 +277,70 @@ namespace remotestore_impl {
JobContext* m_OptionalContext;
};
+ IoBuffer CompressToTempFile(const IoHash& RawHash,
+ const IoBuffer& RawData,
+ const std::filesystem::path& AttachmentPath,
+ OodleCompressor Compressor,
+ OodleCompressionLevel ComressionLevel)
+ {
+ ZEN_ASSERT(!IsFile(AttachmentPath));
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath));
+ }
+
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawData)),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ CompressedFile.Write(RangeBuffer, Offset);
+ ZEN_UNUSED(SourceOffset, SourceSize);
+ // StreamRawBytes += SourceSize;
+ // StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ Compressor,
+ ComressionLevel);
+ if (CouldCompress)
+ {
+#if ZEN_BUILD_DEBUG
+ uint64_t CompressedSize = CompressedFile.FileSize();
+ // void* FileHandle = CompressedFile.Handle();
+ IoBuffer TempPayload = IoBuffer(IoBuffer::BorrowedFile, CompressedFile.Handle(), 0, CompressedSize);
+ // ZEN_ASSERT(TempPayload);
+ // TempPayload.SetDeleteOnClose(true);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), VerifyRawHash, VerifyRawSize);
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(RawHash == VerifyRawHash);
+ ZEN_ASSERT(RawData.GetSize() == VerifyRawSize);
+ // CompressedFile.Attach(FileHandle);
+#endif // ZEN_BUILD_DEBUG
+ }
+ else
+ {
+ // Compressed is larger than source data...
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error("Failed to compress blob {}");
+ }
+ CompressedFile.SetFileSize(0);
+ CompressedFile.Write(CompressedBlob.GetCompressed(), 0);
+ }
+ IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ CompressedFile.Close();
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress());
+ return TempAttachmentBuffer;
+ }
+
void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
@@ -1141,6 +1205,7 @@ namespace remotestore_impl {
}
return;
}
+ ZEN_ASSERT(AttachmentResult.Bytes);
BlobBuffer = std::move(AttachmentResult.Bytes);
ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
RawHash,
@@ -1161,12 +1226,15 @@ namespace remotestore_impl {
uint64_t AttachmentSize = BlobBuffer.GetSize();
Info.AttachmentsDownloaded.fetch_add(1);
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+ ZEN_ASSERT(BlobBuffer);
AttachmentsWriteLatch.AddCount(1);
Context.WorkerPool.ScheduleWork(
[&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() {
ZEN_TRACE_CPU("WriteAttachment");
+ ZEN_ASSERT(Bytes);
+
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1268,7 +1336,7 @@ namespace remotestore_impl {
struct CreatedBlock
{
- IoBuffer Payload;
+ CompositeBuffer Payload;
ChunkBlockDescription Block;
};
@@ -1402,7 +1470,7 @@ namespace remotestore_impl {
}
try
{
- IoBuffer Payload;
+ CompositeBuffer Payload;
ChunkBlockDescription Block;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
@@ -1415,7 +1483,7 @@ namespace remotestore_impl {
}
else
{
- Payload = ChunkStore.FindChunkByCid(RawHash);
+ Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
}
if (!Payload)
{
@@ -1431,7 +1499,7 @@ namespace remotestore_impl {
const bool IsBlock = Block.BlockHash == RawHash;
size_t PayloadSize = Payload.GetSize();
RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
+ RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block));
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
@@ -1709,9 +1777,7 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<ChunkBlockDescription> Blocks;
- CompressedBuffer OpsBuffer;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1750,7 +1816,8 @@ BuildContainer(CidStore& ChunkStore,
if (DataHash == IoHash::Zero)
{
std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project.RootDir / ServerPath;
+ std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred();
+ ZEN_INFO("{}", FilePath);
if (!IsFile(FilePath))
{
remotestore_impl::ReportMessage(
@@ -1937,14 +2004,286 @@ BuildContainer(CidStore& ChunkStore,
FoundHashes.insert(It.first);
}
+ struct ChunkedFile
+ {
+ IoBuffer Source;
+
+ ChunkedInfoWithSource Chunked;
+ };
+
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
+
+ struct FoundChunkedFile
+ {
+ IoHash RawHash = IoHash::Zero;
+ IoBuffer Source;
+ uint64_t Offset = 0;
+ uint64_t Size = 0;
+ };
+
+ std::vector<FoundChunkedFile> AttachmentsToChunk;
+
+ if (AllowChunking)
+ {
+ RwLock FindFilesToChunkLock;
+ Latch FindFilesToChunkLatch(1);
+ for (auto& It : UploadAttachments)
+ {
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ return {};
+ }
+
+ FindFilesToChunkLatch.AddCount(1);
+
+ WorkerPool.ScheduleWork(
+ [&ChunkStore,
+ UploadAttachment = &It.second,
+ RawHash = It.first,
+ &FindFilesToChunkLatch,
+ &FindFilesToChunkLock,
+ // &LooseUploadAttachments,
+ &MissingHashes,
+ &OnLargeAttachment,
+ &AttachmentTempPath,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ AllowChunking,
+ &RemoteResult,
+ &AttachmentsToChunk,
+ OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
+
+ auto _ = MakeGuard([&FindFilesToChunkLatch] { FindFilesToChunkLatch.CountDown(); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return;
+ }
+
+ try
+ {
+ if (!UploadAttachment->RawPath.empty())
+ {
+ const std::filesystem::path& FilePath = UploadAttachment->RawPath;
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
+ if (RawData)
+ {
+ UploadAttachment->Size = RawData.GetSize();
+ if (UploadAttachment->Size > ChunkFileSizeLimit)
+ {
+ FindFilesToChunkLock.WithExclusiveLock([&]() {
+ AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash,
+ .Source = RawData,
+ .Offset = 0,
+ .Size = RawData.GetSize()});
+ });
+ }
+ }
+ else
+ {
+ FindFilesToChunkLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ else
+ {
+ IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
+ if (Data)
+ {
+ UploadAttachment->Size = Data.GetSize();
+ if (Data.IsWholeFile())
+ {
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
+ if (Compressed)
+ {
+ if (VerifyRawSize > ChunkFileSizeLimit)
+ {
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (Decompressed)
+ {
+ std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBuffer DecompressedData = Segments[0].AsIoBuffer();
+ IoBufferFileReference DecompressedFileRef;
+ if (DecompressedData.GetFileReference(DecompressedFileRef))
+ {
+ // Are we still pointing to disk?
+ FindFilesToChunkLock.WithExclusiveLock([&]() {
+ AttachmentsToChunk.push_back(
+ FoundChunkedFile{.RawHash = RawHash,
+ .Source = Data,
+ .Offset = DecompressedFileRef.FileChunkOffset,
+ .Size = DecompressedFileRef.FileChunkSize});
+ });
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ FindFilesToChunkLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to resolve attachment {}", RawHash),
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ FindFilesToChunkLatch.CountDown();
+
+ Stopwatch ResolveAttachmentsProgressTimer;
+ while (!FindFilesToChunkLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = FindFilesToChunkLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!FindFilesToChunkLatch.Wait(1000))
+ {
+ Remaining = FindFilesToChunkLatch.Remaining();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ }
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ "Aborted"sv,
+ UploadAttachments.size(),
+ 0,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ return {};
+ }
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ }
+ }
+
+ for (const IoHash& AttachmentHash : MissingHashes)
+ {
+ auto It = UploadAttachments.find(AttachmentHash);
+ ZEN_ASSERT(It != UploadAttachments.end());
+ std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
+ ZEN_ASSERT(Op.has_value());
+
+ if (IgnoreMissingAttachments)
+ {
+ remotestore_impl::ReportMessage(OptionalContext,
+ fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
+ }
+ else
+ {
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(AttachmentHash.ToHexString());
+ Sb.Append("' for op: \n");
+ Op.value().ToJson(Sb);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
+ return {};
+ }
+ UploadAttachments.erase(AttachmentHash);
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+
+ std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size());
+
+ for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++)
+ {
+ // TODO: Multithread
+
+ const FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk];
+ const IoHash& RawHash = AttachmentToChunk.RawHash;
+
+ const IoBuffer& Buffer = AttachmentToChunk.Source;
+ IoBufferFileReference FileRef;
+ bool IsFile = Buffer.GetFileReference(FileRef);
+ ZEN_ASSERT(IsFile);
+
+ Stopwatch ChunkOneTimer;
+
+ uint64_t Offset = AttachmentToChunk.Offset;
+ uint64_t Size = AttachmentToChunk.Size;
+
+ BasicFile SourceFile;
+ SourceFile.Attach(FileRef.FileHandle);
+ auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
+
+ ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk];
+ Chunked.Source = Buffer;
+ Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
+ ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash);
+
+ ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
+ RawHash,
+ NiceBytes(Chunked.Chunked.Info.RawSize),
+ Chunked.Chunked.Info.ChunkHashes.size(),
+ NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs()));
+ }
+
+ for (const ChunkedFile& Chunked : ChunkedFiles)
+ {
+ UploadAttachments.erase(Chunked.Chunked.Info.RawHash);
+ for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
+ {
+ UploadAttachments.erase(ChunkHash);
+ }
+ }
+
+ size_t ChunkedChunkCount =
+ std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) {
+ return Current + Value.Chunked.Info.ChunkHashes.size();
+ });
+
size_t ReusedAttachmentCount = 0;
std::vector<size_t> ReusedBlockIndexes;
{
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(FoundHashes.size());
- ChunkHashes.insert(ChunkHashes.begin(), FoundHashes.begin(), FoundHashes.end());
+ std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes;
+ UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount);
+
+ UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end());
+
+ for (ChunkedFile& Chunked : ChunkedFiles)
+ {
+ UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end());
+ }
+ std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end());
+
std::vector<uint32_t> ChunkIndexes;
- ChunkIndexes.resize(FoundHashes.size());
+ ChunkIndexes.resize(ChunkHashes.size());
std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0);
std::vector<uint32_t> UnusedChunkIndexes;
@@ -1971,50 +2310,15 @@ BuildContainer(CidStore& ChunkStore,
}
}
- struct ChunkedFile
- {
- IoBuffer Source;
-
- ChunkedInfoWithSource Chunked;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup;
- };
- std::vector<ChunkedFile> ChunkedFiles;
-
- auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile {
- ChunkedFile Chunked;
- Stopwatch Timer;
-
- uint64_t Offset = FileRef.FileChunkOffset;
- uint64_t Size = FileRef.FileChunkSize;
-
- BasicFile SourceFile;
- SourceFile.Attach(FileRef.FileHandle);
- auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
-
- Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
- ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash);
- Chunked.Source = RawData;
-
- ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
- RawHash,
- NiceBytes(Chunked.Chunked.Info.RawSize),
- Chunked.Chunked.Info.ChunkHashes.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-
- return Chunked;
- };
-
RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
- std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
- Latch ResolveAttachmentsLatch(1);
+ Stopwatch ResolveAttachmentsProgressTimer;
+ Latch ResolveAttachmentsLatch(1);
for (auto& It : UploadAttachments)
{
if (remotestore_impl::IsCancelled(OptionalContext))
@@ -2035,13 +2339,10 @@ BuildContainer(CidStore& ChunkStore,
&ResolveLock,
&ChunkedHashes,
&LargeChunkHashes,
- &ChunkedUploadAttachments,
&LooseUploadAttachments,
&MissingHashes,
&OnLargeAttachment,
&AttachmentTempPath,
- &ChunkFile,
- &ChunkedFiles,
MaxChunkEmbedSize,
ChunkFileSizeLimit,
AllowChunking,
@@ -2057,163 +2358,150 @@ BuildContainer(CidStore& ChunkStore,
try
{
+ ZEN_ASSERT(UploadAttachment->Size != 0);
if (!UploadAttachment->RawPath.empty())
{
- const std::filesystem::path& FilePath = UploadAttachment->RawPath;
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
- if (RawData)
+ if (UploadAttachment->Size > (MaxChunkEmbedSize * 2))
{
- if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
- {
- IoBufferFileReference FileRef;
- (void)RawData.GetFileReference(FileRef);
-
- ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
- });
- }
- else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
- size_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
+ // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
+ // it will be a loose attachment instead of going into a block
+ OnLargeAttachment(
+ RawHash,
+ [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment](
+ const IoHash& RawHash) -> CompositeBuffer {
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath);
+ if (!RawData)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath));
+ }
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
+
IoBuffer TempAttachmentBuffer =
- WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
+ remotestore_impl::CompressToTempFile(RawHash,
+ RawData,
+ AttachmentPath,
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
+ VerifyRawHash,
+ VerifyRawSize));
+ ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
+
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
- NiceBytes(RawSize),
+ NiceBytes(UploadAttachment->Size),
NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
+ return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer)));
});
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ }
+ else
+ {
+ // Compress inline - check compressed size to see if it should go into a block or not
+ IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath);
+ if (!RawData)
{
- uint64_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to read attachment {}", UploadAttachment->RawPath),
+ "");
+ return;
+ }
+
+ std::filesystem::path TempFilePath = AttachmentTempPath;
+ TempFilePath.append(RawHash.ToHexString());
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
+ try
+ {
+ IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash,
+ RawData,
+ TempFilePath,
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::VeryFast);
+ TempAttachmentBuffer.SetDeleteOnClose(true);
+
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ ZEN_ASSERT_SLOW(
+ CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize));
+ ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer))
+ .CompressedBuffer::Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash == RawHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize());
+
+ uint64_t CompressedSize = TempAttachmentBuffer.GetSize();
- uint64_t CompressedSize = Compressed.GetCompressedSize();
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(RawSize),
- NiceBytes(TempAttachmentBuffer.GetSize()));
+ TempFilePath,
+ NiceBytes(UploadAttachment->Size),
+ NiceBytes(CompressedSize));
if (CompressedSize > MaxChunkEmbedSize)
{
- OnLargeAttachment(RawHash,
- [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
+ OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) {
+ return CompositeBuffer(SharedBuffer(std::move(Data)));
+ });
ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
}
else
{
UploadAttachment->Size = CompressedSize;
- ResolveLock.WithExclusiveLock(
- [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
- }
- }
- else
- {
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- else
- {
- IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
- if (Data)
- {
- auto GetForChunking =
- [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
- if (Data.IsWholeFile())
- {
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
- if (Compressed)
{
- if (VerifyRawSize > ChunkFileSizeLimit)
- {
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- if (CompressionLevel == OodleCompressionLevel::None)
- {
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (Decompressed)
- {
- std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
- if (Segments.size() == 1)
- {
- IoBuffer DecompressedData = Segments[0].AsIoBuffer();
- if (DecompressedData.GetFileReference(OutFileRef))
- {
- return true;
- }
- }
- }
- }
- }
- }
+ IoHash VerifyRawHash2;
+ uint64_t VerifyRawSize2;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer),
+ VerifyRawHash2,
+ VerifyRawSize2);
+ ZEN_ASSERT_SLOW(Compressed.Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize());
}
- }
- return false;
- };
- IoBufferFileReference FileRef;
- if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
- {
- ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
+ ResolveLock.WithExclusiveLock([RawHash,
+ RawSize = RawData.GetSize(),
+ &LooseUploadAttachments,
+ Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
});
+ }
}
- else if (Data.GetSize() > MaxChunkEmbedSize)
+ catch (const std::system_error& SysEx)
{
- OnLargeAttachment(RawHash,
- [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}",
+ RawHash,
+ TempFilePath,
+ SysEx.code().value(),
+ SysEx.what()),
+ "");
}
- else
+ catch (const std::exception& Ex)
{
- UploadAttachment->Size = Data.GetSize();
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}",
+ RawHash,
+ TempFilePath,
+ Ex.what()),
+ "");
}
}
- else
+ }
+ else
+ {
+ if (UploadAttachment->Size > MaxChunkEmbedSize)
{
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
+ OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) {
+ return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash))));
+ });
+ ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
}
}
}
@@ -2229,8 +2517,6 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsLatch.CountDown();
{
- Stopwatch ResolveAttachmentsProgressTimer;
- ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining();
while (!ResolveAttachmentsLatch.Wait(1000))
{
ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
@@ -2245,7 +2531,7 @@ BuildContainer(CidStore& ChunkStore,
Remaining = ResolveAttachmentsLatch.Remaining();
remotestore_impl::ReportProgress(OptionalContext,
"Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
+ "Aborted"sv,
UploadAttachments.size(),
Remaining,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
@@ -2258,11 +2544,10 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
return {};
}
- AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
remotestore_impl::ReportProgress(OptionalContext,
"Resolving attachments"sv,
fmt::format("{} remaining...", Remaining),
- AttachmentCountToUseForProgress,
+ UploadAttachments.size(),
Remaining,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
@@ -2276,6 +2561,7 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
}
+
if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
@@ -2284,86 +2570,13 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- for (const IoHash& AttachmentHash : MissingHashes)
- {
- auto It = UploadAttachments.find(AttachmentHash);
- ZEN_ASSERT(It != UploadAttachments.end());
- std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
- ZEN_ASSERT(Op.has_value());
-
- if (IgnoreMissingAttachments)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
- }
- else
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- return {};
- }
- UploadAttachments.erase(AttachmentHash);
- }
-
- for (const auto& It : ChunkedUploadAttachments)
- {
- UploadAttachments.erase(It.first);
- }
for (const auto& It : LargeChunkHashes)
{
UploadAttachments.erase(It);
}
- {
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkedHashes.size());
- ChunkHashes.insert(ChunkHashes.begin(), ChunkedHashes.begin(), ChunkedHashes.end());
- std::vector<uint32_t> ChunkIndexes;
- ChunkIndexes.resize(ChunkedHashes.size());
- std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0);
-
- std::vector<uint32_t> UnusedChunkIndexes;
- ReuseBlocksStatistics ReuseBlocksStats;
-
- std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(*LogOutput,
- /*BlockReuseMinPercentLimit*/ 80,
- /*IsVerbose*/ false,
- ReuseBlocksStats,
- KnownBlocks,
- ChunkHashes,
- ChunkIndexes,
- UnusedChunkIndexes);
- for (size_t KnownBlockIndex : ReusedBlockIndexes)
- {
- const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
- {
- if (ChunkedHashes.erase(KnownHash) == 1)
- {
- ReusedAttachmentCount++;
- }
- }
- }
- ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end());
- }
-
- std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
- auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
- size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd);
- if (ReuseBlockCount > 0)
- {
- Blocks.reserve(ReuseBlockCount);
- for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++)
- {
- Blocks.push_back({KnownBlocks[*It]});
- }
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
- }
+ RwLock BlocksLock;
+ std::vector<ChunkBlockDescription> Blocks;
std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
SortedUploadAttachments.reserve(UploadAttachments.size());
@@ -2406,9 +2619,6 @@ BuildContainer(CidStore& ChunkStore,
return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
});
- // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
- // ChunkedHashes contains all chunked up chunks to be composed into blocks
-
if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
@@ -2431,225 +2641,431 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
- // ChunkedHashes contains all chunked up chunks to be composed into blocks
-
size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
size_t ChunksAssembled = 0;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t BlockSize = 0;
- std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
-
- Oid LastOpKey = Oid::Zero;
- uint32_t ComposedBlocks = 0;
+ Latch BlockCreateLatch(1);
+ size_t GeneratedBlockCount = 0;
+ uint32_t ComposedBlocks = 0;
uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
try
{
- uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
- std::unordered_set<IoHash, IoHash::Hasher> AddedAttachmentHashes;
- auto NewBlock = [&]() {
- size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- std::vector<IoHash> ChunkRawHashes;
- ChunkRawHashes.reserve(ChunkCount);
- for (const std::pair<IoHash, FetchChunkFunc>& Chunk : ChunksInBlock)
- {
- ChunkRawHashes.push_back(Chunk.first);
- }
- if (BuildBlocks)
- {
- remotestore_impl::CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
- // Worker will set Blocks[BlockIndex] = Block (including ChunkRawHashes) under shared lock
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", ChunkCount);
- OnBlockChunks(std::move(ChunksInBlock));
- // We can share the lock as we are not resizing the vector and only touch our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
- }
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
- };
+ auto NewBlock =
+ [&WorkerPool, BuildBlocks, &BlockCreateLatch, &BlocksLock, &Blocks, &AsyncOnBlock, &OnBlockChunks, &RemoteResult](
+ std::vector<IoHash>&& ChunkRawHashes,
+ const std::function<FetchChunkFunc(const IoHash&)>& FetchResolveFunc) {
+ size_t ChunkCount = ChunkRawHashes.size();
+ std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
+ ChunksInBlock.reserve(ChunkCount);
+
+ for (const IoHash& AttachmentHash : ChunkRawHashes)
+ {
+ ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchResolveFunc(AttachmentHash)));
+ }
- Stopwatch AssembleBlocksProgressTimer;
- uint64_t LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
- for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- break;
- }
- if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
- LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
- }
- const IoHash& RawHash(HashIt->first);
- const Oid CurrentOpKey = HashIt->second;
- const IoHash& AttachmentHash(HashIt->first);
- auto InfoIt = UploadAttachments.find(RawHash);
- ZEN_ASSERT(InfoIt != UploadAttachments.end());
- uint64_t PayloadSize = InfoIt->second.Size;
-
- if (AddedAttachmentHashes.insert(AttachmentHash).second)
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+#if 1
+ ChunkBlockDescription Block;
+ CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(ChunksInBlock), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ ZEN_ASSERT_SLOW(BlockHash != IoHash::Zero);
+ ZEN_UNUSED(BlockHash);
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ // RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex] = Block;
+ }
+ // uint64_t BlockSize = CompressedBlock.GetCompressedSize();
+ AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
+#else
+ remotestore_impl::CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ // ComposedBlocks++;
+#endif
+ }
+ else
+ {
+ ZEN_INFO("Bulk group {} attachments", ChunkCount);
+
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
+ OnBlockChunks(std::move(ChunksInBlock));
+ }
+ // uint64_t NowMS = Timer.GetElapsedTimeMs();
+ // ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
+ // BlockIndex,
+ // ChunkCount,
+ // NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
+ // NiceBytes(BlockSize));
+ // FetchAttachmentsStartMS = NowMS;
+ // ComposedBlocks++;
+ };
+
+ auto ComposeBlocks = [&ChunksAssembled, &ComposedBlocks, ChunkAssembleCount, &NewBlock](
+ uint64_t MaxBlockSize,
+ uint64_t MaxChunksPerBlock,
+ std::vector<IoHash> AttachmentHashes,
+ std::vector<uint64_t> AttachmentSizes,
+ std::vector<Oid> AttachmentKeys,
+ std::function<FetchChunkFunc(const IoHash& AttachmentHash)>&& FetchResolveFunc,
+ JobContext* OptionalContext,
+ remotestore_impl::AsyncRemoteResult& RemoteResult) {
+ ZEN_ASSERT(AttachmentHashes.size() == AttachmentSizes.size());
+ ZEN_ASSERT(AttachmentHashes.size() == AttachmentKeys.size());
+ ZEN_ASSERT(FetchResolveFunc);
+
+ std::vector<IoHash> PendingChunkHashes;
+ uint64_t PendingBlockSize = 0;
+
+ size_t SortedUploadAttachmentsIndex = 0;
+
+ Stopwatch AssembleBlocksProgressTimer;
+ while (SortedUploadAttachmentsIndex < AttachmentHashes.size())
{
- if (BuildBlocks && ChunksInBlock.size() > 0)
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) &&
- (CurrentOpKey != LastOpKey))
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ break;
+ }
+
+ const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex];
+ const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex];
+ size_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex];
+
+ std::vector<IoHash> CurrentOpRawHashes;
+ CurrentOpRawHashes.push_back(FirstAttachmentHash);
+ std::vector<uint64_t> CurrentOpChunkSizes;
+
+ CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize);
+
+ bool CurrentOpFillFullBlock = false;
+
+ while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size())
+ {
+ size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size();
+ const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex];
+ if (NextAttachmentOpKey != FirstAttachmentOpKey)
{
- NewBlock();
+ break;
+ }
+ const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex];
+ size_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex];
+
+ if (CurrentOpAttachmentsSize + NextOpAttachmentSize > MaxBlockSize)
+ {
+ CurrentOpFillFullBlock = true;
+ break;
+ }
+ CurrentOpRawHashes.push_back(NextAttachmentHash);
+ CurrentOpChunkSizes.push_back(NextOpAttachmentSize);
+ CurrentOpAttachmentsSize += NextOpAttachmentSize;
+
+ if (CurrentOpRawHashes.size() == MaxChunksPerBlock)
+ {
+ CurrentOpFillFullBlock = true;
+ break;
}
}
- if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
+ size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size();
+ ZEN_ASSERT(CurrentOpAttachmentsSize <= MaxBlockSize);
+ ZEN_ASSERT(CurrentOpAttachmentCount <= MaxChunksPerBlock);
+
+ if (CurrentOpFillFullBlock)
{
- ChunksInBlock.emplace_back(std::make_pair(
- RawHash,
- [RawSize = It->second.first,
- IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer()));
- }));
- LooseUploadAttachments.erase(It);
+ ZEN_ASSERT(CurrentOpRawHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(CurrentOpAttachmentsSize <= MaxBlockSize);
+ NewBlock(std::move(CurrentOpRawHashes), /*CurrentOpAttachmentsSize, */ FetchResolveFunc);
+ ComposedBlocks++;
+ }
+ else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= MaxBlockSize &&
+ (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= MaxChunksPerBlock)
+ {
+ // All attachments for Op fits in the current block...
+ PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end());
+ PendingBlockSize += CurrentOpAttachmentsSize;
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
+ if (PendingBlockSize == MaxBlockSize || PendingChunkHashes.size() == MaxChunksPerBlock)
+ {
+ NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
+ ComposedBlocks++;
+ PendingBlockSize = 0;
+ }
+ }
+ else if (PendingBlockSize > (MaxBlockSize * 3) / 4)
+ {
+ // Our ops does not fit in the current block and the current block is using 75% of max size so lets move our op to
+ // the next block...
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
+ NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
+ ComposedBlocks++;
+
+ PendingBlockSize = 0;
+
+ PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end());
+ PendingBlockSize += CurrentOpAttachmentsSize;
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
}
else
{
- ChunksInBlock.emplace_back(
- std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
- IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
- if (!Chunk)
- {
- throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
- }
- IoHash ValidateRawHash;
- uint64_t RawSize = 0;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(
- fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash));
- }
- if (RawHash != ValidateRawHash)
- {
- throw std::runtime_error(
- fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
- }
- return {RawSize, Compressed};
- }));
+ size_t AddedChunkCount = 0;
+
+ // Fit as many as possible in current block...
+ for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++)
+ {
+ uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex];
+ if (PendingBlockSize + ChunkSize >= MaxBlockSize || PendingChunkHashes.size() >= MaxChunksPerBlock)
+ {
+ break;
+ }
+ AddedChunkCount++;
+ PendingBlockSize += ChunkSize;
+ PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]);
+ }
+ ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size());
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
+ NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize,*/ FetchResolveFunc);
+ ComposedBlocks++;
+
+ if (AddedChunkCount > 0)
+ {
+ PendingChunkHashes =
+ std::vector<IoHash>(CurrentOpRawHashes.begin() + AddedChunkCount, CurrentOpRawHashes.end());
+ PendingBlockSize =
+ std::accumulate(CurrentOpChunkSizes.begin() + AddedChunkCount, CurrentOpChunkSizes.end(), uint64_t(0));
+ }
+ else
+ {
+ PendingChunkHashes = std::move(CurrentOpRawHashes);
+ PendingBlockSize = CurrentOpAttachmentsSize;
+ }
+
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
}
- BlockSize += PayloadSize;
+ ChunksAssembled += CurrentOpAttachmentCount;
- LastOpKey = CurrentOpKey;
- ChunksAssembled++;
+ if (ChunksAssembled % 1000 == 0)
+ {
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ }
+ SortedUploadAttachmentsIndex += CurrentOpAttachmentCount;
}
- }
- if (!RemoteResult.IsError())
+ if (!PendingChunkHashes.empty())
+ {
+ size_t PendingChunkCount = PendingChunkHashes.size();
+ ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= MaxBlockSize);
+ NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
+ ComposedBlocks++;
+
+ ChunksAssembled += PendingChunkCount;
+ }
+ };
+
+ Stopwatch AssembleBlocksProgressTimer;
{
- // Keep the chunked files as separate blocks to make the blocks generated
- // more consistent
- if (BlockSize > 0)
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(SortedUploadAttachments.size());
+ std::vector<uint64_t> AttachmentSizes;
+ AttachmentSizes.reserve(SortedUploadAttachments.size());
+ std::vector<Oid> AttachmentKeys;
+ AttachmentKeys.reserve(SortedUploadAttachments.size());
+
+ for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments)
{
- NewBlock();
+ AttachmentHashes.push_back(Attachment.first);
+ if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end())
+ {
+ AttachmentSizes.push_back(It->second.Size);
+ }
+ else
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first),
+ "");
+ return {};
+ }
+ AttachmentKeys.push_back(Attachment.second);
}
+ // Resolves an attachment hash to a FetchChunkFunc for whole (non-chunked) attachments.
+ // Prefers the in-memory loose attachment if present; otherwise falls back to a CidStore lookup.
+ // NOTE(review): the loose entry is erased on resolve, so this resolver is single-use per hash —
+ // a second resolve of the same hash takes the ChunkStore branch; confirm loose attachments are
+ // also persisted to the store, otherwise a duplicate-hash resolve will fail.
+ auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc {
+ if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end())
+ {
+ uint64_t RawSize = It->second.first;
+ IoBuffer Payload = It->second.second;
+
+ // Validate header hash/size against the expected attachment identity.
+ // NOTE(review): ZEN_ASSERT_SLOW(Compressed.Decompress()) fully decompresses every loose
+ // attachment in slow-assert builds — expensive; consider gating or removing once stable.
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize);
+ ZEN_ASSERT_SLOW(Compressed.Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
+
+ LooseUploadAttachments.erase(It);
+ return [RawSize = RawSize,
+ Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+ // CompressedBuffer Compressed =
+ // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer());
+ // NOTE(review): this re-validates the same payload a second time at fetch — presumably
+ // temporary wip debugging; the NoValidate path above looks like the intended final form. Confirm.
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize);
+ ZEN_ASSERT_SLOW(Compressed.Decompress());
+ ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash);
+ ZEN_ASSERT_SLOW(VerifyRawSize == RawSize);
+ return {RawSize, Compressed};
+ };
+ }
+ else
+ {
+ // Not loose: must already live in the cid store. The returned fetcher throws (rather than
+ // asserts) on missing/malformed chunks since it may run later on a worker thread.
+ ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash));
+ return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
+ if (!Chunk)
+ {
+ throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
+ }
+ IoHash ValidateRawHash;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(
+ fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash));
+ }
+ if (RawHash != ValidateRawHash)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
+ }
+ return {RawSize, Compressed};
+ };
+ }
+ };
+
+ ComposeBlocks(MaxBlockSize,
+ MaxChunksPerBlock,
+ AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ FetchWholeAttachmentResolver,
+ OptionalContext,
+ RemoteResult);
+ }
+
+ {
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(ChunkedChunkCount);
+ std::vector<uint64_t> AttachmentSizes;
+ AttachmentSizes.reserve(ChunkedChunkCount);
+ std::vector<Oid> AttachmentKeys;
+ AttachmentKeys.reserve(ChunkedChunkCount);
+
+ tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex;
+
for (size_t ChunkedFileIndex : ChunkedFilesOrder)
{
- const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
- const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
- size_t ChunkCount = Chunked.Info.ChunkHashes.size();
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+ const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
+ size_t ChunkCount = Chunked.Info.ChunkHashes.size();
+ Oid ChunkedFileOid = Oid::NewOid();
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
{
- if (remotestore_impl::IsCancelled(OptionalContext))
+ const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex];
+ uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size;
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- break;
- }
- if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
- LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
- }
- const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
- if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
- {
- if (AddedAttachmentHashes.insert(ChunkHash).second)
+ if (ChunkHashToChunkFileIndexAndChunkIndex
+ .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex)))
+ .second)
{
- const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
- uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size);
- if (BuildBlocks && ChunksInBlock.size() > 0)
+ if (ChunkSize > MaxChunkEmbedSize)
{
- if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock)
- {
- NewBlock();
- }
+ OnLargeAttachment(ChunkHash,
+ [SourceBuffer = ChunkedFile.Source,
+ ChunkSource = Chunked.ChunkSources[ChunkIndex],
+ ChunkHash](const IoHash& RawHash) -> CompositeBuffer {
+ ZEN_ASSERT(RawHash == ChunkHash);
+ CompressedBuffer Compressed = CompressedBuffer::Compress(
+ SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None);
+ return Compressed.GetCompressed();
+ });
+
+ ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); });
+ }
+ else
+ {
+ AttachmentHashes.push_back(ChunkHash);
+ AttachmentSizes.push_back(ChunkSize);
+ AttachmentKeys.push_back(ChunkedFileOid);
}
- ChunksInBlock.emplace_back(
- std::make_pair(ChunkHash,
- [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
- const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return {Size,
- CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None)};
- }));
- BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
- ChunksAssembled++;
}
- ChunkedHashes.erase(FindIt);
}
}
}
- }
- if (BlockSize > 0 && !RemoteResult.IsError())
- {
- if (!remotestore_impl::IsCancelled(OptionalContext))
- {
- NewBlock();
- }
+ // Resolves a chunk hash (produced by file chunking) to a FetchChunkFunc that re-compresses
+ // the chunk's byte range out of the owning ChunkedFile's source buffer on demand.
+ auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex,
+ &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc {
+ if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash);
+ It != ChunkHashToChunkFileIndexAndChunkIndex.end())
+ {
+ const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second;
+ size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first;
+ size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second;
+ const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
+
+ const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex];
+ ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize());
+
+ // Capture the shared source buffer plus offset/size; compression is deferred to fetch time.
+ // Mermaid + CompressionLevel::None matches the block-size accounting done by the caller
+ // (which budgets GetHeaderSizeForNoneEncoder() + raw size per chunk).
+ return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
+ const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return {Size,
+ CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)};
+ };
+ }
+ else
+ {
+ // BUG(review): if ZEN_ASSERT compiles out (release builds), control flows off the end of a
+ // value-returning lambda here — undefined behavior. Add a throw or return an empty
+ // FetchChunkFunc after the assert before this lands.
+ ZEN_ASSERT(false);
+ }
+ };
+
+ ComposeBlocks(MaxBlockSize,
+ MaxChunksPerBlock,
+ AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ ChunkedFileAttachmentResolver,
+ OptionalContext,
+ RemoteResult);
}
if (ChunkAssembleCount > 0)
@@ -2938,7 +3354,8 @@ SaveOplog(CidStore& ChunkStore,
{
IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath);
RwLock::ExclusiveLockScope __(AttachmentsLock);
- CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}});
+ CreatedBlocks.insert(
+ {Block.BlockHash, {.Payload = CompositeBuffer(SharedBuffer(std::move(BlockBuffer))), .Block = std::move(Block)}});
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
}
catch (const std::exception& Ex)
@@ -4329,6 +4746,32 @@ namespace projectstore_testutils {
return Package;
};
+ // Test helper: builds an oplog CbPackage for a "files" entry. The package object carries the
+ // op key and, when attachments are given, a "files" array of {id, serverpath, clientpath}
+ // objects with paths made relative to ProjectRootDir (forward-slash normalized).
+ // NOTE(review): ProjectRootDir is taken as `const std::filesystem::path` by value — copies the
+ // path on every call; `const std::filesystem::path&` would match the sibling helpers.
+ static CbPackage CreateFilesOplogPackage(const Oid& Id,
+ const std::filesystem::path ProjectRootDir,
+ const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
+ {
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("files");
+ for (const auto& Attachment : Attachments)
+ {
+ // generic_string() keeps '/' separators so serverpath is platform-independent.
+ std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir).generic_string();
+ std::filesystem::path ClientPath = ServerPath; // dummy
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "serverpath"sv << ServerPath.string();
+ Object << "clientpath"sv << ClientPath.string();
+ Object.EndObject();
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+ };
+
static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
const std::span<const size_t>& Sizes,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
@@ -4345,6 +4788,23 @@ namespace projectstore_testutils {
return Result;
}
+ // Test helper: for each requested size, writes a random uncompressed blob to
+ // RootDir/content/uncompressed_file/<blob-hash> and returns (fresh Oid, file path) pairs
+ // suitable for CreateFilesOplogPackage. Files are content-addressed by hash, so identical
+ // blobs would share a path (harmless for tests; each still gets a distinct Oid).
+ static std::vector<std::pair<Oid, std::filesystem::path>> CreateFileAttachments(const std::filesystem::path& RootDir,
+ const std::span<const size_t>& Sizes)
+ {
+ std::vector<std::pair<Oid, std::filesystem::path>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ IoBuffer FileBlob = CreateRandomBlob(Size);
+ IoHash FileHash = IoHash::HashBuffer(FileBlob);
+ std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file" / FileHash.ToHexString();
+ CreateDirectories(UncompressedFilePath.parent_path());
+ WriteFile(UncompressedFilePath, FileBlob);
+ Result.push_back({Oid::NewOid(), UncompressedFilePath});
+ }
+ return Result;
+ }
+
class TestJobContext : public JobContext
{
public:
@@ -4434,6 +4894,11 @@ TEST_CASE_TEMPLATE("project.store.export",
Oid::NewOid(),
CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+ Oplog->AppendNewOplogEntry(
+ CreateFilesOplogPackage(Oid::NewOid(),
+ RootDir,
+ CreateFileAttachments(RootDir, std::initializer_list<size_t>{423 * 1024, 2 * 1024, 3213, 762 * 1024})));
+
FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
.MaxChunksPerBlock = 1000,
.MaxChunkEmbedSize = 32 * 1024u,
@@ -4565,6 +5030,10 @@ SetupExportStore(CidStore& CidStore,
Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
Oid::NewOid(),
CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+ Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(
+ Oid::NewOid(),
+ Project.RootDir,
+ CreateFileAttachments(Project.RootDir, std::initializer_list<size_t>{423 * 1024, 2 * 1024, 3213, 762 * 1024})));
FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
.MaxChunksPerBlock = 1000,