diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-14 15:32:33 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-14 15:32:33 +0100 |
| commit | 57df0ee256fe5a247bb7d33b9e7db95542b010a7 (patch) | |
| tree | 266bd4ffcc32e1f9d78adfd6f5a985b607b24831 | |
| parent | remove redundant asserts (diff) | |
| download | zen-57df0ee256fe5a247bb7d33b9e7db95542b010a7.tar.xz zen-57df0ee256fe5a247bb7d33b9e7db95542b010a7.zip | |
break up SaveOplog phase 1
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 2335 |
1 file changed, 1184 insertions, 1151 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 406229dff..fcd5fc9c0 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -291,12 +291,12 @@ namespace remotestore_impl { while (!CurrentOpChunkSizes.empty()) { + size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); + ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize); ZEN_ASSERT(CurrentOpAttachmentCount <= m_Config.MaxChunksPerBlock); - size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); - // Path A: gathered chunks exactly fill one block -- emit as a standalone block immediately. if (CurrentOpFillFullBlock) { @@ -409,6 +409,401 @@ namespace remotestore_impl { const uint64_t m_UsableBlockSize = 0; }; + struct FoundAttachment + { + std::filesystem::path RawPath; // If not stored in cid + uint64_t Size = 0; + Oid Key = Oid::Zero; + }; + + CbObject RewriteOplog( + ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + bool IgnoreMissingAttachments, + bool EmbedLooseFiles, + const std::filesystem::path& AttachmentTempPath, + std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, // TODO: Rename to OutUploadAttachments + remotestore_impl::AsyncRemoteResult& RemoteResult, + JobContext* OptionalContext) + { + size_t OpCount = 0; + CreateDirectories(AttachmentTempPath); + + auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { + bool OpRewritten = false; + CbArrayView Files = Op["files"sv].AsArrayView(); + if (Files.Num() == 0) + { + CB(Op); + return; + } + + CbWriter Cbo; + Cbo.BeginArray("files"sv); + + for (CbFieldView& Field : Files) + { + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + 
remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + CB(Op); + return; + } + + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred(); + if (!IsFile(FilePath)) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); + if (IgnoreMissingAttachments) + { + continue; + } + else + { + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(FilePath.string()); + Sb.Append("' for op: \n"); + View.ToJson(Sb); + throw std::runtime_error(Sb.ToString()); + } + } + + { + Stopwatch HashTimer; + SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); + DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); + ZEN_INFO("Hashed loose file '{}' {}: {} in {}", + FilePath, + NiceBytes(DataBuffer.GetSize()), + DataHash, + NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); + } + + // Rewrite file array entry with new data reference + CbObjectWriter Writer; + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, DataHash); + UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); + + CbObject RewrittenOp = Writer.Save(); + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; + } + } + + if (CopyField) + { + Cbo.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (!OpRewritten) + { + CB(Op); + return; + } + + Cbo.EndArray(); + CbArray FilesArray 
= Cbo.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + CB(RewrittenOp); + }; + + remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); + + Stopwatch Timer; + + size_t TotalOpCount = Oplog.GetOplogEntryCount(); + Stopwatch RewriteOplogTimer; + CbObjectWriter SectionOpsWriter; + SectionOpsWriter.BeginArray("ops"sv); + { + Stopwatch BuildingOplogProgressTimer; + Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { + if (RemoteResult.IsError()) + { + return; + } + Op.IterateAttachments([&](CbFieldView FieldView) { + UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); + }); + if (EmbedLooseFiles) + { + RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); + } + else + { + SectionOpsWriter << Op; + } + OpCount++; + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return; + } + if (OpCount % 1000 == 0) + { + remotestore_impl::ReportProgress(OptionalContext, + "Building oplog"sv, + fmt::format("{} ops processed", OpCount), + TotalOpCount, + TotalOpCount - OpCount, + BuildingOplogProgressTimer.GetElapsedTimeMs()); + } + }); + if (RemoteResult.IsError()) + { + return {}; + } + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + if 
(TotalOpCount > 0) + { + remotestore_impl::ReportProgress(OptionalContext, + "Building oplog"sv, + fmt::format("{} ops processed", OpCount), + TotalOpCount, + 0, + BuildingOplogProgressTimer.GetElapsedTimeMs()); + } + } + SectionOpsWriter.EndArray(); // "ops" + + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Rewrote {} ops to new oplog in {}", + OpCount, + NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + + return SectionOpsWriter.Save(); + } + + struct FoundChunkedFile + { + IoHash RawHash = IoHash::Zero; + IoBuffer Source; + uint64_t Offset = 0; + uint64_t Size = 0; + }; + + void FindChunkSizes(CidStore& ChunkStore, + WorkerThreadPool& WorkerPool, + size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, + bool AllowChunking, + const std::filesystem::path& AttachmentTempPath, + std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, + std::unordered_set<IoHash, IoHash::Hasher>& MissingHashes, + std::vector<FoundChunkedFile>& AttachmentsToChunk, + JobContext* OptionalContext, + remotestore_impl::AsyncRemoteResult& RemoteResult) + { + { + RwLock FindChunkSizesLock; + Latch FindChunkSizesLatch(1); + for (auto& It : UploadAttachments) + { + if (remotestore_impl::IsCancelled(OptionalContext)) + { + break; + } + + FindChunkSizesLatch.AddCount(1); + + WorkerPool.ScheduleWork( + [&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &FindChunkSizesLatch, + &FindChunkSizesLock, + &MissingHashes, + AttachmentTempPath, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + &RemoteResult, + &AttachmentsToChunk, + OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); + + auto _ = MakeGuard([&FindChunkSizesLatch] { FindChunkSizesLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } + + try + { + if (!UploadAttachment->RawPath.empty()) + { + const std::filesystem::path& FilePath = UploadAttachment->RawPath; + IoBuffer RawData = 
IoBufferBuilder::MakeFromFile(FilePath); + if (RawData) + { + UploadAttachment->Size = RawData.GetSize(); + if (AllowChunking && UploadAttachment->Size > ChunkFileSizeLimit) + { + FindChunkSizesLock.WithExclusiveLock([&]() { + AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash, + .Source = RawData, + .Offset = 0, + .Size = RawData.GetSize()}); + }); + } + } + else + { + FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } + } + else + { + IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); + if (Data) + { + UploadAttachment->Size = Data.GetSize(); + if (AllowChunking && Data.IsWholeFile()) + { + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); + if (Compressed) + { + if (VerifyRawSize > ChunkFileSizeLimit) + { + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (CompressionLevel == OodleCompressionLevel::None) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (Decompressed) + { + std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); + if (Segments.size() == 1) + { + IoBuffer DecompressedData = Segments[0].AsIoBuffer(); + IoBufferFileReference DecompressedFileRef; + if (DecompressedData.GetFileReference(DecompressedFileRef)) + { + // Are we still pointing to disk? 
+ FindChunkSizesLock.WithExclusiveLock([&]() { + AttachmentsToChunk.push_back( + FoundChunkedFile{.RawHash = RawHash, + .Source = Data, + .Offset = DecompressedFileRef.FileChunkOffset, + .Size = DecompressedFileRef.FileChunkSize}); + }); + } + } + } + } + } + } + } + } + } + else + { + FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve attachment {}", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + FindChunkSizesLatch.CountDown(); + + Stopwatch ResolveAttachmentsProgressTimer; + while (!FindChunkSizesLatch.Wait(1000)) + { + ptrdiff_t Remaining = FindChunkSizesLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!FindChunkSizesLatch.Wait(1000)) + { + Remaining = FindChunkSizesLatch.Remaining(); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + "Aborted"sv, + UploadAttachments.size(), + 0, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + return; + } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + UploadAttachments.size(), + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + } + } + } + RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& 
SectionObject, JobContext* OptionalContext) { using namespace std::literals; @@ -2003,1353 +2398,993 @@ BuildContainer(CidStore& ChunkStore, std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); - size_t OpCount = 0; + Stopwatch Timer; - CbObject OplogContainerObject; + CbObject OplogContainerObject; + CompressedBuffer CompressedOpsSection; + std::unordered_map<IoHash, remotestore_impl::FoundAttachment, IoHash::Hasher> UploadAttachments; + std::filesystem::path AttachmentTempPath = Oplog.TempPath(); + AttachmentTempPath.append(".pending"); + + CbObject SectionOps = remotestore_impl::RewriteOplog(Project, + Oplog, + IgnoreMissingAttachments, + EmbedLooseFiles, + AttachmentTempPath, + UploadAttachments, + RemoteResult, + OptionalContext); + + size_t TotalOpCount = Oplog.GetOplogEntryCount(); { - struct FoundAttachment - { - std::filesystem::path RawPath; // If not stored in cid - uint64_t Size = 0; - Oid Key = Oid::Zero; - }; - - std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - - CompressedBuffer OpsBuffer; - - std::filesystem::path AttachmentTempPath = Oplog.TempPath(); - AttachmentTempPath.append(".pending"); - CreateDirectories(AttachmentTempPath); - - auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { - bool OpRewritten = false; - CbArrayView Files = Op["files"sv].AsArrayView(); - if (Files.Num() == 0) - { - CB(Op); - return; - } - - CbWriter Cbo; - Cbo.BeginArray("files"sv); - - for (CbFieldView& Field : Files) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - CB(Op); - return; - } - - bool CopyField = true; - - if (CbObjectView View = Field.AsObjectView()) - { - 
IoHash DataHash = View["data"sv].AsHash(); - - if (DataHash == IoHash::Zero) - { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred(); - if (!IsFile(FilePath)) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); - if (IgnoreMissingAttachments) - { - continue; - } - else - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(FilePath.string()); - Sb.Append("' for op: \n"); - View.ToJson(Sb); - throw std::runtime_error(Sb.ToString()); - } - } - - { - Stopwatch HashTimer; - SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); - DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); - ZEN_INFO("Hashed loose file '{}' {}: {} in {}", - FilePath, - NiceBytes(DataBuffer.GetSize()), - DataHash, - NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); - } - - // Rewrite file array entry with new data reference - CbObjectWriter Writer; - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, DataHash); - UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); - - CbObject RewrittenOp = Writer.Save(); - Cbo.AddObject(std::move(RewrittenOp)); - CopyField = false; - } - } - - if (CopyField) - { - Cbo.AddField(Field); - } - else - { - OpRewritten = true; - } - } - - if (!OpRewritten) - { - CB(Op); - return; - } - - Cbo.EndArray(); - CbArray FilesArray = Cbo.Save().AsArray(); + Stopwatch CompressOpsTimer; + CompressedOpsSection = CompressedBuffer::Compress(SectionOps.GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); + remotestore_impl::ReportMessage(OptionalContext, + 
fmt::format("Compressed oplog section {} ({} -> {}) in {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.DecodeRawSize()), + NiceBytes(CompressedOpsSection.GetCompressedSize()), + NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); + } - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) - { - NewWriter.AddArray("files"sv, FilesArray); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - return true; - } + std::unordered_set<IoHash, IoHash::Hasher> FoundHashes; + FoundHashes.reserve(UploadAttachments.size()); + for (const auto& It : UploadAttachments) + { + FoundHashes.insert(It.first); + } - return false; - }); - CB(RewrittenOp); - }; + struct ChunkedFile + { + IoBuffer Source; + ChunkedInfoWithSource Chunked; + }; - remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + std::vector<remotestore_impl::FoundChunkedFile> AttachmentsToChunk; + + remotestore_impl::FindChunkSizes(ChunkStore, + WorkerPool, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + AttachmentTempPath, + UploadAttachments, + MissingHashes, + AttachmentsToChunk, + OptionalContext, + RemoteResult); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - Stopwatch Timer; + for (const IoHash& AttachmentHash : MissingHashes) + { + auto 
It = UploadAttachments.find(AttachmentHash); + ZEN_ASSERT(It != UploadAttachments.end()); + std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); + ZEN_ASSERT(Op.has_value()); - size_t TotalOpCount = Oplog.GetOplogEntryCount(); - CompressedBuffer CompressedOpsSection; + if (IgnoreMissingAttachments) { - Stopwatch RewriteOplogTimer; - CbObjectWriter SectionOpsWriter; - SectionOpsWriter.BeginArray("ops"sv); - { - Stopwatch BuildingOplogProgressTimer; - Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { - if (RemoteResult.IsError()) - { - return; - } - Op.IterateAttachments([&](CbFieldView FieldView) { - UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); - }); - if (EmbedLooseFiles) - { - RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); - } - else - { - SectionOpsWriter << Op; - } - OpCount++; - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - if (OpCount % 1000 == 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Building oplog"sv, - fmt::format("{} ops processed", OpCount), - TotalOpCount, - TotalOpCount - OpCount, - BuildingOplogProgressTimer.GetElapsedTimeMs()); - } - }); - if (RemoteResult.IsError()) - { - return {}; - } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - if (TotalOpCount > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Building oplog"sv, - fmt::format("{} ops processed", OpCount), - TotalOpCount, - 
0, - BuildingOplogProgressTimer.GetElapsedTimeMs()); - } - } - SectionOpsWriter.EndArray(); // "ops" - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Rewrote {} ops to new oplog in {}", - OpCount, - NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); - - { - Stopwatch CompressOpsTimer; - CompressedOpsSection = - CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Compressed oplog section {} ({} -> {}) in {}", - CompressedOpsSection.DecodeRawHash(), - NiceBytes(CompressedOpsSection.DecodeRawSize()), - NiceBytes(CompressedOpsSection.GetCompressedSize()), - NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); - } + fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); } - - if (remotestore_impl::IsCancelled(OptionalContext)) + else { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(AttachmentHash.ToHexString()); + Sb.Append("' for op: \n"); + Op.value().ToJson(Sb); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); return {}; } + UploadAttachments.erase(AttachmentHash); + } - std::unordered_set<IoHash, IoHash::Hasher> FoundHashes; - FoundHashes.reserve(UploadAttachments.size()); - for (const auto& It : UploadAttachments) - { - FoundHashes.insert(It.first); - } - - struct ChunkedFile - { - IoBuffer Source; - - ChunkedInfoWithSource Chunked; - }; - - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; - struct FoundChunkedFile - { - IoHash RawHash = IoHash::Zero; 
- IoBuffer Source; - uint64_t Offset = 0; - uint64_t Size = 0; - }; + std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size()); - std::vector<FoundChunkedFile> AttachmentsToChunk; + { + Latch ChunkFilesLatch(1); + for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++) { - RwLock FindChunkSizesLock; - Latch FindChunkSizesLatch(1); - for (auto& It : UploadAttachments) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } + ChunkFilesLatch.AddCount(1); - FindChunkSizesLatch.AddCount(1); - - WorkerPool.ScheduleWork( - [&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &FindChunkSizesLatch, - &FindChunkSizesLock, - // &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - &AttachmentsToChunk, - OptionalContext]() { - ZEN_TRACE_CPU("PrepareChunk"); - - auto _ = MakeGuard([&FindChunkSizesLatch] { FindChunkSizesLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } - - try - { - if (!UploadAttachment->RawPath.empty()) - { - const std::filesystem::path& FilePath = UploadAttachment->RawPath; - IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); - if (RawData) - { - UploadAttachment->Size = RawData.GetSize(); - if (AllowChunking && UploadAttachment->Size > ChunkFileSizeLimit) - { - FindChunkSizesLock.WithExclusiveLock([&]() { - AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash, - .Source = RawData, - .Offset = 0, - .Size = RawData.GetSize()}); - }); - } - } - else - { - FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { 
MissingHashes.insert(RawHash); }); - } - } - else - { - IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); - if (Data) - { - UploadAttachment->Size = Data.GetSize(); - if (AllowChunking && Data.IsWholeFile()) - { - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); - if (Compressed) - { - if (VerifyRawSize > ChunkFileSizeLimit) - { - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - if (CompressionLevel == OodleCompressionLevel::None) - { - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - if (Decompressed) - { - std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); - if (Segments.size() == 1) - { - IoBuffer DecompressedData = Segments[0].AsIoBuffer(); - IoBufferFileReference DecompressedFileRef; - if (DecompressedData.GetFileReference(DecompressedFileRef)) - { - // Are we still pointing to disk? 
- FindChunkSizesLock.WithExclusiveLock([&]() { - AttachmentsToChunk.push_back( - FoundChunkedFile{.RawHash = RawHash, - .Source = Data, - .Offset = DecompressedFileRef.FileChunkOffset, - .Size = DecompressedFileRef.FileChunkSize}); - }); - } - } - } - } - } - } - } - } - } - else - { - FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::DisableBacklog); - } - FindChunkSizesLatch.CountDown(); + WorkerPool.ScheduleWork( + [&ChunkFilesLatch, &AttachmentsToChunk, ChunkFileIndexToChunk, &ChunkedFiles, OptionalContext]() { + ZEN_TRACE_CPU("ChunkFileAsync"); - Stopwatch ResolveAttachmentsProgressTimer; - while (!FindChunkSizesLatch.Wait(1000)) - { - ptrdiff_t Remaining = FindChunkSizesLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!FindChunkSizesLatch.Wait(1000)) + auto _ = MakeGuard([&ChunkFilesLatch] { ChunkFilesLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) { - Remaining = FindChunkSizesLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + return; } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - "Aborted"sv, - UploadAttachments.size(), - 0, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - return {}; - } - 
remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } - } - for (const IoHash& AttachmentHash : MissingHashes) - { - auto It = UploadAttachments.find(AttachmentHash); - ZEN_ASSERT(It != UploadAttachments.end()); - std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); - ZEN_ASSERT(Op.has_value()); + const remotestore_impl::FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk]; + const IoHash& RawHash = AttachmentToChunk.RawHash; - if (IgnoreMissingAttachments) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); - } - else - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - return {}; - } - UploadAttachments.erase(AttachmentHash); - } + const IoBuffer& Buffer = AttachmentToChunk.Source; + IoBufferFileReference FileRef; + bool IsFile = Buffer.GetFileReference(FileRef); + ZEN_ASSERT(IsFile); - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + Stopwatch ChunkOneTimer; - std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size()); - - { - Latch ChunkFilesLatch(1); + uint64_t Offset = AttachmentToChunk.Offset; + uint64_t Size = AttachmentToChunk.Size; - for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++) - { - ChunkFilesLatch.AddCount(1); + BasicFile SourceFile; + SourceFile.Attach(FileRef.FileHandle); + auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); - WorkerPool.ScheduleWork( - [&ChunkFilesLatch, &AttachmentsToChunk, ChunkFileIndexToChunk, &ChunkedFiles, OptionalContext]() 
{ - ZEN_TRACE_CPU("ChunkFileAsync"); + ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk]; + Chunked.Source = Buffer; + Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); + ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); - auto _ = MakeGuard([&ChunkFilesLatch] { ChunkFilesLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } - - const FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk]; - const IoHash& RawHash = AttachmentToChunk.RawHash; - - const IoBuffer& Buffer = AttachmentToChunk.Source; - IoBufferFileReference FileRef; - bool IsFile = Buffer.GetFileReference(FileRef); - ZEN_ASSERT(IsFile); - - Stopwatch ChunkOneTimer; - - uint64_t Offset = AttachmentToChunk.Offset; - uint64_t Size = AttachmentToChunk.Size; - - BasicFile SourceFile; - SourceFile.Attach(FileRef.FileHandle); - auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); - - ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk]; - Chunked.Source = Buffer; - Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); - ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); - - ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", - RawHash, - NiceBytes(Chunked.Chunked.Info.RawSize), - Chunked.Chunked.Info.ChunkHashes.size(), - NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs())); - }, - WorkerThreadPool::EMode::DisableBacklog); - } - ChunkFilesLatch.CountDown(); - ChunkFilesLatch.Wait(); - // TODO: Progress (change WorkerThreadPool::EMode::DisableBacklog) + ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", + RawHash, + NiceBytes(Chunked.Chunked.Info.RawSize), + Chunked.Chunked.Info.ChunkHashes.size(), + NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs())); + }, + WorkerThreadPool::EMode::DisableBacklog); } + ChunkFilesLatch.CountDown(); + ChunkFilesLatch.Wait(); + // TODO: Progress (change WorkerThreadPool::EMode::DisableBacklog) + } - for (const 
ChunkedFile& Chunked : ChunkedFiles) + for (const ChunkedFile& Chunked : ChunkedFiles) + { + UploadAttachments.erase(Chunked.Chunked.Info.RawHash); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) { - UploadAttachments.erase(Chunked.Chunked.Info.RawHash); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - UploadAttachments.erase(ChunkHash); - } + UploadAttachments.erase(ChunkHash); } + } - size_t ChunkedChunkCount = - std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) { - return Current + Value.Chunked.Info.ChunkHashes.size(); - }); + size_t ChunkedChunkCount = + std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) { + return Current + Value.Chunked.Info.ChunkHashes.size(); + }); - size_t ReusedAttachmentCount = 0; - std::vector<size_t> ReusedBlockIndexes; - { - std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes; - UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount); + size_t ReusedAttachmentCount = 0; + std::vector<size_t> ReusedBlockIndexes; + { + std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes; + UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount); - UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end()); + UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end()); - for (ChunkedFile& Chunked : ChunkedFiles) - { - UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end()); - } - std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end()); - - std::vector<uint32_t> ChunkIndexes; - ChunkIndexes.resize(ChunkHashes.size()); - std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0); - - std::vector<uint32_t> UnusedChunkIndexes; - ReuseBlocksStatistics ReuseBlocksStats; - - ReusedBlockIndexes = FindReuseBlocks(*LogOutput, - /*BlockReuseMinPercentLimit*/ 80, - 
/*IsVerbose*/ false, - ReuseBlocksStats, - KnownBlocks, - ChunkHashes, - ChunkIndexes, - UnusedChunkIndexes); - for (size_t KnownBlockIndex : ReusedBlockIndexes) + for (ChunkedFile& Chunked : ChunkedFiles) + { + UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end()); + } + std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end()); + + std::vector<uint32_t> ChunkIndexes; + ChunkIndexes.resize(ChunkHashes.size()); + std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0); + + std::vector<uint32_t> UnusedChunkIndexes; + ReuseBlocksStatistics ReuseBlocksStats; + + ReusedBlockIndexes = FindReuseBlocks(*LogOutput, + /*BlockReuseMinPercentLimit*/ 80, + /*IsVerbose*/ false, + ReuseBlocksStats, + KnownBlocks, + ChunkHashes, + ChunkIndexes, + UnusedChunkIndexes); + for (size_t KnownBlockIndex : ReusedBlockIndexes) + { + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { - const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) + if (UploadAttachments.erase(KnownHash) == 1) { - if (UploadAttachments.erase(KnownHash) == 1) - { - ReusedAttachmentCount++; - } + ReusedAttachmentCount++; } } } + } - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; + RwLock ResolveLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), 
TotalOpCount)); - Stopwatch ResolveAttachmentsProgressTimer; - Latch ResolveAttachmentsLatch(1); - for (auto& It : UploadAttachments) + Stopwatch ResolveAttachmentsProgressTimer; + Latch ResolveAttachmentsLatch(1); + for (auto& It : UploadAttachments) + { + if (remotestore_impl::IsCancelled(OptionalContext)) { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - ResolveAttachmentsLatch.AddCount(1); + ResolveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &ResolveAttachmentsLatch, - &ResolveLock, - &ChunkedHashes, - &LargeChunkHashes, - &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - OptionalContext]() { - ZEN_TRACE_CPU("PrepareChunk"); + WorkerPool.ScheduleWork( + [&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &ResolveAttachmentsLatch, + &ResolveLock, + &ChunkedHashes, + &LargeChunkHashes, + &LooseUploadAttachments, + &MissingHashes, + &OnLargeAttachment, + &AttachmentTempPath, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + &RemoteResult, + OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); - auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } + auto _ = MakeGuard([&ResolveAttachmentsLatch] { 
ResolveAttachmentsLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } - try + try + { + ZEN_ASSERT(UploadAttachment->Size != 0); + if (!UploadAttachment->RawPath.empty()) { - ZEN_ASSERT(UploadAttachment->Size != 0); - if (!UploadAttachment->RawPath.empty()) + if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) { - if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment( - RawHash, - [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment]( - const IoHash& RawHash) -> CompositeBuffer { - IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); - if (!RawData) - { - throw std::runtime_error( - fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); - } - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - - IoBuffer TempAttachmentBuffer = - remotestore_impl::CompressToTempFile(RawHash, - RawData, - AttachmentPath, - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - TempAttachmentBuffer.SetDeleteOnClose(true); - -#if ZEN_BUILD_DEBUG - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), - VerifyRawHash, - VerifyRawSize)); - ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); -#endif // ZEN_BUILD_DEBUG - - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(UploadAttachment->Size), - NiceBytes(TempAttachmentBuffer.GetSize())); - return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - // Compress inline - check compressed size to see 
if it should go into a block or not - IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); - if (!RawData) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to read attachment {}", UploadAttachment->RawPath), - ""); - return; - } + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + OnLargeAttachment( + RawHash, + [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment]( + const IoHash& RawHash) -> CompositeBuffer { + IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); + if (!RawData) + { + throw std::runtime_error( + fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); + } - std::filesystem::path TempFilePath = AttachmentTempPath; - TempFilePath.append(RawHash.ToHexString()); + std::filesystem::path AttachmentPath = AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); - try - { IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, RawData, - TempFilePath, + AttachmentPath, OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); TempAttachmentBuffer.SetDeleteOnClose(true); + #if ZEN_BUILD_DEBUG IoHash VerifyRawHash; uint64_t VerifyRawSize; ZEN_ASSERT_SLOW( CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize)); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)) - .CompressedBuffer::Decompress()); ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); #endif // ZEN_BUILD_DEBUG - uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - TempFilePath, + AttachmentPath, NiceBytes(UploadAttachment->Size), - NiceBytes(CompressedSize)); + NiceBytes(TempAttachmentBuffer.GetSize())); + return 
CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); + }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + // Compress inline - check compressed size to see if it should go into a block or not + IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); + if (!RawData) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to read attachment {}", UploadAttachment->RawPath), + ""); + return; + } - if (CompressedSize > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { - return CompositeBuffer(SharedBuffer(std::move(Data))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = CompressedSize; - { - IoHash VerifyRawHash2; - uint64_t VerifyRawSize2; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), - VerifyRawHash2, - VerifyRawSize2); - ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash); - ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize()); - } + std::filesystem::path TempFilePath = AttachmentTempPath; + TempFilePath.append(RawHash.ToHexString()); - ResolveLock.WithExclusiveLock([RawHash, - RawSize = RawData.GetSize(), - &LooseUploadAttachments, - Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } - } - catch (const std::system_error& SysEx) + try + { + IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, + RawData, + TempFilePath, + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + TempAttachmentBuffer.SetDeleteOnClose(true); +#if ZEN_BUILD_DEBUG + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + ZEN_ASSERT_SLOW( + 
CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize)); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)) + .CompressedBuffer::Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); +#endif // ZEN_BUILD_DEBUG + + uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); + + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + TempFilePath, + NiceBytes(UploadAttachment->Size), + NiceBytes(CompressedSize)); + + if (CompressedSize > MaxChunkEmbedSize) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}", - RawHash, - TempFilePath, - SysEx.code().value(), - SysEx.what()), - ""); + OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { + return CompositeBuffer(SharedBuffer(std::move(Data))); + }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } - catch (const std::exception& Ex) + else { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}", - RawHash, - TempFilePath, - Ex.what()), - ""); + UploadAttachment->Size = CompressedSize; + { + IoHash VerifyRawHash2; + uint64_t VerifyRawSize2; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), + VerifyRawHash2, + VerifyRawSize2); + ZEN_ASSERT_SLOW(Compressed.Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash); + ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize()); + } + + ResolveLock.WithExclusiveLock([RawHash, + RawSize = RawData.GetSize(), + &LooseUploadAttachments, + Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); + }); } } - } - else 
- { - if (UploadAttachment->Size > MaxChunkEmbedSize) + catch (const std::system_error& SysEx) { - OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) { - return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash)))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}", + RawHash, + TempFilePath, + SysEx.code().value(), + SysEx.what()), + ""); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}", + RawHash, + TempFilePath, + Ex.what()), + ""); } } } - catch (const std::exception& Ex) + else { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); + if (UploadAttachment->Size > MaxChunkEmbedSize) + { + OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) { + return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash)))); + }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - ResolveAttachmentsLatch.CountDown(); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve attachment {}", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + ResolveAttachmentsLatch.CountDown(); + { + while (!ResolveAttachmentsLatch.Wait(1000)) { - while (!ResolveAttachmentsLatch.Wait(1000)) + ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) { - ptrdiff_t Remaining = 
ResolveAttachmentsLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!ResolveAttachmentsLatch.Wait(1000)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!ResolveAttachmentsLatch.Wait(1000)) - { - Remaining = ResolveAttachmentsLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - "Aborted"sv, - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } + Remaining = ResolveAttachmentsLatch.Remaining(); remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), - 0, + Remaining, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - return {}; } remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } - if (UploadAttachments.size() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - ""sv, + "Aborted"sv, UploadAttachments.size(), 0, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + return {}; } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + UploadAttachments.size(), + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } - - if (remotestore_impl::IsCancelled(OptionalContext)) + if (UploadAttachments.size() > 0) { - 
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + ""sv, + UploadAttachments.size(), + 0, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } + } - for (const auto& It : LargeChunkHashes) - { - UploadAttachments.erase(It); - } + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - RwLock BlocksLock; - std::vector<ChunkBlockDescription> Blocks; + for (const auto& It : LargeChunkHashes) + { + UploadAttachments.erase(It); + } - std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; - SortedUploadAttachments.reserve(UploadAttachments.size()); - for (const auto& It : UploadAttachments) - { - SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); - } + RwLock BlocksLock; + std::vector<ChunkBlockDescription> Blocks; - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } + std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; + SortedUploadAttachments.reserve(UploadAttachments.size()); + for (const auto& It : UploadAttachments) + { + SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); + } + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", 
""); remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); - - // Sort attachments so we get predictable blocks for the same oplog upload - std::sort(SortedUploadAttachments.begin(), - SortedUploadAttachments.end(), - [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) { - if (Lhs.second == Rhs.second) - { - // Same key, sort by raw hash - return Lhs.first < Rhs.first; - } - // Sort by key - return Lhs.second < Rhs.second; - }); - - std::vector<size_t> ChunkedFilesOrder; - ChunkedFilesOrder.reserve(ChunkedFiles.size()); - for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) - { - ChunkedFilesOrder.push_back(Index); - } - std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { - return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; - }); + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", - SortedUploadAttachments.size(), - ChunkedHashes.size(), - TotalOpCount)); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); + + // Sort attachments so we get predictable blocks for the same oplog upload + std::sort(SortedUploadAttachments.begin(), + SortedUploadAttachments.end(), + [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) { 
+ if (Lhs.second == Rhs.second) + { + // Same key, sort by raw hash + return Lhs.first < Rhs.first; + } + // Sort by key + return Lhs.second < Rhs.second; + }); + + std::vector<size_t> ChunkedFilesOrder; + ChunkedFilesOrder.reserve(ChunkedFiles.size()); + for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) + { + ChunkedFilesOrder.push_back(Index); + } + std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { + return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; + }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", + SortedUploadAttachments.size(), + ChunkedHashes.size(), + TotalOpCount)); - size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); - size_t ChunksAssembled = 0; + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } - Latch BlockCreateLatch(1); - uint32_t 
ComposedBlocks = 0; + size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); + size_t ChunksAssembled = 0; + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); - uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); - try - { - Stopwatch AssembleBlocksProgressTimer; - remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{ - .MaxBlockSize = MaxBlockSize, - .MaxChunksPerBlock = MaxChunksPerBlock, - .MaxChunkEmbedSize = MaxChunkEmbedSize, - .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); - - auto OnNewBlock = [&WorkerPool, - BuildBlocks, - &AssembleBlocksProgressTimer, - &BlockCreateLatch, - &BlocksLock, - &Blocks, - &AsyncOnBlock, - &OnBlockChunks, - ChunkAssembleCount, - &ChunksAssembled, - &ComposedBlocks, - OptionalContext, - &RemoteResult](std::vector<IoHash>&& ChunkRawHashes, - const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) { - size_t ChunkCount = ChunkRawHashes.size(); - std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; - ChunksInBlock.reserve(ChunkCount); - - for (const IoHash& AttachmentHash : ChunkRawHashes) - { - ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash))); - } + Latch BlockCreateLatch(1); + uint32_t ComposedBlocks = 0; - size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); - if (BuildBlocks) - { - remotestore_impl::AsyncCreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - } - else - { - ZEN_INFO("Bulk group {} attachments", ChunkCount); + uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); + try + { + Stopwatch AssembleBlocksProgressTimer; + remotestore_impl::BlockComposer 
Composer(remotestore_impl::BlockComposer::Configuration{ + .MaxBlockSize = MaxBlockSize, + .MaxChunksPerBlock = MaxChunksPerBlock, + .MaxChunkEmbedSize = MaxChunkEmbedSize, + .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); + + auto OnNewBlock = [&WorkerPool, + BuildBlocks, + &AssembleBlocksProgressTimer, + &BlockCreateLatch, + &BlocksLock, + &Blocks, + &AsyncOnBlock, + &OnBlockChunks, + ChunkAssembleCount, + &ChunksAssembled, + &ComposedBlocks, + OptionalContext, + &RemoteResult](std::vector<IoHash>&& ChunkRawHashes, + const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) { + size_t ChunkCount = ChunkRawHashes.size(); + std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; + ChunksInBlock.reserve(ChunkCount); + + for (const IoHash& AttachmentHash : ChunkRawHashes) + { + ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash))); + } - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); - OnBlockChunks(std::move(ChunksInBlock)); - } + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + remotestore_impl::AsyncCreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + ZEN_INFO("Bulk group {} attachments", ChunkCount); - ChunksAssembled += ChunkCount; - ComposedBlocks++; + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); + OnBlockChunks(std::move(ChunksInBlock)); + } - if (ChunksAssembled % 1000 == 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - 
fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - } - }; + ChunksAssembled += ChunkCount; + ComposedBlocks++; + + if (ChunksAssembled % 1000 == 0) + { + remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + } + }; + { + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(SortedUploadAttachments.size()); + std::vector<uint64_t> AttachmentSizes; + AttachmentSizes.reserve(SortedUploadAttachments.size()); + std::vector<Oid> AttachmentKeys; + AttachmentKeys.reserve(SortedUploadAttachments.size()); + + for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments) { - std::vector<IoHash> AttachmentHashes; - AttachmentHashes.reserve(SortedUploadAttachments.size()); - std::vector<uint64_t> AttachmentSizes; - AttachmentSizes.reserve(SortedUploadAttachments.size()); - std::vector<Oid> AttachmentKeys; - AttachmentKeys.reserve(SortedUploadAttachments.size()); - - for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments) + AttachmentHashes.push_back(Attachment.first); + if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end()) { - AttachmentHashes.push_back(Attachment.first); - if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end()) - { - AttachmentSizes.push_back(It->second.Size); - } - else - { - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first), - ""); - return {}; - } - AttachmentKeys.push_back(Attachment.second); + 
AttachmentSizes.push_back(It->second.Size); } + else + { + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first), + ""); + return {}; + } + AttachmentKeys.push_back(Attachment.second); + } - auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc { - if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end()) - { - uint64_t RawSize = It->second.first; - IoBuffer Payload = It->second.second; - + auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc { + if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end()) + { + uint64_t RawSize = It->second.first; + IoBuffer Payload = It->second.second; + + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize); + ZEN_ASSERT_SLOW(Compressed.Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); + + LooseUploadAttachments.erase(It); + return [RawSize = RawSize, + Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { + ZEN_UNUSED(ChunkHash); + // CompressedBuffer Compressed = + // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer()); IoHash VerifyRawHash; uint64_t VerifyRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize); ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash); + ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash); ZEN_ASSERT_SLOW(VerifyRawSize == 
RawSize); + return {RawSize, Compressed}; + }; + } + else + { + ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash)); + return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); + if (!Chunk) + { + throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); + } + IoHash ValidateRawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); + } + if (RawHash != ValidateRawHash) + { + throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); + } + return {RawSize, Compressed}; + }; + } + }; - LooseUploadAttachments.erase(It); - return [RawSize = RawSize, - Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { - ZEN_UNUSED(ChunkHash); - // CompressedBuffer Compressed = - // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer()); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize); - ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); - return {RawSize, Compressed}; - }; - } - else - { - ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash)); - return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { - IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); - if (!Chunk) - { - throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); - } - IoHash ValidateRawHash; - uint64_t RawSize = 0; - CompressedBuffer Compressed = - 
CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error( - fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); - } - if (RawHash != ValidateRawHash) - { - throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); - } - return {RawSize, Compressed}; - }; - } - }; - - Composer.Compose(AttachmentHashes, - AttachmentSizes, - AttachmentKeys, - [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { - OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver); - }); - } + Composer.Compose(AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { + OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver); + }); + } - { - std::vector<IoHash> AttachmentHashes; - AttachmentHashes.reserve(ChunkedChunkCount); - std::vector<uint64_t> AttachmentSizes; - AttachmentSizes.reserve(ChunkedChunkCount); - std::vector<Oid> AttachmentKeys; - AttachmentKeys.reserve(ChunkedChunkCount); + { + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(ChunkedChunkCount); + std::vector<uint64_t> AttachmentSizes; + AttachmentSizes.reserve(ChunkedChunkCount); + std::vector<Oid> AttachmentKeys; + AttachmentKeys.reserve(ChunkedChunkCount); - tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex; + tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex; - for (size_t ChunkedFileIndex : ChunkedFilesOrder) + for (size_t ChunkedFileIndex : ChunkedFilesOrder) + { + const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; + const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; + size_t ChunkCount = Chunked.Info.ChunkHashes.size(); + Oid ChunkedFileOid = Oid::NewOid(); + for (size_t 
ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) { - const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; - const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; - size_t ChunkCount = Chunked.Info.ChunkHashes.size(); - Oid ChunkedFileOid = Oid::NewOid(); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) + const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex]; + uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size; { - const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex]; - uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size; + if (ChunkHashToChunkFileIndexAndChunkIndex + .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex))) + .second) { - if (ChunkHashToChunkFileIndexAndChunkIndex - .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex))) - .second) + if (ChunkSize > MaxChunkEmbedSize) { - if (ChunkSize > MaxChunkEmbedSize) - { - OnLargeAttachment(ChunkHash, - [SourceBuffer = ChunkedFile.Source, - ChunkSource = Chunked.ChunkSources[ChunkIndex], - ChunkHash](const IoHash& RawHash) -> CompositeBuffer { - ZEN_ASSERT(RawHash == ChunkHash); - CompressedBuffer Compressed = CompressedBuffer::Compress( - SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None); - return Compressed.GetCompressed(); - }); - - ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); }); - } - else - { - AttachmentHashes.push_back(ChunkHash); - AttachmentSizes.push_back(ChunkSize); - AttachmentKeys.push_back(ChunkedFileOid); - } + OnLargeAttachment(ChunkHash, + [SourceBuffer = ChunkedFile.Source, + ChunkSource = Chunked.ChunkSources[ChunkIndex], + ChunkHash](const IoHash& RawHash) -> CompositeBuffer { + ZEN_ASSERT(RawHash == ChunkHash); + CompressedBuffer Compressed = CompressedBuffer::Compress( + SharedBuffer(IoBuffer(SourceBuffer, 
ChunkSource.Offset, ChunkSource.Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None); + return Compressed.GetCompressed(); + }); + + ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); }); + } + else + { + AttachmentHashes.push_back(ChunkHash); + AttachmentSizes.push_back(ChunkSize); + AttachmentKeys.push_back(ChunkedFileOid); } } } } - - auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex, - &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc { - if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash); - It != ChunkHashToChunkFileIndexAndChunkIndex.end()) - { - const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second; - size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first; - size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second; - const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; - - const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex]; - ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize()); - - return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( - const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Size, - CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None)}; - }; - } - else - { - ZEN_ASSERT(false); - } - }; - - Composer.Compose(AttachmentHashes, - AttachmentSizes, - AttachmentKeys, - [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { - OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver); - }); } - if (ChunkAssembleCount > 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - 0, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - } - - 
remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", - ChunkAssembleCount, - TotalOpCount, - ComposedBlocks, - LargeChunkHashes.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) + auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex, + &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc { + if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash); + It != ChunkHashToChunkFileIndexAndChunkIndex.end()) { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - ComposedBlocks, - Remaining, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); + const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second; + size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first; + size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second; + const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; + + const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex]; + ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize()); + + return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( + const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Size, + CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None)}; + }; } - if (ComposedBlocks > 0) + else 
{ - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", 0), - ComposedBlocks, - 0, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); + ZEN_ASSERT(false); } - return {}; - } + }; + + Composer.Compose(AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { + OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver); + }); } - catch (const std::exception& Ex) + + if (ChunkAssembleCount > 0) { + remotestore_impl::ReportProgress(OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + 0, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + } + + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", + ChunkAssembleCount, + TotalOpCount, + ComposedBlocks, + LargeChunkHashes.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + remotestore_impl::ReportProgress(OptionalContext, + "Assembling blocks"sv, + fmt::format("Aborting, {} blocks remaining...", Remaining), + ComposedBlocks, + Remaining, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + } + if (ComposedBlocks > 0) + { + remotestore_impl::ReportProgress(OptionalContext, + "Assembling blocks"sv, + fmt::format("Aborting, {} blocks remaining...", 0), + ComposedBlocks, + 0, + 
AssembleBlocksProgressTimer.GetElapsedTimeMs()); } - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what()); - throw; + return {}; } - - Stopwatch BlockCreateProgressTimer; + } + catch (const std::exception& Ex) + { BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + } + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what()); + throw; + } + + Stopwatch BlockCreateProgressTimer; + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!BlockCreateLatch.Wait(1000)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!BlockCreateLatch.Wait(1000)) - { - Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - ComposedBlocks, - Remaining, - BlockCreateProgressTimer.GetElapsedTimeMs()); - } + Remaining = BlockCreateLatch.Remaining(); remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, - "Aborted"sv, + fmt::format("Aborting, {} blocks remaining...", Remaining), ComposedBlocks, - 0, + Remaining, BlockCreateProgressTimer.GetElapsedTimeMs()); - return {}; } remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, - 
fmt::format("{} remaining...", Remaining), - ComposedBlocks, - Remaining, - BlockCreateProgressTimer.GetElapsedTimeMs()); - } - - if (ComposedBlocks > 0) - { - uint64_t NowMS = Timer.GetElapsedTimeMs(); - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - ""sv, + "Aborted"sv, ComposedBlocks, 0, BlockCreateProgressTimer.GetElapsedTimeMs()); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); + return {}; } + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + fmt::format("{} remaining...", Remaining), + ComposedBlocks, + Remaining, + BlockCreateProgressTimer.GetElapsedTimeMs()); + } - if (!RemoteResult.IsError()) + if (ComposedBlocks > 0) + { + uint64_t NowMS = Timer.GetElapsedTimeMs(); + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + ""sv, + ComposedBlocks, + 0, + BlockCreateProgressTimer.GetElapsedTimeMs()); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); + } + + if (!RemoteResult.IsError()) + { + CbObjectWriter OplogContinerWriter; + RwLock::SharedLockScope _(BlocksLock); + OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); + OplogContinerWriter.BeginArray("blocks"sv); { - CbObjectWriter OplogContinerWriter; - RwLock::SharedLockScope _(BlocksLock); - OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - OplogContinerWriter.BeginArray("blocks"sv); + for (const ChunkBlockDescription& B : Blocks) { - for (const ChunkBlockDescription& B : Blocks) + ZEN_ASSERT(!B.ChunkRawHashes.empty()); + if (BuildBlocks) { - ZEN_ASSERT(!B.ChunkRawHashes.empty()); - if (BuildBlocks) - { - ZEN_ASSERT(B.BlockHash != IoHash::Zero); + ZEN_ASSERT(B.BlockHash != IoHash::Zero); - 
OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunkRawHashes) - { - OplogContinerWriter.AddHash(RawHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - } - OplogContinerWriter.EndObject(); - continue; - } - - ZEN_ASSERT(B.BlockHash == IoHash::Zero); OplogContinerWriter.BeginObject(); { + OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunkRawHashes) { - OplogContinerWriter.AddBinaryAttachment(RawHash); + OplogContinerWriter.AddHash(RawHash); } } - OplogContinerWriter.EndArray(); + OplogContinerWriter.EndArray(); // "chunks" } OplogContinerWriter.EndObject(); + continue; } + + ZEN_ASSERT(B.BlockHash == IoHash::Zero); + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunkRawHashes) + { + OplogContinerWriter.AddBinaryAttachment(RawHash); + } + } + OplogContinerWriter.EndArray(); + } + OplogContinerWriter.EndObject(); } - OplogContinerWriter.EndArray(); // "blocks"sv - OplogContinerWriter.BeginArray("chunkedfiles"sv); + } + OplogContinerWriter.EndArray(); // "blocks"sv + OplogContinerWriter.BeginArray("chunkedfiles"sv); + { + for (const ChunkedFile& F : ChunkedFiles) { - for (const ChunkedFile& F : ChunkedFiles) + OplogContinerWriter.BeginObject(); { - OplogContinerWriter.BeginObject(); + OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); + OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); + OplogContinerWriter.BeginArray("chunks"sv); { - OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); - OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); - OplogContinerWriter.BeginArray("chunks"sv); + for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) { - for (const IoHash& RawHash : 
F.Chunked.Info.ChunkHashes) - { - OplogContinerWriter.AddHash(RawHash); - } + OplogContinerWriter.AddHash(RawHash); } - OplogContinerWriter.EndArray(); // "chunks" - OplogContinerWriter.BeginArray("sequence"sv); + } + OplogContinerWriter.EndArray(); // "chunks" + OplogContinerWriter.BeginArray("sequence"sv); + { + for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) { - for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) - { - OplogContinerWriter.AddInteger(ChunkIndex); - } + OplogContinerWriter.AddInteger(ChunkIndex); } - OplogContinerWriter.EndArray(); // "sequence" } - OplogContinerWriter.EndObject(); + OplogContinerWriter.EndArray(); // "sequence" } + OplogContinerWriter.EndObject(); } - OplogContinerWriter.EndArray(); // "chunkedfiles"sv + } + OplogContinerWriter.EndArray(); // "chunkedfiles"sv - OplogContinerWriter.BeginArray("chunks"sv); + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& AttachmentHash : LargeChunkHashes) { - for (const IoHash& AttachmentHash : LargeChunkHashes) - { - OplogContinerWriter.AddBinaryAttachment(AttachmentHash); - } + OplogContinerWriter.AddBinaryAttachment(AttachmentHash); } - OplogContinerWriter.EndArray(); // "chunks" - - OplogContainerObject = OplogContinerWriter.Save(); } + OplogContinerWriter.EndArray(); // "chunks" + + OplogContainerObject = OplogContinerWriter.Save(); } + return OplogContainerObject; } @@ -3370,8 +3405,6 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - // WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - remotestore_impl::AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, Project, |