diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-12 12:48:33 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-14 14:19:07 +0100 |
| commit | be6187b1299c338dfbea3409e5c54cf72be384e7 (patch) | |
| tree | beeb5fce4ecf3c039dbcddb88a841a9b34d0c762 /src | |
| parent | add buildid updates to oplog and builds test scripts (#838) (diff) | |
| download | zen-be6187b1299c338dfbea3409e5c54cf72be384e7.tar.xz zen-be6187b1299c338dfbea3409e5c54cf72be384e7.zip | |
wip
Diffstat (limited to 'src')
3 files changed, 925 insertions, 442 deletions
diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index cca32c17d..afc1cb01c 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -352,9 +352,12 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); for (const auto& It : FetchChunks) { - std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); + std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); + ZEN_ASSERT_SLOW(Chunk.second.DecodeRawSize() == Chunk.first); + ZEN_ASSERT_SLOW(Chunk.second.DecodeRawHash() == It.first); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk.second.Decompress().AsIoBuffer()) == It.first); + uint64_t ChunkSize = 0; + std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); for (const SharedBuffer& Segment : Segments) { ZEN_ASSERT(Segment.IsOwned()); @@ -374,6 +377,17 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, CompressedBuffer CompressedBlock = CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); + + uint64_t VerifyHeaderSize = 0; + ZEN_ASSERT_SLOW(IterateChunkBlock( + CompressedBlock.Decompress(), + [](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { + ZEN_ASSERT(Chunk.DecodeRawHash() == AttachmentHash); + SharedBuffer Decompressed = Chunk.Decompress(); + ZEN_ASSERT(AttachmentHash == IoHash::HashBuffer(Decompressed.AsIoBuffer())); + }, + VerifyHeaderSize)); + return CompressedBlock; } diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 084d975a2..72a6f2969 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -150,7 +150,7 @@ struct RemoteStoreOptions size_t ChunkFileSizeLimit = DefaultChunkFileSizeLimit; }; -typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; +typedef std::function<CompositeBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index c44b06305..09ab0662c 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -277,6 +277,70 @@ namespace remotestore_impl { JobContext* m_OptionalContext; }; + IoBuffer CompressToTempFile(const IoHash& RawHash, + const IoBuffer& RawData, + const std::filesystem::path& AttachmentPath, + OodleCompressor Compressor, + OodleCompressionLevel ComressionLevel) + { + ZEN_ASSERT(!IsFile(AttachmentPath)); + BasicFile CompressedFile; + std::error_code Ec; + CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath)); + } + + bool CouldCompress = CompressedBuffer::CompressToStream( + CompositeBuffer(SharedBuffer(RawData)), + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + CompressedFile.Write(RangeBuffer, Offset); + ZEN_UNUSED(SourceOffset, SourceSize); + // StreamRawBytes += SourceSize; + // StreamCompressedBytes += RangeBuffer.GetSize(); + }, + Compressor, + ComressionLevel); + if (CouldCompress) + { +#if ZEN_BUILD_DEBUG + uint64_t CompressedSize = CompressedFile.FileSize(); + // void* FileHandle = CompressedFile.Handle(); + IoBuffer TempPayload = IoBuffer(IoBuffer::BorrowedFile, CompressedFile.Handle(), 0, CompressedSize); + // ZEN_ASSERT(TempPayload); + // TempPayload.SetDeleteOnClose(true); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), VerifyRawHash, VerifyRawSize); + ZEN_ASSERT(Compressed); + ZEN_ASSERT(RawHash == VerifyRawHash); + ZEN_ASSERT(RawData.GetSize() == VerifyRawSize); + // CompressedFile.Attach(FileHandle); +#endif // ZEN_BUILD_DEBUG + } + else + { + // Compressed is larger than source data... + CompressedBuffer CompressedBlob = + CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None); + if (!CompressedBlob) + { + throw std::runtime_error("Failed to compress blob {}"); + } + CompressedFile.SetFileSize(0); + CompressedFile.Write(CompressedBlob.GetCompressed(), 0); + } + IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + CompressedFile.Close(); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + TempAttachmentBuffer.SetDeleteOnClose(true); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + return TempAttachmentBuffer; + } + void DownloadAndSaveBlockChunks(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, @@ -1141,6 +1205,7 @@ namespace remotestore_impl { } return; } + ZEN_ASSERT(AttachmentResult.Bytes); BlobBuffer = std::move(AttachmentResult.Bytes); ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", RawHash, @@ -1161,12 +1226,15 @@ namespace remotestore_impl { uint64_t AttachmentSize = BlobBuffer.GetSize(); Info.AttachmentsDownloaded.fetch_add(1); Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); + ZEN_ASSERT(BlobBuffer); AttachmentsWriteLatch.AddCount(1); Context.WorkerPool.ScheduleWork( [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() { ZEN_TRACE_CPU("WriteAttachment"); + ZEN_ASSERT(Bytes); + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1268,7 +1336,7 @@ namespace remotestore_impl { struct CreatedBlock { - IoBuffer Payload; + CompositeBuffer Payload; ChunkBlockDescription Block; }; @@ -1402,7 +1470,7 @@ namespace remotestore_impl { } try { - IoBuffer Payload; + CompositeBuffer Payload; ChunkBlockDescription Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { @@ -1415,7 +1483,7 @@ namespace remotestore_impl { } else { - Payload = ChunkStore.FindChunkByCid(RawHash); + Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); } if (!Payload) { @@ -1431,7 +1499,7 @@ namespace remotestore_impl { const bool IsBlock = Block.BlockHash == RawHash; size_t PayloadSize = Payload.GetSize(); RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); + RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block)); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); @@ -1709,9 +1777,7 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<ChunkBlockDescription> Blocks; - CompressedBuffer OpsBuffer; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1750,7 +1816,8 @@ BuildContainer(CidStore& ChunkStore, if (DataHash == IoHash::Zero) { std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project.RootDir / ServerPath; + std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred(); + ZEN_INFO("{}", FilePath); if (!IsFile(FilePath)) { remotestore_impl::ReportMessage( @@ -1937,14 +2004,286 @@ BuildContainer(CidStore& ChunkStore, FoundHashes.insert(It.first); } + struct ChunkedFile + { + IoBuffer Source; + + ChunkedInfoWithSource Chunked; + }; + + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + + struct FoundChunkedFile + { + IoHash RawHash = IoHash::Zero; + IoBuffer Source; + uint64_t Offset = 0; + uint64_t Size = 0; + }; + + std::vector<FoundChunkedFile> AttachmentsToChunk; + + if (AllowChunking) + { + RwLock FindFilesToChunkLock; + Latch FindFilesToChunkLatch(1); + for (auto& It : UploadAttachments) + { + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + return {}; + } + + FindFilesToChunkLatch.AddCount(1); + + WorkerPool.ScheduleWork( + [&ChunkStore, + UploadAttachment = &It.second, + RawHash = It.first, + &FindFilesToChunkLatch, + &FindFilesToChunkLock, + // &LooseUploadAttachments, + &MissingHashes, + &OnLargeAttachment, + &AttachmentTempPath, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + AllowChunking, + &RemoteResult, + &AttachmentsToChunk, + OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); + + auto _ = MakeGuard([&FindFilesToChunkLatch] { FindFilesToChunkLatch.CountDown(); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } + + try + { + if (!UploadAttachment->RawPath.empty()) + { + const std::filesystem::path& FilePath = UploadAttachment->RawPath; + IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); + if (RawData) + { + UploadAttachment->Size = RawData.GetSize(); + if (UploadAttachment->Size > ChunkFileSizeLimit) + { + FindFilesToChunkLock.WithExclusiveLock([&]() { + AttachmentsToChunk.push_back(FoundChunkedFile{.RawHash = RawHash, + .Source = RawData, + .Offset = 0, + .Size = RawData.GetSize()}); + }); + } + } + else + { + FindFilesToChunkLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } + } + else + { + IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); + if (Data) + { + UploadAttachment->Size = Data.GetSize(); + if (Data.IsWholeFile()) + { + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); + if (Compressed) + { + if (VerifyRawSize > ChunkFileSizeLimit) + { + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (CompressionLevel == OodleCompressionLevel::None) + { + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (Decompressed) + { + std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); + if (Segments.size() == 1) + { + IoBuffer DecompressedData = Segments[0].AsIoBuffer(); + IoBufferFileReference DecompressedFileRef; + if (DecompressedData.GetFileReference(DecompressedFileRef)) + { + // Are we still pointing to disk? + FindFilesToChunkLock.WithExclusiveLock([&]() { + AttachmentsToChunk.push_back( + FoundChunkedFile{.RawHash = RawHash, + .Source = Data, + .Offset = DecompressedFileRef.FileChunkOffset, + .Size = DecompressedFileRef.FileChunkSize}); + }); + } + } + } + } + } + } + } + } + } + else + { + FindFilesToChunkLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to resolve attachment {}", RawHash), + Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + FindFilesToChunkLatch.CountDown(); + + Stopwatch ResolveAttachmentsProgressTimer; + while (!FindFilesToChunkLatch.Wait(1000)) + { + ptrdiff_t Remaining = FindFilesToChunkLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!FindFilesToChunkLatch.Wait(1000)) + { + Remaining = FindFilesToChunkLatch.Remaining(); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + "Aborted"sv, + UploadAttachments.size(), + 0, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + return {}; + } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + UploadAttachments.size(), + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + } + } + + for (const IoHash& AttachmentHash : MissingHashes) + { + auto It = UploadAttachments.find(AttachmentHash); + ZEN_ASSERT(It != UploadAttachments.end()); + std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); + ZEN_ASSERT(Op.has_value()); + + if (IgnoreMissingAttachments) + { + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); + } + else + { + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(AttachmentHash.ToHexString()); + Sb.Append("' for op: \n"); + Op.value().ToJson(Sb); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); + return {}; + } + UploadAttachments.erase(AttachmentHash); + } + + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + + std::vector<ChunkedFile> ChunkedFiles(AttachmentsToChunk.size()); + + for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++) + { + // TODO: Multithread + + const FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk]; + const IoHash& RawHash = AttachmentToChunk.RawHash; + + const IoBuffer& Buffer = AttachmentToChunk.Source; + IoBufferFileReference FileRef; + bool IsFile = Buffer.GetFileReference(FileRef); + ZEN_ASSERT(IsFile); + + Stopwatch ChunkOneTimer; + + uint64_t Offset = AttachmentToChunk.Offset; + uint64_t Size = AttachmentToChunk.Size; + + BasicFile SourceFile; + SourceFile.Attach(FileRef.FileHandle); + auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); + + ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk]; + Chunked.Source = Buffer; + Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); + ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); + + ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", + RawHash, + NiceBytes(Chunked.Chunked.Info.RawSize), + Chunked.Chunked.Info.ChunkHashes.size(), + NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs())); + } + + for (const ChunkedFile& Chunked : ChunkedFiles) + { + UploadAttachments.erase(Chunked.Chunked.Info.RawHash); + for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) + { + UploadAttachments.erase(ChunkHash); + } + } + + size_t ChunkedChunkCount = + std::accumulate(ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const ChunkedFile& Value) { + return Current + Value.Chunked.Info.ChunkHashes.size(); + }); + size_t ReusedAttachmentCount = 0; std::vector<size_t> ReusedBlockIndexes; { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(FoundHashes.size()); - ChunkHashes.insert(ChunkHashes.begin(), FoundHashes.begin(), FoundHashes.end()); + std::unordered_set<IoHash, IoHash::Hasher> UniqueChunkHashes; + UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount); + + UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end()); + + for (ChunkedFile& Chunked : ChunkedFiles) + { + UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end()); + } + std::vector<IoHash> ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end()); + std::vector<uint32_t> ChunkIndexes; - ChunkIndexes.resize(FoundHashes.size()); + ChunkIndexes.resize(ChunkHashes.size()); std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0); std::vector<uint32_t> UnusedChunkIndexes; @@ -1971,50 +2310,15 @@ BuildContainer(CidStore& ChunkStore, } } - struct ChunkedFile - { - IoBuffer Source; - - ChunkedInfoWithSource Chunked; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup; - }; - std::vector<ChunkedFile> ChunkedFiles; - - auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { - ChunkedFile Chunked; - Stopwatch Timer; - - uint64_t Offset = FileRef.FileChunkOffset; - uint64_t Size = FileRef.FileChunkSize; - - BasicFile SourceFile; - SourceFile.Attach(FileRef.FileHandle); - auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); - - Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); - ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); - Chunked.Source = RawData; - - ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", - RawHash, - NiceBytes(Chunked.Chunked.Info.RawSize), - Chunked.Chunked.Info.ChunkHashes.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - return Chunked; - }; - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); - Latch ResolveAttachmentsLatch(1); + Stopwatch ResolveAttachmentsProgressTimer; + Latch ResolveAttachmentsLatch(1); for (auto& It : UploadAttachments) { if (remotestore_impl::IsCancelled(OptionalContext)) @@ -2035,13 +2339,10 @@ BuildContainer(CidStore& ChunkStore, &ResolveLock, &ChunkedHashes, &LargeChunkHashes, - &ChunkedUploadAttachments, &LooseUploadAttachments, &MissingHashes, &OnLargeAttachment, &AttachmentTempPath, - &ChunkFile, - &ChunkedFiles, MaxChunkEmbedSize, ChunkFileSizeLimit, AllowChunking, @@ -2057,163 +2358,150 @@ BuildContainer(CidStore& ChunkStore, try { + ZEN_ASSERT(UploadAttachment->Size != 0); if (!UploadAttachment->RawPath.empty()) { - const std::filesystem::path& FilePath = UploadAttachment->RawPath; - IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); - if (RawData) + if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) { - if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) - { - IoBufferFileReference FileRef; - (void)RawData.GetFileReference(FileRef); - - ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); - }); - } - else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { - size_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + OnLargeAttachment( + RawHash, + [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment]( + const IoHash& RawHash) -> CompositeBuffer { + IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); + if (!RawData) + { + throw std::runtime_error( + fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); + } std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); + IoBuffer TempAttachmentBuffer = - WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); + remotestore_impl::CompressToTempFile(RawHash, + RawData, + AttachmentPath, + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + TempAttachmentBuffer.SetDeleteOnClose(true); + + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), + VerifyRawHash, + VerifyRawSize)); + ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); + ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, - NiceBytes(RawSize), + NiceBytes(UploadAttachment->Size), NiceBytes(TempAttachmentBuffer.GetSize())); - return TempAttachmentBuffer; + return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + } + else + { + // Compress inline - check compressed size to see if it should go into a block or not + IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); + if (!RawData) { - uint64_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to read attachment {}", UploadAttachment->RawPath), + ""); + return; + } + + std::filesystem::path TempFilePath = AttachmentTempPath; + TempFilePath.append(RawHash.ToHexString()); - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); + try + { + IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, + RawData, + TempFilePath, + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + TempAttachmentBuffer.SetDeleteOnClose(true); + + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + ZEN_ASSERT_SLOW( + CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize)); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)) + .CompressedBuffer::Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); + + uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); - uint64_t CompressedSize = Compressed.GetCompressedSize(); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(RawSize), - NiceBytes(TempAttachmentBuffer.GetSize())); + TempFilePath, + NiceBytes(UploadAttachment->Size), + NiceBytes(CompressedSize)); if (CompressedSize > MaxChunkEmbedSize) { - OnLargeAttachment(RawHash, - [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); + OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { + return CompositeBuffer(SharedBuffer(std::move(Data))); + }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { UploadAttachment->Size = CompressedSize; - ResolveLock.WithExclusiveLock( - [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } - } - } - else - { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - else - { - IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); - if (Data) - { - auto GetForChunking = - [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { - if (Data.IsWholeFile()) - { - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); - if (Compressed) { - if (VerifyRawSize > ChunkFileSizeLimit) - { - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - if (CompressionLevel == OodleCompressionLevel::None) - { - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - if (Decompressed) - { - std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); - if (Segments.size() == 1) - { - IoBuffer DecompressedData = Segments[0].AsIoBuffer(); - if (DecompressedData.GetFileReference(OutFileRef)) - { - return true; - } - } - } - } - } - } + IoHash VerifyRawHash2; + uint64_t VerifyRawSize2; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), + VerifyRawHash2, + VerifyRawSize2); + ZEN_ASSERT_SLOW(Compressed.Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash); + ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize()); } - } - return false; - }; - IoBufferFileReference FileRef; - if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) - { - ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); + ResolveLock.WithExclusiveLock([RawHash, + RawSize = RawData.GetSize(), + &LooseUploadAttachments, + Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); }); + } } - else if (Data.GetSize() > MaxChunkEmbedSize) + catch (const std::system_error& SysEx) { - OnLargeAttachment(RawHash, - [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}", + RawHash, + TempFilePath, + SysEx.code().value(), + SysEx.what()), + ""); } - else + catch (const std::exception& Ex) { - UploadAttachment->Size = Data.GetSize(); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}", + RawHash, + TempFilePath, + Ex.what()), + ""); } } - else + } + else + { + if (UploadAttachment->Size > MaxChunkEmbedSize) { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); + OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) { + return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash)))); + }); + ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } } } @@ -2229,8 +2517,6 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsLatch.CountDown(); { - Stopwatch ResolveAttachmentsProgressTimer; - ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining(); while (!ResolveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); @@ -2245,7 +2531,7 @@ BuildContainer(CidStore& ChunkStore, Remaining = ResolveAttachmentsLatch.Remaining(); remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), + "Aborted"sv, UploadAttachments.size(), Remaining, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); @@ -2258,11 +2544,10 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); return {}; } - AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, fmt::format("{} remaining...", Remaining), - AttachmentCountToUseForProgress, + UploadAttachments.size(), Remaining, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } @@ -2276,6 +2561,7 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } } + if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); @@ -2284,86 +2570,13 @@ BuildContainer(CidStore& ChunkStore, return {}; } - for (const IoHash& AttachmentHash : MissingHashes) - { - auto It = UploadAttachments.find(AttachmentHash); - ZEN_ASSERT(It != UploadAttachments.end()); - std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); - ZEN_ASSERT(Op.has_value()); - - if (IgnoreMissingAttachments) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); - } - else - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - return {}; - } - UploadAttachments.erase(AttachmentHash); - } - - for (const auto& It : ChunkedUploadAttachments) - { - UploadAttachments.erase(It.first); - } for (const auto& It : LargeChunkHashes) { UploadAttachments.erase(It); } - { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunkedHashes.size()); - ChunkHashes.insert(ChunkHashes.begin(), ChunkedHashes.begin(), ChunkedHashes.end()); - std::vector<uint32_t> ChunkIndexes; - ChunkIndexes.resize(ChunkedHashes.size()); - std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0); - - std::vector<uint32_t> UnusedChunkIndexes; - ReuseBlocksStatistics ReuseBlocksStats; - - std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(*LogOutput, - /*BlockReuseMinPercentLimit*/ 80, - /*IsVerbose*/ false, - ReuseBlocksStats, - KnownBlocks, - ChunkHashes, - ChunkIndexes, - UnusedChunkIndexes); - for (size_t KnownBlockIndex : ReusedBlockIndexes) - { - const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) - { - if (ChunkedHashes.erase(KnownHash) == 1) - { - ReusedAttachmentCount++; - } - } - } - ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end()); - } - - std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); - auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); - size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd); - if (ReuseBlockCount > 0) - { - Blocks.reserve(ReuseBlockCount); - for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) - { - Blocks.push_back({KnownBlocks[*It]}); - } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); - } + RwLock BlocksLock; + std::vector<ChunkBlockDescription> Blocks; std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; SortedUploadAttachments.reserve(UploadAttachments.size()); @@ -2406,9 +2619,6 @@ BuildContainer(CidStore& ChunkStore, return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; }); - // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded - // ChunkedHashes contains all chunked up chunks to be composed into blocks - if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); @@ -2431,225 +2641,431 @@ BuildContainer(CidStore& ChunkStore, return {}; } - // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded - // ChunkedHashes contains all chunked up chunks to be composed into blocks - size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); size_t ChunksAssembled = 0; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); - Latch BlockCreateLatch(1); - size_t GeneratedBlockCount = 0; - size_t BlockSize = 0; - std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; - - Oid LastOpKey = Oid::Zero; - uint32_t ComposedBlocks = 0; + Latch BlockCreateLatch(1); + size_t GeneratedBlockCount = 0; + uint32_t ComposedBlocks = 0; uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); try { - uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); - std::unordered_set<IoHash, IoHash::Hasher> AddedAttachmentHashes; - auto NewBlock = [&]() { - size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); - size_t ChunkCount = ChunksInBlock.size(); - std::vector<IoHash> ChunkRawHashes; - ChunkRawHashes.reserve(ChunkCount); - for (const std::pair<IoHash, FetchChunkFunc>& Chunk : ChunksInBlock) - { - ChunkRawHashes.push_back(Chunk.first); - } - if (BuildBlocks) - { - remotestore_impl::CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - ComposedBlocks++; - // Worker will set Blocks[BlockIndex] = Block (including ChunkRawHashes) under shared lock - } - else - { - ZEN_INFO("Bulk group {} attachments", ChunkCount); - OnBlockChunks(std::move(ChunksInBlock)); - // We can share the lock as we are not resizing the vector and only touch our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); - } - uint64_t NowMS = Timer.GetElapsedTimeMs(); - ZEN_INFO("Assembled block {} with {} chunks in {} ({})", - BlockIndex, - ChunkCount, - NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), - NiceBytes(BlockSize)); - FetchAttachmentsStartMS = NowMS; - ChunksInBlock.clear(); - BlockSize = 0; - GeneratedBlockCount++; - }; + auto NewBlock = + [&WorkerPool, BuildBlocks, &BlockCreateLatch, &BlocksLock, &Blocks, &AsyncOnBlock, &OnBlockChunks, &RemoteResult]( + std::vector<IoHash>&& ChunkRawHashes, + const std::function<FetchChunkFunc(const IoHash&)>& FetchResolveFunc) { + size_t ChunkCount = ChunkRawHashes.size(); + std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; + ChunksInBlock.reserve(ChunkCount); + + for (const IoHash& AttachmentHash : ChunkRawHashes) + { + ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchResolveFunc(AttachmentHash))); + } - Stopwatch AssembleBlocksProgressTimer; - uint64_t LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); - for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - break; - } - if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); - } - const IoHash& RawHash(HashIt->first); - const Oid CurrentOpKey = HashIt->second; - const IoHash& AttachmentHash(HashIt->first); - auto InfoIt = UploadAttachments.find(RawHash); - ZEN_ASSERT(InfoIt != UploadAttachments.end()); - uint64_t PayloadSize = InfoIt->second.Size; - - if (AddedAttachmentHashes.insert(AttachmentHash).second) + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { +#if 1 + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(ChunksInBlock), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + ZEN_ASSERT_SLOW(BlockHash != IoHash::Zero); + ZEN_UNUSED(BlockHash); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + // RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex] = Block; + } + // uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); +#else + remotestore_impl::CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + // ComposedBlocks++; +#endif + } + else + { + ZEN_INFO("Bulk group {} attachments", ChunkCount); + + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); + OnBlockChunks(std::move(ChunksInBlock)); + } + // uint64_t NowMS = Timer.GetElapsedTimeMs(); + // ZEN_INFO("Assembled block {} with {} chunks in {} ({})", + // BlockIndex, + // ChunkCount, + // NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), + // NiceBytes(BlockSize)); + // FetchAttachmentsStartMS = NowMS; + // ComposedBlocks++; + }; + + auto ComposeBlocks = [&ChunksAssembled, &ComposedBlocks, ChunkAssembleCount, &NewBlock]( + uint64_t MaxBlockSize, + uint64_t MaxChunksPerBlock, + std::vector<IoHash> AttachmentHashes, + std::vector<uint64_t> AttachmentSizes, + std::vector<Oid> AttachmentKeys, + std::function<FetchChunkFunc(const IoHash& AttachmentHash)>&& FetchResolveFunc, + JobContext* OptionalContext, + remotestore_impl::AsyncRemoteResult& RemoteResult) { + ZEN_ASSERT(AttachmentHashes.size() == AttachmentSizes.size()); + ZEN_ASSERT(AttachmentHashes.size() == AttachmentKeys.size()); + ZEN_ASSERT(FetchResolveFunc); + + std::vector<IoHash> PendingChunkHashes; + uint64_t PendingBlockSize = 0; + + size_t SortedUploadAttachmentsIndex = 0; + + Stopwatch AssembleBlocksProgressTimer; + while (SortedUploadAttachmentsIndex < AttachmentHashes.size()) { - if (BuildBlocks && ChunksInBlock.size() > 0) + if (remotestore_impl::IsCancelled(OptionalContext)) { - if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) && - (CurrentOpKey != LastOpKey)) + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + break; + } + + const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex]; + const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex]; + size_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex]; + + std::vector<IoHash> CurrentOpRawHashes; + CurrentOpRawHashes.push_back(FirstAttachmentHash); + std::vector<uint64_t> CurrentOpChunkSizes; + + CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize); + + bool CurrentOpFillFullBlock = false; + + while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size()) + { + size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size(); + const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex]; + if (NextAttachmentOpKey != FirstAttachmentOpKey) { - NewBlock(); + break; + } + const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex]; + size_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex]; + + if (CurrentOpAttachmentsSize + NextOpAttachmentSize > MaxBlockSize) + { + CurrentOpFillFullBlock = true; + break; + } + CurrentOpRawHashes.push_back(NextAttachmentHash); + CurrentOpChunkSizes.push_back(NextOpAttachmentSize); + CurrentOpAttachmentsSize += NextOpAttachmentSize; + + if (CurrentOpRawHashes.size() == MaxChunksPerBlock) + { + CurrentOpFillFullBlock = true; + break; } } - if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) + size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); + ZEN_ASSERT(CurrentOpAttachmentsSize <= MaxBlockSize); + ZEN_ASSERT(CurrentOpAttachmentCount <= MaxChunksPerBlock); + + if (CurrentOpFillFullBlock) { - ChunksInBlock.emplace_back(std::make_pair( - RawHash, - [RawSize = It->second.first, - IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer())); - })); - LooseUploadAttachments.erase(It); + ZEN_ASSERT(CurrentOpRawHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(CurrentOpAttachmentsSize <= MaxBlockSize); + NewBlock(std::move(CurrentOpRawHashes), /*CurrentOpAttachmentsSize, */ FetchResolveFunc); + ComposedBlocks++; + } + else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= MaxBlockSize && + (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= MaxChunksPerBlock) + { + // All attachments for Op fits in the current block... + PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end()); + PendingBlockSize += CurrentOpAttachmentsSize; + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); + if (PendingBlockSize == MaxBlockSize || PendingChunkHashes.size() == MaxChunksPerBlock) + { + NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); + ComposedBlocks++; + PendingBlockSize = 0; + } + } + else if (PendingBlockSize > (MaxBlockSize * 3) / 4) + { + // Our ops does not fit in the current block and the current block is using 75% of max size so lets move our op to + // the next block... + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); + NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); + ComposedBlocks++; + + PendingBlockSize = 0; + + PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end()); + PendingBlockSize += CurrentOpAttachmentsSize; + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); } else { - ChunksInBlock.emplace_back( - std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { - IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); - if (!Chunk) - { - throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); - } - IoHash ValidateRawHash; - uint64_t RawSize = 0; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error( - fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); - } - if (RawHash != ValidateRawHash) - { - throw std::runtime_error( - fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); - } - return {RawSize, Compressed}; - })); + size_t AddedChunkCount = 0; + + // Fit as many as possible in current block... + for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++) + { + uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex]; + if (PendingBlockSize + ChunkSize >= MaxBlockSize || PendingChunkHashes.size() >= MaxChunksPerBlock) + { + break; + } + AddedChunkCount++; + PendingBlockSize += ChunkSize; + PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]); + } + ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size()); + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); + NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize,*/ FetchResolveFunc); + ComposedBlocks++; + + if (AddedChunkCount > 0) + { + PendingChunkHashes = + std::vector<IoHash>(CurrentOpRawHashes.begin() + AddedChunkCount, CurrentOpRawHashes.end()); + PendingBlockSize = + std::accumulate(CurrentOpChunkSizes.begin() + AddedChunkCount, CurrentOpChunkSizes.end(), uint64_t(0)); + } + else + { + PendingChunkHashes = std::move(CurrentOpRawHashes); + PendingBlockSize = CurrentOpAttachmentsSize; + } + + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); } - BlockSize += PayloadSize; + ChunksAssembled += CurrentOpAttachmentCount; - LastOpKey = CurrentOpKey; - ChunksAssembled++; + if (ChunksAssembled % 1000 == 0) + { + remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + } + SortedUploadAttachmentsIndex += CurrentOpAttachmentCount; } - } - if (!RemoteResult.IsError()) + if (!PendingChunkHashes.empty()) + { + size_t PendingChunkCount = PendingChunkHashes.size(); + ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= MaxBlockSize); + NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); + ComposedBlocks++; + + ChunksAssembled += PendingChunkCount; + } + }; + + Stopwatch AssembleBlocksProgressTimer; { - // Keep the chunked files as separate blocks to make the blocks generated - // more consistent - if (BlockSize > 0) + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(SortedUploadAttachments.size()); + std::vector<uint64_t> AttachmentSizes; + AttachmentSizes.reserve(SortedUploadAttachments.size()); + std::vector<Oid> AttachmentKeys; + AttachmentKeys.reserve(SortedUploadAttachments.size()); + + for (const std::pair<IoHash, Oid>& Attachment : SortedUploadAttachments) { - NewBlock(); + AttachmentHashes.push_back(Attachment.first); + if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end()) + { + AttachmentSizes.push_back(It->second.Size); + } + else + { + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first), + ""); + return {}; + } + AttachmentKeys.push_back(Attachment.second); } + auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc { + if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end()) + { + uint64_t RawSize = It->second.first; + IoBuffer Payload = It->second.second; + + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize); + ZEN_ASSERT_SLOW(Compressed.Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); + + LooseUploadAttachments.erase(It); + return [RawSize = RawSize, + Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { + // CompressedBuffer Compressed = + // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer()); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize); + ZEN_ASSERT_SLOW(Compressed.Decompress()); + ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash); + ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); + return {RawSize, Compressed}; + }; + } + else + { + ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash)); + return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); + if (!Chunk) + { + throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); + } + IoHash ValidateRawHash; + uint64_t RawSize = 0; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error( + fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); + } + if (RawHash != ValidateRawHash) + { + throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); + } + return {RawSize, Compressed}; + }; + } + }; + + ComposeBlocks(MaxBlockSize, + MaxChunksPerBlock, + AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + FetchWholeAttachmentResolver, + OptionalContext, + RemoteResult); + } + + { + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(ChunkedChunkCount); + std::vector<uint64_t> AttachmentSizes; + AttachmentSizes.reserve(ChunkedChunkCount); + std::vector<Oid> AttachmentKeys; + AttachmentKeys.reserve(ChunkedChunkCount); + + tsl::robin_map<IoHash, std::pair<size_t, size_t>, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex; + for (size_t ChunkedFileIndex : ChunkedFilesOrder) { - const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; - const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; - size_t ChunkCount = Chunked.Info.ChunkHashes.size(); + const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; + const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; + size_t ChunkCount = Chunked.Info.ChunkHashes.size(); + Oid ChunkedFileOid = Oid::NewOid(); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) { - if (remotestore_impl::IsCancelled(OptionalContext)) + const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex]; + uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size; { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - break; - } - if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); - } - const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; - if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) - { - if (AddedAttachmentHashes.insert(ChunkHash).second) + if (ChunkHashToChunkFileIndexAndChunkIndex + .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex))) + .second) { - const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; - uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size); - if (BuildBlocks && ChunksInBlock.size() > 0) + if (ChunkSize > MaxChunkEmbedSize) { - if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) - { - NewBlock(); - } + OnLargeAttachment(ChunkHash, + [SourceBuffer = ChunkedFile.Source, + ChunkSource = Chunked.ChunkSources[ChunkIndex], + ChunkHash](const IoHash& RawHash) -> CompositeBuffer { + ZEN_ASSERT(RawHash == ChunkHash); + CompressedBuffer Compressed = CompressedBuffer::Compress( + SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None); + return Compressed.GetCompressed(); + }); + + ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); }); + } + else + { + AttachmentHashes.push_back(ChunkHash); + AttachmentSizes.push_back(ChunkSize); + AttachmentKeys.push_back(ChunkedFileOid); } - ChunksInBlock.emplace_back( - std::make_pair(ChunkHash, - [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( - const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Size, - CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None)}; - })); - BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; - ChunksAssembled++; } - ChunkedHashes.erase(FindIt); } } } - } - if (BlockSize > 0 && !RemoteResult.IsError()) - { - if (!remotestore_impl::IsCancelled(OptionalContext)) - { - NewBlock(); - } + auto ChunkedFileAttachmentResolver = [&ChunkHashToChunkFileIndexAndChunkIndex, + &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc { + if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash); + It != ChunkHashToChunkFileIndexAndChunkIndex.end()) + { + const std::pair<size_t, size_t>& ChunkFileIndexAndChunkIndex = It->second; + size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first; + size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second; + const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; + + const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex]; + ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize()); + + return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( + const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Size, + CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None)}; + }; + } + else + { + ZEN_ASSERT(false); + } + }; + + ComposeBlocks(MaxBlockSize, + MaxChunksPerBlock, + AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + ChunkedFileAttachmentResolver, + OptionalContext, + RemoteResult); } if (ChunkAssembleCount > 0) @@ -2938,7 +3354,8 @@ SaveOplog(CidStore& ChunkStore, { IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); + CreatedBlocks.insert( + {Block.BlockHash, {.Payload = CompositeBuffer(SharedBuffer(std::move(BlockBuffer))), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); } catch (const std::exception& Ex) @@ -4329,6 +4746,32 @@ namespace projectstore_testutils { return Package; }; + static CbPackage CreateFilesOplogPackage(const Oid& Id, + const std::filesystem::path ProjectRootDir, + const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("files"); + for (const auto& Attachment : Attachments) + { + std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir).generic_string(); + std::filesystem::path ClientPath = ServerPath; // dummy + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "serverpath"sv << ServerPath.string(); + Object << "clientpath"sv << ClientPath.string(); + Object.EndObject(); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( const std::span<const size_t>& Sizes, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, @@ -4345,6 +4788,23 @@ namespace projectstore_testutils { return Result; } + static std::vector<std::pair<Oid, std::filesystem::path>> CreateFileAttachments(const std::filesystem::path& RootDir, + const std::span<const size_t>& Sizes) + { + std::vector<std::pair<Oid, std::filesystem::path>> Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + IoBuffer FileBlob = CreateRandomBlob(Size); + IoHash FileHash = IoHash::HashBuffer(FileBlob); + std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file" / FileHash.ToHexString(); + CreateDirectories(UncompressedFilePath.parent_path()); + WriteFile(UncompressedFilePath, FileBlob); + Result.push_back({Oid::NewOid(), UncompressedFilePath}); + } + return Result; + } + class TestJobContext : public JobContext { public: @@ -4434,6 +4894,11 @@ TEST_CASE_TEMPLATE("project.store.export", Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + Oplog->AppendNewOplogEntry( + CreateFilesOplogPackage(Oid::NewOid(), + RootDir, + CreateFileAttachments(RootDir, std::initializer_list<size_t>{423 * 1024, 2 * 1024, 3213, 762 * 1024}))); + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = 32 * 1024u, @@ -4565,6 +5030,10 @@ SetupExportStore(CidStore& CidStore, Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + Oplog->AppendNewOplogEntry(CreateFilesOplogPackage( + Oid::NewOid(), + Project.RootDir, + CreateFileAttachments(Project.RootDir, std::initializer_list<size_t>{423 * 1024, 2 * 1024, 3213, 762 * 1024}))); FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunksPerBlock = 1000, |