diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-15 14:25:53 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-15 14:25:53 +0100 |
| commit | 0b6b4316bdf425b76200932c9701b6880c460561 (patch) | |
| tree | 0ca10b745b4526763e2f9f9fff2ed67e53e2729c | |
| parent | clang fixes, nicer reuse report (diff) | |
| download | zen-0b6b4316bdf425b76200932c9701b6880c460561.tar.xz zen-0b6b4316bdf425b76200932c9701b6880c460561.zip | |
make SaveOplog report errors through exceptions
3 files changed, 763 insertions, 927 deletions
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 72a6f2969..604ecb6ab 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -152,25 +152,48 @@ struct RemoteStoreOptions typedef std::function<CompositeBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; -RemoteProjectStore::LoadContainerResult BuildContainer( - CidStore& ChunkStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& WorkerPool, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool BuildBlocks, - bool IgnoreMissingAttachments, - bool AllowChunking, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, - bool EmbedLooseFiles); +CbObject BuildContainer(CidStore& ChunkStore, + ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + WorkerThreadPool& WorkerPool, + size_t MaxBlockSize, + size_t MaxChunksPerBlock, + size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, + bool BuildBlocks, + bool IgnoreMissingAttachments, + bool AllowChunking, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, + const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, + const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, + bool EmbedLooseFiles); class JobContext; +class RemoteStoreError : public std::runtime_error +{ +public: + using _Mybase = runtime_error; + + RemoteStoreError(const std::string& Message, int32_t ErrorCode, std::string_view Text) + : _Mybase(Message) + , m_ErrorCode(ErrorCode) + , m_Text(Text) + { + } + + RemoteStoreError(const char* Message, int32_t ErrorCode, std::string_view Text) : _Mybase(Message), m_ErrorCode(ErrorCode), m_Text(Text) + { + } + + inline int32_t GetErrorCode() const { return m_ErrorCode; } + inline std::string_view GetText() const { return m_Text; } + +private: + int32_t m_ErrorCode = 0; + std::string m_Text; +}; + RemoteProjectStore::Result SaveOplogContainer( ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, @@ -181,20 +204,20 @@ RemoteProjectStore::Result SaveOplogContainer( const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment, JobContext* OptionalContext); -RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool EmbedLooseFiles, - bool ForceUpload, - bool IgnoreMissingAttachments, - JobContext* OptionalContext); +void SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + WorkerThreadPool& NetworkWorkerPool, + WorkerThreadPool& WorkerPool, + size_t MaxBlockSize, + size_t MaxChunksPerBlock, + size_t MaxChunkEmbedSize, + size_t ChunkFileSizeLimit, + bool EmbedLooseFiles, + bool ForceUpload, + bool IgnoreMissingAttachments, + JobContext* OptionalContext); struct LoadOplogContext { diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index cbdd23fa9..16c108092 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -2118,60 +2118,47 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; - void AsyncCreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, + void AsyncCreateBlock(ParallelWork& Work, + WorkerThreadPool& WorkerPool, std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, RwLock& SectionsLock, std::vector<ChunkBlockDescription>& Blocks, size_t BlockIndex, const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) - { - OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&Blocks, - &SectionsLock, - &OpSectionsLatch, - BlockIndex, - Chunks = std::move(ChunksInBlock), - &AsyncOnBlock, - &RemoteResult]() mutable { - ZEN_TRACE_CPU("CreateBlock"); - - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - size_t ChunkCount = Chunks.size(); - try - { - ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - ChunkBlockDescription Block; - CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - ZEN_UNUSED(BlockHash); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex] = Block; - } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); + JobContext* OptionalContext) + { + Work.ScheduleWork(WorkerPool, + [&Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext]( + std::atomic<bool>& AbortFlag) mutable { + ZEN_TRACE_CPU("CreateBlock"); + + if (remotestore_impl::IsCancelled(OptionalContext)) + { + AbortFlag.store(true); + } + if (AbortFlag) + { + return; + } + size_t ChunkCount = Chunks.size(); + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + ZEN_UNUSED(BlockHash); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex] = Block; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + }); } struct UploadInfo @@ -2606,8 +2593,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, - JobContext* OptionalContext, - remotestore_impl::AsyncRemoteResult& RemoteResult) + JobContext* OptionalContext) { using namespace std::literals; @@ -2857,9 +2843,6 @@ BuildContainer(CidStore& ChunkStore, if (remotestore_impl::IsCancelled(OptionalContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } remotestore_impl::ReportMessage( @@ -2871,9 +2854,6 @@ BuildContainer(CidStore& ChunkStore, if (remotestore_impl::IsCancelled(OptionalContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } @@ -2882,23 +2862,25 @@ BuildContainer(CidStore& ChunkStore, remotestore_impl::ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); - Latch BlockCreateLatch(1); + std::atomic<bool> AbortFlag(false); + std::atomic<bool> PauseFlag(false); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + uint32_t ComposedBlocks = 0; uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); - try { - Stopwatch AssembleBlocksProgressTimer; + Stopwatch BlockCreateProgressTimer; remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{ .MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = MaxChunksPerBlock, .MaxChunkEmbedSize = MaxChunkEmbedSize, .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); - auto OnNewBlock = [&WorkerPool, + auto OnNewBlock = [&Work, + &WorkerPool, BuildBlocks, - &AssembleBlocksProgressTimer, - &BlockCreateLatch, + &BlockCreateProgressTimer, &BlocksLock, &Blocks, &AsyncOnBlock, @@ -2906,9 +2888,8 @@ BuildContainer(CidStore& ChunkStore, ChunkAssembleCount, &ChunksAssembled, &ComposedBlocks, - OptionalContext, - &RemoteResult](std::vector<IoHash>&& ChunkRawHashes, - const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) { + OptionalContext](std::vector<IoHash>&& ChunkRawHashes, + const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) { size_t ChunkCount = ChunkRawHashes.size(); std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; ChunksInBlock.reserve(ChunkCount); @@ -2921,14 +2902,14 @@ BuildContainer(CidStore& ChunkStore, size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); if (BuildBlocks) { - remotestore_impl::AsyncCreateBlock(WorkerPool, - BlockCreateLatch, + remotestore_impl::AsyncCreateBlock(Work, + WorkerPool, std::move(ChunksInBlock), BlocksLock, Blocks, BlockIndex, AsyncOnBlock, - RemoteResult); + OptionalContext); } else { @@ -2951,7 +2932,7 @@ BuildContainer(CidStore& ChunkStore, fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), ChunkAssembleCount, ChunkAssembleCount - ChunksAssembled, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); + BlockCreateProgressTimer.GetElapsedTimeMs()); } }; @@ -2972,11 +2953,8 @@ BuildContainer(CidStore& ChunkStore, } else { - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first), - ""); - return {}; + throw std::runtime_error( + fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first)); } AttachmentKeys.push_back(Attachment.second); } @@ -3118,205 +3096,135 @@ BuildContainer(CidStore& ChunkStore, }); } - if (ChunkAssembleCount > 0) + if (remotestore_impl::IsCancelled(OptionalContext)) { - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - 0, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); + Work.Abort(); } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", - ChunkAssembleCount, - TotalOpCount, - ComposedBlocks, - LargeChunkHashes.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - ComposedBlocks, - Remaining, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - } - if (ComposedBlocks > 0) + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + if (remotestore_impl::IsCancelled(OptionalContext)) { - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", 0), - ComposedBlocks, - 0, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); + AbortFlag.store(true); } - return {}; - } - } - catch (const std::exception& Ex) - { - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - } - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what()); - throw; - } + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + fmt::format("{}{} remaining...", AbortFlag.load() ? "Aborting, " : "", PendingWork), + ComposedBlocks, + PendingWork, + BlockCreateProgressTimer.GetElapsedTimeMs()); + }); - Stopwatch BlockCreateProgressTimer; - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + if (!AbortFlag.load() && ComposedBlocks > 0) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!BlockCreateLatch.Wait(1000)) - { - Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - ComposedBlocks, - Remaining, - BlockCreateProgressTimer.GetElapsedTimeMs()); - } remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, - "Aborted"sv, + ""sv, ComposedBlocks, 0, BlockCreateProgressTimer.GetElapsedTimeMs()); - return {}; + + uint64_t NowMS = Timer.GetElapsedTimeMs(); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); } - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("{} remaining...", Remaining), - ComposedBlocks, - Remaining, - BlockCreateProgressTimer.GetElapsedTimeMs()); } - if (ComposedBlocks > 0) + if (remotestore_impl::IsCancelled(OptionalContext)) { - uint64_t NowMS = Timer.GetElapsedTimeMs(); - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - ""sv, - ComposedBlocks, - 0, - BlockCreateProgressTimer.GetElapsedTimeMs()); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); + return {}; } - if (!RemoteResult.IsError()) + CbObjectWriter OplogContinerWriter; + RwLock::SharedLockScope _(BlocksLock); + OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); + OplogContinerWriter.BeginArray("blocks"sv); { - CbObjectWriter OplogContinerWriter; - RwLock::SharedLockScope _(BlocksLock); - OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - OplogContinerWriter.BeginArray("blocks"sv); + for (const ChunkBlockDescription& B : Blocks) { - for (const ChunkBlockDescription& B : Blocks) + ZEN_ASSERT(!B.ChunkRawHashes.empty()); + if (BuildBlocks) { - ZEN_ASSERT(!B.ChunkRawHashes.empty()); - if (BuildBlocks) - { - ZEN_ASSERT(B.BlockHash != IoHash::Zero); + ZEN_ASSERT(B.BlockHash != IoHash::Zero); - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunkRawHashes) - { - OplogContinerWriter.AddHash(RawHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - } - OplogContinerWriter.EndObject(); - continue; - } - - ZEN_ASSERT(B.BlockHash == IoHash::Zero); OplogContinerWriter.BeginObject(); { + OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunkRawHashes) { - OplogContinerWriter.AddBinaryAttachment(RawHash); + OplogContinerWriter.AddHash(RawHash); } } - OplogContinerWriter.EndArray(); + OplogContinerWriter.EndArray(); // "chunks" } OplogContinerWriter.EndObject(); + continue; } + + ZEN_ASSERT(B.BlockHash == IoHash::Zero); + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunkRawHashes) + { + OplogContinerWriter.AddBinaryAttachment(RawHash); + } + } + OplogContinerWriter.EndArray(); + } + OplogContinerWriter.EndObject(); } - OplogContinerWriter.EndArray(); // "blocks"sv - OplogContinerWriter.BeginArray("chunkedfiles"sv); + } + OplogContinerWriter.EndArray(); // "blocks"sv + OplogContinerWriter.BeginArray("chunkedfiles"sv); + { + for (const remotestore_impl::ChunkedFile& F : ChunkedFiles) { - for (const remotestore_impl::ChunkedFile& F : ChunkedFiles) + OplogContinerWriter.BeginObject(); { - OplogContinerWriter.BeginObject(); + OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); + OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); + OplogContinerWriter.BeginArray("chunks"sv); { - OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); - OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); - OplogContinerWriter.BeginArray("chunks"sv); + for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) { - for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) - { - OplogContinerWriter.AddHash(RawHash); - } + OplogContinerWriter.AddHash(RawHash); } - OplogContinerWriter.EndArray(); // "chunks" - OplogContinerWriter.BeginArray("sequence"sv); + } + OplogContinerWriter.EndArray(); // "chunks" + OplogContinerWriter.BeginArray("sequence"sv); + { + for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) { - for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) - { - OplogContinerWriter.AddInteger(ChunkIndex); - } + OplogContinerWriter.AddInteger(ChunkIndex); } - OplogContinerWriter.EndArray(); // "sequence" } - OplogContinerWriter.EndObject(); + OplogContinerWriter.EndArray(); // "sequence" } + OplogContinerWriter.EndObject(); } - OplogContinerWriter.EndArray(); // "chunkedfiles"sv + } + OplogContinerWriter.EndArray(); // "chunkedfiles"sv - OplogContinerWriter.BeginArray("chunks"sv); + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& AttachmentHash : LargeChunkHashes) { - for (const IoHash& AttachmentHash : LargeChunkHashes) - { - OplogContinerWriter.AddBinaryAttachment(AttachmentHash); - } + OplogContinerWriter.AddBinaryAttachment(AttachmentHash); } - OplogContinerWriter.EndArray(); // "chunks" - - OplogContainerObject = OplogContinerWriter.Save(); } + OplogContinerWriter.EndArray(); // "chunks" + + OplogContainerObject = OplogContinerWriter.Save(); return OplogContainerObject; } -RemoteProjectStore::LoadContainerResult +CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, @@ -3333,29 +3241,26 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) { - remotestore_impl::AsyncRemoteResult RemoteResult; - CbObject ContainerObject = BuildContainer(ChunkStore, - Project, - Oplog, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - BuildBlocks, - IgnoreMissingAttachments, - AllowChunking, - {}, - WorkerPool, - AsyncOnBlock, - OnLargeAttachment, - OnBlockChunks, - EmbedLooseFiles, - nullptr, - RemoteResult); - return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; + return BuildContainer(ChunkStore, + Project, + Oplog, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + BuildBlocks, + IgnoreMissingAttachments, + AllowChunking, + {}, + WorkerPool, + AsyncOnBlock, + OnLargeAttachment, + OnBlockChunks, + EmbedLooseFiles, + /*OptionalContext*/ nullptr); } -RemoteProjectStore::Result +void SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, @@ -3387,46 +3292,36 @@ SaveOplog(CidStore& ChunkStore, CreateDirectories(AttachmentTempPath); } - remotestore_impl::AsyncRemoteResult RemoteResult; RwLock AttachmentsLock; std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { + auto MakeTempBlock = [AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); - try - { - IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); - const uint64_t BlockSize = BlockBuffer.GetSize(); - RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert( - {Block.BlockHash, {.Payload = CompositeBuffer(SharedBuffer(std::move(BlockBuffer))), .Block = std::move(Block)}}); - ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - Ex.what(), - "Unable to create temp block file"); - return; - } + IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); + const uint64_t BlockSize = BlockBuffer.GetSize(); + RwLock::ExclusiveLockScope __(AttachmentsLock); + CreatedBlocks.insert( + {Block.BlockHash, {.Payload = CompositeBuffer(SharedBuffer(std::move(BlockBuffer))), .Block = std::move(Block)}}); + ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockSize)); }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { + auto UploadBlock = [&RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); if (Result.ErrorCode) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; + throw RemoteStoreError(fmt::format("Failed to save block attachment {} for oplog '{}': {}", + BlockHash, + RemoteStoreInfo.ContainerName, + Result.Reason), + Result.ErrorCode, + Result.Text); } Info.AttachmentBlocksUploaded.fetch_add(1); Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize()); @@ -3466,15 +3361,10 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer(); if (ContainerResult.ErrorCode) { - RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}", - RemoteStoreInfo.ContainerName, - ContainerResult.ErrorCode, - ContainerResult.Reason)}; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; + throw RemoteStoreError( + fmt::format("Failed to create container for oplog '{}': {}", RemoteStoreInfo.ContainerName, ContainerResult.Reason), + ContainerResult.ErrorCode, + ContainerResult.Text); } if (RemoteStoreInfo.CreateBlocks) @@ -3490,7 +3380,7 @@ SaveOplog(CidStore& ChunkStore, { ZEN_ASSERT(BlockDescription.ChunkCompressedLengths.empty()); - size_t ChunkCount = BlockDescription.ChunkRawLengths.size(); + size_t ChunkCount = BlockDescription.ChunkRawHashes.size(); if (ChunkCount > 0) { // Fake sizes, will give usage number of number of chunks used rather than bytes used - better than nothing @@ -3540,97 +3430,92 @@ SaveOplog(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles, - OptionalContext, - /* out */ RemoteResult); - if (!RemoteResult.IsError()) + OptionalContext); + if (remotestore_impl::IsCancelled(OptionalContext)) { - Info.OplogSizeBytes = OplogContainerObject.GetSize(); + return; + } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; - } + Info.OplogSizeBytes = OplogContainerObject.GetSize(); - uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); - uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return; + } + + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); + uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); + remotestore_impl::ReportMessage(OptionalContext, + fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", + RemoteStoreInfo.ContainerName, + ChunkCount, + BlockCount)); + Stopwatch SaveContainerTimer; + IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); + ContainerPayload.SetContentType(ZenContentType::kCbObject); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); + TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); + if (ContainerSaveResult.ErrorCode) + { + throw RemoteStoreError(fmt::format("Failed to save oplog container for oplog '{}': {}", + RemoteStoreInfo.ContainerName, + ContainerSaveResult.RawHash, + ContainerSaveResult.Reason), + ContainerSaveResult.ErrorCode, + ContainerSaveResult.Text); + } + else + { remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", + fmt::format("Saved container '{}' in {}", RemoteStoreInfo.ContainerName, - ChunkCount, - BlockCount)); - Stopwatch SaveContainerTimer; - IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); - ContainerPayload.SetContentType(ZenContentType::kCbObject); - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); - TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); - if (ContainerSaveResult.ErrorCode) - { - RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - RemoteProjectStore::Result Result = { - .ErrorCode = RemoteResult.GetError(), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())}; - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Saved container '{}' in {}", - RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); - } + NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); + } + { + Stopwatch UploadAttachmentsTimer; + remotestore_impl::AsyncRemoteResult RemoteResult; + UploadAttachments(NetworkWorkerPool, + ChunkStore, + RemoteStore, + LargeAttachments, + BlockChunks, + CreatedBlocks, + LooseLargeFiles, + ContainerSaveResult.Needs, + ForceUpload, + Info, + RemoteResult, + OptionalContext); + TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); + if (RemoteResult.IsError()) { - Stopwatch UploadAttachmentsTimer; - UploadAttachments(NetworkWorkerPool, - ChunkStore, - RemoteStore, - LargeAttachments, - BlockChunks, - CreatedBlocks, - LooseLargeFiles, - ContainerSaveResult.Needs, - ForceUpload, - Info, - RemoteResult, - OptionalContext); - TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); + throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}", + RemoteStoreInfo.ContainerName, + RemoteResult.GetErrorReason()), + RemoteResult.GetError(), + RemoteResult.GetErrorText()); } - uint32_t Try = 0; - while (!RemoteResult.IsError()) + const uint32_t MaxTries = 8; + uint32_t Try = 0; + while (Try < MaxTries) { if (remotestore_impl::IsCancelled(OptionalContext)) { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text)); - return Result; + return; } remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { - RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); - return Result; + throw RemoteStoreError( + fmt::format("Failed to finalize oplog container {}: {}", ContainerSaveResult.RawHash, ContainerFinalizeResult.Reason), + ContainerFinalizeResult.ErrorCode, + ContainerFinalizeResult.Text); } + remotestore_impl::ReportMessage( OptionalContext, fmt::format("Finalized container '{}' in {}", @@ -3644,13 +3529,9 @@ SaveOplog(CidStore& ChunkStore, if (remotestore_impl::IsCancelled(OptionalContext)) { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - return Result; + return; } - const uint32_t MaxTries = 8; if (Try < MaxTries) { Try++; @@ -3662,7 +3543,7 @@ SaveOplog(CidStore& ChunkStore, ContainerFinalizeResult.Needs.size(), Try)); - Stopwatch UploadAttachmentsTimer; + Stopwatch RetryUploadAttachmentsTimer; UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, @@ -3675,30 +3556,31 @@ SaveOplog(CidStore& ChunkStore, Info, RemoteResult, OptionalContext); - TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); + TransferWallTimeMS += RetryUploadAttachmentsTimer.GetElapsedTimeMs(); + if (RemoteResult.IsError()) + { + throw RemoteStoreError(fmt::format("Failed to upload attachments for oplog '{}': {}", + RemoteStoreInfo.ContainerName, + RemoteResult.GetErrorReason()), + RemoteResult.GetError(), + RemoteResult.GetErrorText()); + } } else { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::InternalServerError), - "Failed to save oplog container", + throw std::runtime_error( fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments", ContainerSaveResult.RawHash, ContainerFinalizeResult.Needs.size())); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - break; } } - - LooseLargeFiles.clear(); - CreatedBlocks.clear(); } - RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + + LooseLargeFiles.clear(); + CreatedBlocks.clear(); + + // RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + // Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); @@ -3706,8 +3588,8 @@ SaveOplog(CidStore& ChunkStore, OptionalContext, fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + "SUCCESS", + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksUploaded.load(), NiceBytes(Info.AttachmentBlockBytesUploaded.load()), @@ -3715,7 +3597,7 @@ SaveOplog(CidStore& ChunkStore, NiceBytes(Info.AttachmentBytesUploaded.load()), remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS))); - return Result; + // return Result; }; RemoteProjectStore::Result @@ -5044,22 +4926,20 @@ TEST_CASE_TEMPLATE("project.store.export", WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); - RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, - *RemoteStore, - *Project.Get(), - *Oplog, - NetworkPool, - WorkerPool, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - true, - false, - false, - nullptr); - - REQUIRE(ExportResult.ErrorCode == 0); + SaveOplog(CidStore, + *RemoteStore, + *Project.Get(), + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + true, + false, + false, + nullptr); Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); CHECK(OplogImport); @@ -5132,13 +5012,12 @@ TEST_CASE_TEMPLATE("project.store.export", // Common oplog setup used by the two tests below. // Returns a FileRemoteStore backed by ExportDir that has been populated with a SaveOplog call. // Keeps the test data identical to project.store.export so the two test suites exercise the same blocks/attachments. -static RemoteProjectStore::Result -SetupExportStore(CidStore& CidStore, - ProjectStore::Project& Project, - WorkerThreadPool& NetworkPool, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& ExportDir, - std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +static std::shared_ptr<RemoteProjectStore> +SetupExportStore(CidStore& CidStore, + ProjectStore::Project& Project, + WorkerThreadPool& NetworkPool, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& ExportDir) { using namespace projectstore_testutils; using namespace std::literals; @@ -5146,7 +5025,7 @@ SetupExportStore(CidStore& CidStore, Ref<ProjectStore::Oplog> Oplog = Project.NewOplog("oplog_export", {}); if (!Oplog) { - return RemoteProjectStore::Result{.ErrorCode = -1}; + throw std::runtime_error("Failed to create oplog"); } Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); @@ -5172,21 +5051,22 @@ SetupExportStore(CidStore& CidStore, /*.ForceDisableBlocks =*/false, /*.ForceEnableTempBlocks =*/false}; - OutRemoteStore = CreateFileRemoteStore(Log(), Options); - return SaveOplog(CidStore, - *OutRemoteStore, - Project, - *Oplog, - NetworkPool, - WorkerPool, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - /*EmbedLooseFiles*/ true, - /*ForceUpload*/ false, - /*IgnoreMissingAttachments*/ false, - /*OptionalContext*/ nullptr); + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); + SaveOplog(CidStore, + *RemoteStore, + Project, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); + return RemoteStore; } // Creates an export store with a single oplog entry that packs six 512 KB chunks into one @@ -5198,11 +5078,8 @@ SetupExportStore(CidStore& CidStore, // Project internally so that each call is independent of any outer test context. After // SaveOplog returns, all persistent data lives on disk inside ExportDir and the caller can // freely query OutRemoteStore without holding any references to the internal context. -static RemoteProjectStore::Result -SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& ExportDir, - std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +static std::shared_ptr<RemoteProjectStore> +SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, WorkerThreadPool& WorkerPool, const std::filesystem::path& ExportDir) { using namespace projectstore_testutils; using namespace std::literals; @@ -5226,7 +5103,7 @@ SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, Ref<ProjectStore::Oplog> Oplog = LocalProject->NewOplog("oplog_partial_block", {}); if (!Oplog) { - return RemoteProjectStore::Result{.ErrorCode = -1}; + throw std::runtime_error("Failed to create oplog"); } // Six 512 KB chunks with OodleCompressionLevel::None so the compressed size stays large @@ -5240,30 +5117,31 @@ SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, // (OodleCompressionLevel::None → compressed ≈ raw ≈ 512 KB). With the legacy // 32 KB limit all six chunks would become loose large attachments and no block would // be created, so we use the production default of 1.5 MB instead. - FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u, - .MaxChunksPerBlock = 1000, - .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize, - .ChunkFileSizeLimit = 64u * 1024u * 1024u}, - /*.FolderPath =*/ExportDir, - /*.Name =*/std::string("oplog_partial_block"), - /*.OptionalBaseName =*/std::string(), - /*.ForceDisableBlocks =*/false, - /*.ForceEnableTempBlocks =*/false}; - OutRemoteStore = CreateFileRemoteStore(Log(), Options); - return SaveOplog(LocalCidStore, - *OutRemoteStore, - *LocalProject, - *Oplog, - NetworkPool, - WorkerPool, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - /*EmbedLooseFiles*/ true, - /*ForceUpload*/ false, - /*IgnoreMissingAttachments*/ false, - /*OptionalContext*/ nullptr); + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize, + .ChunkFileSizeLimit = 64u * 1024u * 1024u}, + /*.FolderPath =*/ExportDir, + /*.Name =*/std::string("oplog_partial_block"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Log(), Options); + SaveOplog(LocalCidStore, + *RemoteStore, + *LocalProject, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); + return RemoteStore; } // Returns the first block hash that has at least MinChunkCount chunks, or a zero IoHash @@ -5373,10 +5251,8 @@ TEST_CASE("project.store.import.context_settings") WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); - std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = - SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path(), RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + std::shared_ptr<RemoteProjectStore> RemoteStore = + SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path()); // Import-side CAS and project store: starts empty, mirroring a fresh machine that has never // downloaded the data. HasAttachment() therefore returns false for every chunk, so the import @@ -5558,10 +5434,8 @@ TEST_CASE("project.store.import.context_settings") { // Export store with 6 × 512 KB chunks packed into one ~3 MB block. ScopedTemporaryDirectory PartialExportDir; - std::shared_ptr<RemoteProjectStore> PartialRemoteStore; - RemoteProjectStore::Result ExportR = - SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); - REQUIRE(ExportR.ErrorCode == 0); + std::shared_ptr<RemoteProjectStore> PartialRemoteStore = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in // ImportCidStore. Three non-adjacent needed positions → three BlockRangeDescriptors. @@ -5596,10 +5470,8 @@ TEST_CASE("project.store.import.context_settings") // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1. // DownloadPartialBlock issues one LoadAttachmentRanges call per range. ScopedTemporaryDirectory PartialExportDir; - std::shared_ptr<RemoteProjectStore> PartialRemoteStore; - RemoteProjectStore::Result ExportR = - SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); - REQUIRE(ExportR.ErrorCode == 0); + std::shared_ptr<RemoteProjectStore> PartialRemoteStore = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); @@ -5629,10 +5501,8 @@ TEST_CASE("project.store.import.context_settings") SUBCASE("partial_block_cache_multirange") { ScopedTemporaryDirectory PartialExportDir; - std::shared_ptr<RemoteProjectStore> PartialRemoteStore; - RemoteProjectStore::Result ExportR = - SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); - REQUIRE(ExportR.ErrorCode == 0); + std::shared_ptr<RemoteProjectStore> PartialRemoteStore = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); @@ -5700,10 +5570,8 @@ TEST_CASE("project.store.import.context_settings") SUBCASE("partial_block_cache_singlerange") { ScopedTemporaryDirectory PartialExportDir; - std::shared_ptr<RemoteProjectStore> PartialRemoteStore; - RemoteProjectStore::Result ExportR = - SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); - REQUIRE(ExportR.ErrorCode == 0); + std::shared_ptr<RemoteProjectStore> PartialRemoteStore = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); @@ -5797,7 +5665,7 @@ MakeTestProject(CidStore& CidStore, } // Helper: call SaveOplog with a FileRemoteStore backed by ExportDir. -static RemoteProjectStore::Result +static void RunSaveOplog(CidStore& CidStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, @@ -5831,48 +5699,20 @@ RunSaveOplog(CidStore& CidStore, *OutRemoteStore = RemoteStore; } RemoteProjectStore::Result Result; - try - { - Result = SaveOplog(CidStore, - *RemoteStore, - Project, - Oplog, - NetworkPool, - WorkerPool, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - EmbedLooseFiles, - ForceUpload, - IgnoreMissingAttachments, - OptionalContext); - } - catch (const AssertException& AssertEx) - { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = AssertEx.what(); - Result.Text = fmt::format("Failed due to an assert exception: {}", AssertEx.FullDescription()); - } - catch (const std::system_error& SysEx) - { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = SysEx.what(); - Result.Text = fmt::format("Failed due to a system error ({}): {}", SysEx.code().value(), SysEx.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = Ex.what(); - Result.Text = fmt::format("Failed due to an exception: {}", Ex.what()); - } - - if (OptionalContext && OptionalContext->IsCancelled()) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK); - Result.Reason = "Operation cancelled"; - } - return Result; + SaveOplog(CidStore, + *RemoteStore, + Project, + Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + EmbedLooseFiles, + ForceUpload, + IgnoreMissingAttachments, + OptionalContext); } TEST_CASE("project.store.export.no_attachments_needed") @@ -5901,24 +5741,23 @@ TEST_CASE("project.store.export.no_attachments_needed") WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); - CapturingJobContext Ctx; - RemoteProjectStore::Result Result = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_no_att", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/true, - /*IgnoreMissingAttachments=*/false, - &Ctx, - /*ForceDisableBlocks=*/false); - - CHECK(Result.ErrorCode == 0); + CapturingJobContext Ctx; + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_no_att", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/true, + /*IgnoreMissingAttachments=*/false, + &Ctx, + /*ForceDisableBlocks=*/false); + CHECK(Ctx.HasMessage("No attachments needed")); } @@ -5952,24 +5791,22 @@ TEST_CASE("project.store.embed_loose_files_true") WorkerThreadPool NetworkPool(NetworkWorkerCount); std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_embed_true", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); - + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_embed_true", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_true_import", {}); RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, .RemoteStore = *RemoteStore, @@ -6010,23 +5847,22 @@ TEST_CASE("project.store.embed_loose_files_false") WorkerThreadPool NetworkPool(NetworkWorkerCount); std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_embed_false", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/false, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_embed_false", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/false, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_embed_false_import", {}); RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, @@ -6073,24 +5909,23 @@ TEST_CASE("project.store.export.missing_attachment_ignored") WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); - CapturingJobContext Ctx; - RemoteProjectStore::Result Result = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_missing_att", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/true, - &Ctx, - /*ForceDisableBlocks=*/false); - - CHECK(Result.ErrorCode == 0); + CapturingJobContext Ctx; + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_missing_att", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/true, + &Ctx, + /*ForceDisableBlocks=*/false); + CHECK(Ctx.HasMessage("Missing attachment")); } @@ -6139,24 +5974,24 @@ TEST_CASE("project.store.export.missing_chunk_in_cidstore") WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); - RemoteProjectStore::Result Result = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_missing_cid", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false); - - REQUIRE_NE(Result.ErrorCode, 0); - CHECK_NE(Result.Reason.find("Failed to find attachment"), std::string::npos); + CHECK_THROWS(RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_missing_cid", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false)); + + // REQUIRE_NE(Result.ErrorCode, 0); + // CHECK_NE(Result.Reason.find("Failed to find attachment"), std::string::npos); } TEST_CASE("project.store.export.large_file_attachment_direct") @@ -6191,23 +6026,22 @@ TEST_CASE("project.store.export.large_file_attachment_direct") constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_large_direct", - 64u * 1024u, - 1000, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_large_direct", + 64u * 1024u, + 1000, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_direct_import", {}); RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, @@ -6255,23 +6089,22 @@ TEST_CASE("project.store.export.large_file_attachment_via_temp") constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_large_via_temp", - 64u * 1024u, - 1000, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_large_via_temp", + 64u * 1024u, + 1000, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {}); RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, @@ -6315,23 +6148,22 @@ TEST_CASE("project.store.export.large_chunk_from_cidstore") constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_large_cid", - 64u * 1024u, - 1000, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_large_cid", + 64u * 1024u, + 1000, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_large_cid_import", {}); RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, @@ -6380,23 +6212,22 @@ TEST_CASE("project.store.export.block_reuse") // First export. std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result FirstResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_reuse", - MaxBlockSize, - 1000, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(FirstResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_reuse", + MaxBlockSize, + 1000, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownAfterFirst = RemoteStore->GetKnownBlocks(); REQUIRE(!KnownAfterFirst.Blocks.empty()); @@ -6409,21 +6240,20 @@ TEST_CASE("project.store.export.block_reuse") // Second export on the same remote store. FindReuseBlocks must match all blocks // from the first export, so no new block files are written. - RemoteProjectStore::Result SecondResult = SaveOplog(CidStore, - *RemoteStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - MaxBlockSize, - 1000, - MaxChunkEmbedSize, - 64u * 1024u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr); - CHECK(SecondResult.ErrorCode == 0); + SaveOplog(CidStore, + *RemoteStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + MaxBlockSize, + 1000, + MaxChunkEmbedSize, + 64u * 1024u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr); RemoteProjectStore::GetKnownBlocksResult KnownAfterSecond = RemoteStore->GetKnownBlocks(); std::vector<IoHash> BlockHashesAfterSecond; @@ -6474,23 +6304,22 @@ TEST_CASE("project.store.export.max_chunks_per_block") constexpr size_t MaxChunkEmbedSize = 4u * 1024u; std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_max_chunks", - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_max_chunks", + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); CHECK(KnownBlocks.Blocks.size() >= 2); @@ -6566,23 +6395,22 @@ TEST_CASE("project.store.export.max_data_per_block") constexpr size_t MaxChunkEmbedSize = 3u * 1024u; std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_max_data_per_chunk", - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_max_data_per_chunk", + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); CHECK(KnownBlocks.Blocks.size() >= 2); @@ -6657,23 +6485,22 @@ TEST_CASE("project.store.export.file_deleted_between_phases") DeleteOnRewriteContext Ctx; Ctx.Paths = &FilePaths; - RemoteProjectStore::Result Result = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_file_deleted", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/true, - &Ctx, - /*ForceDisableBlocks=*/false); - - CHECK(Result.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_file_deleted", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/true, + &Ctx, + /*ForceDisableBlocks=*/false); + // The AllowChunking phase cleanup (line 2195-2205) emits "Missing attachment" for each hash // whose MakeFromFile call failed at line 2088. CHECK(Ctx.HasMessage("Missing attachment")); @@ -6715,23 +6542,22 @@ TEST_CASE("project.store.embed_loose_files_zero_data_hash") WorkerThreadPool NetworkPool(NetworkWorkerCount); std::shared_ptr<RemoteProjectStore> RemoteStore; - RemoteProjectStore::Result ExportResult = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir.Path(), - "oplog_zero_data_hash", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore); - REQUIRE(ExportResult.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_zero_data_hash", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore); // Round-trip: the resolved attachment data must be recoverable. Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {}); @@ -6779,23 +6605,22 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") // First export: RewriteOp hashes the files and rewrites each entry to include // "data": BinaryAttachment(H). std::shared_ptr<RemoteProjectStore> RemoteStore1; - RemoteProjectStore::Result FirstExport = RunSaveOplog(CidStore, - *Project, - *Oplog, - NetworkPool, - WorkerPool, - ExportDir1.Path(), - "oplog_already_resolved", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false, - &RemoteStore1); - REQUIRE(FirstExport.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir1.Path(), + "oplog_already_resolved", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false, + &RemoteStore1); // Import: WriteOplogSection copies the rewritten ops (with pre-resolved "data" // BinaryAttachment fields) into the local oplog; attachment data lands in CidStore. @@ -6812,22 +6637,21 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") // Re-export: the imported oplog ops carry "data": BinaryAttachment(H) (DataHash != Zero). // RewriteOp copies those entries via line 1873 without re-reading files from disk. // The attachment data is fetched from CidStore in the AllowChunking phase. - RemoteProjectStore::Result SecondExport = RunSaveOplog(CidStore, - *Project, - *ImportOplog, - NetworkPool, - WorkerPool, - ExportDir2.Path(), - "oplog_already_resolved_reexport", - 64u * 1024u, - 1000, - 32u * 1024u, - /*EmbedLooseFiles=*/true, - /*ForceUpload=*/false, - /*IgnoreMissingAttachments=*/false, - /*OptionalContext=*/nullptr, - /*ForceDisableBlocks=*/false); - CHECK(SecondExport.ErrorCode == 0); + RunSaveOplog(CidStore, + *Project, + *ImportOplog, + NetworkPool, + WorkerPool, + ExportDir2.Path(), + "oplog_already_resolved_reexport", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/true, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/false); } TEST_CASE("project.store.blockcomposer.path_a_standalone_block") diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp index b7a3762af..d988a6050 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.cpp +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -2671,44 +2671,38 @@ HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req) WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( - m_CidStore, - *Project, - *Oplog, - WorkerPool, - MaxBlockSize, - MaxChunkEmbedSize, - MaxChunksPerBlock, - ChunkFileSizeLimit, - /* BuildBlocks */ false, - /* IgnoreMissingAttachments */ false, - /* AllowChunking*/ false, - [](CompressedBuffer&&, ChunkBlockDescription&&) {}, - [](const IoHash&, TGetAttachmentBufferFunc&&) {}, - [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, - /* EmbedLooseFiles*/ false); - - if (ContainerResult.ErrorCode == 0) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - ContainerResult.ErrorCode, - ContainerResult.Reason); - - if (ContainerResult.Reason.empty()) + try + { + CbObject ContainerObject = BuildContainer( + m_CidStore, + *Project, + *Oplog, + WorkerPool, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + /* BuildBlocks */ false, + /* IgnoreMissingAttachments */ false, + /* AllowChunking*/ false, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /* EmbedLooseFiles*/ false); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); + } + catch (const HttpClientError& HttpEx) + { + if (HttpEx.GetInternalErrorCode() != HttpClientErrorCode::kOK) { - return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode)); + return HttpReq.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, HttpEx.what()); } else { - return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason); + return HttpReq.WriteResponse(HttpEx.GetHttpResponseCode(), HttpContentType::kText, HttpEx.what()); } } + // Let server request handler deal with other exceptions } void @@ -2872,73 +2866,78 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); - RemoteProjectStore::Result Result; + ; try { - Result = LoadOplog(LoadOplogContext{ - .ChunkStore = m_CidStore, - .RemoteStore = *RemoteStoreResult->Store, - .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr, - .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero, - .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr, - .Oplog = *Oplog, - .NetworkWorkerPool = NetworkWorkerPool, - .WorkerPool = WorkerPool, - .ForceDownload = Force, - .IgnoreMissingAttachments = IgnoreMissingAttachments, - .CleanOplog = CleanOplog, - .PartialBlockRequestMode = PartialBlockRequestMode, - .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false, - .StoreLatencySec = RemoteStoreResult->LatencySec, - .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest, - .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0, - .CacheMaxRangeCountPerRequest = - RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0, - .OptionalJobContext = &Context}); + RemoteProjectStore::Result Result = LoadOplog(LoadOplogContext{ + .ChunkStore = m_CidStore, + .RemoteStore = *RemoteStoreResult->Store, + .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr, + .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero, + .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr, + .Oplog = *Oplog, + .NetworkWorkerPool = NetworkWorkerPool, + .WorkerPool = WorkerPool, + .ForceDownload = Force, + .IgnoreMissingAttachments = IgnoreMissingAttachments, + .CleanOplog = CleanOplog, + .PartialBlockRequestMode = PartialBlockRequestMode, + .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false, + .StoreLatencySec = RemoteStoreResult->LatencySec, + .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest, + .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0, + .CacheMaxRangeCountPerRequest = + RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0, + .OptionalJobContext = &Context}); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw JobError( + Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); + } } catch (const HttpClientError& HttpEx) { if (HttpEx.GetInternalErrorCode() != HttpClientErrorCode::kOK) { - Result.ErrorCode = static_cast<int32_t>(HttpEx.GetInternalErrorCode()); + throw JobError(fmt::format("Failed due to an http exception (Err: {}): {}", + static_cast<int>(HttpEx.GetInternalErrorCode()), + HttpEx.what()), + static_cast<int>(HttpEx.GetResponseClass())); } else { - Result.ErrorCode = static_cast<int32_t>(HttpEx.GetHttpResponseCode() != HttpResponseCode::ImATeapot - ? static_cast<int>(HttpEx.GetHttpResponseCode()) - : 0); + throw JobError(fmt::format("Failed due to an http exception (Code: {}): {}", + static_cast<int>(HttpEx.GetHttpResponseCode()), + HttpEx.what()), + static_cast<int>(HttpEx.GetHttpResponseCode())); } - Result.Reason = HttpEx.what(); - Result.Text = fmt::format("Failed due to an http exception (Err: {}, Code: {}): {}", - static_cast<int32_t>(HttpEx.GetInternalErrorCode()), - static_cast<int32_t>(HttpEx.GetHttpResponseCode()), - HttpEx.what()); } catch (const AssertException& AssertEx) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = AssertEx.what(); - Result.Text = fmt::format("Failed due to an assert exception: {}", AssertEx.FullDescription()); + throw JobError(fmt::format("Failed due to an assert exception: {}", AssertEx.FullDescription()), + static_cast<int>(HttpResponseCode::InternalServerError)); } catch (const std::system_error& SysEx) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = SysEx.what(); - Result.Text = fmt::format("Failed due to a system error ({}): {}", SysEx.code().value(), SysEx.what()); + throw JobError(fmt::format("Failed due to a system error ({}): {}", SysEx.code().value(), SysEx.what()), + SysEx.code().value()); } catch (const std::exception& Ex) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = Ex.what(); - Result.Text = fmt::format("Failed due to an exception: {}", Ex.what()); + throw JobError(fmt::format("Failed due to an exception: {}", Ex.what()), + static_cast<int>(HttpResponseCode::InternalServerError)); } - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) + if (Context.IsCancelled()) + { + ZEN_INFO("LoadOplog: Operation cancelled"); + } + else { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); + ZEN_INFO("LoadOplog: Complete"); } }); @@ -3002,73 +3001,63 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) WorkerThreadPool& WorkerPool = Workers->GetIOWorkerPool(); WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); - RemoteProjectStore::Result Result; try { - Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project, - *Oplog, - NetworkWorkerPool, - WorkerPool, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - EmbedLooseFile, - Force, - IgnoreMissingAttachments, - &Context); + SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project, + *Oplog, + NetworkWorkerPool, + WorkerPool, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + EmbedLooseFile, + Force, + IgnoreMissingAttachments, + &Context); } catch (const HttpClientError& HttpEx) { if (HttpEx.GetInternalErrorCode() != HttpClientErrorCode::kOK) { - Result.ErrorCode = static_cast<int32_t>(HttpEx.GetInternalErrorCode()); + throw JobError(fmt::format("Failed due to an http exception (Err: {}): {}", + static_cast<int>(HttpEx.GetInternalErrorCode()), + HttpEx.what()), + static_cast<int>(HttpEx.GetResponseClass())); } else { - Result.ErrorCode = static_cast<int32_t>(HttpEx.GetHttpResponseCode() != HttpResponseCode::ImATeapot - ? static_cast<int>(HttpEx.GetHttpResponseCode()) - : 0); + throw JobError(fmt::format("Failed due to an http exception (Code: {}): {}", + static_cast<int>(HttpEx.GetHttpResponseCode()), + HttpEx.what()), + static_cast<int>(HttpEx.GetHttpResponseCode())); } - Result.Reason = HttpEx.what(); - Result.Text = fmt::format("Failed due to an http exception (Err: {}, Code: {}): {}", - static_cast<int32_t>(HttpEx.GetInternalErrorCode()), - static_cast<int32_t>(HttpEx.GetHttpResponseCode()), - HttpEx.what()); } catch (const AssertException& AssertEx) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = AssertEx.what(); - Result.Text = fmt::format("Failed due to an assert exception: {}", AssertEx.FullDescription()); + throw JobError(fmt::format("Failed due to an assert exception: {}", AssertEx.FullDescription()), + static_cast<int>(HttpResponseCode::InternalServerError)); } catch (const std::system_error& SysEx) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = SysEx.what(); - Result.Text = fmt::format("Failed due to a system error ({}): {}", SysEx.code().value(), SysEx.what()); + throw JobError(fmt::format("Failed due to a system error ({}): {}", SysEx.code().value(), SysEx.what()), + SysEx.code().value()); } catch (const std::exception& Ex) { - Result.ErrorCode = static_cast<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = Ex.what(); - Result.Text = fmt::format("Failed due to an exception: {}", Ex.what()); + throw JobError(fmt::format("Failed due to an exception: {}", Ex.what()), + static_cast<int>(HttpResponseCode::InternalServerError)); } if (Context.IsCancelled()) { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK); - Result.Reason = "Operation cancelled"; + ZEN_INFO("SaveOplog: Operation cancelled"); } - - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) + else { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); + ZEN_INFO("SaveOplog: Complete"); } }); |