diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-15 22:57:46 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-15 23:14:23 +0100 |
| commit | b888c409afcdb0b9b62e5b730b57eb43c7f53304 (patch) | |
| tree | 7ebcf94d6ba3779c5357408011bdfec04a253ca4 | |
| parent | fix removed code (diff) | |
| download | zen-b888c409afcdb0b9b62e5b730b57eb43c7f53304.tar.xz zen-b888c409afcdb0b9b62e5b730b57eb43c7f53304.zip | |
wip
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 659 |
1 files changed, 372 insertions, 287 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 935af9af0..2cdc76034 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -1197,30 +1197,25 @@ namespace remotestore_impl { }; void DownloadAndSaveBlockChunks(LoadOplogContext& Context, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, + ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) { - AttachmentsDownloadLatch.AddCount(1); - Context.NetworkWorkerPool.ScheduleWork( + AttachmentWork.ScheduleWork( + Context.NetworkWorkerPool, [&Context, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, + &AttachmentWork, ThinBlockDescription = std::move(ThinBlockDescription), NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, - &DownloadStartMS]() { + &DownloadStartMS](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockChunks"); - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -1246,7 +1241,7 @@ namespace remotestore_impl { Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text); } return; } @@ -1260,70 +1255,60 @@ namespace remotestore_impl { fmt::format("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)))); - if (RemoteResult.IsError()) + if (AbortFlag) { return; } - AttachmentsWriteLatch.AddCount(1); - Context.WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + AttachmentWork.ScheduleWork( + Context.WorkerPool, + [&Info, &Context, Chunks = std::move(Result.Chunks)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { return; } if (!Chunks.empty()) { - try - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); - for (const auto& It : Chunks) - { - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); - } - std::vector<CidStore::InsertResult> InsertResults = - Context.ChunkStore.AddChunks(WriteAttachmentBuffers, - WriteRawHashes, - CidStore::InsertMode::kCopyOnly); + for (const auto& It : Chunks) + { + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - for (size_t Index = 0; Index < InsertResults.size(); Index++) + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) { - if (InsertResults[Index].New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - } + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); } } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk save {} attachments", Chunks.size()), - Ex.what()); - } } }, WorkerThreadPool::EMode::EnableBacklog); } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveBlock(LoadOplogContext& Context, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, + ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, @@ -1332,23 +1317,20 @@ namespace remotestore_impl { std::span<std::atomic<bool>> ChunkDownloadedFlags, uint32_t RetriesLeft) { - AttachmentsDownloadLatch.AddCount(1); - Context.NetworkWorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, + AttachmentWork.ScheduleWork( + Context.NetworkWorkerPool, + [&AttachmentWork, &Context, - &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, RetriesLeft, BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags]() { + ChunkDownloadedFlags](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlock"); - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -1376,11 +1358,11 @@ namespace remotestore_impl { Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text); } return; } - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -1401,12 +1383,10 @@ namespace remotestore_impl { Info.AttachmentBlocksDownloaded.fetch_add(1); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - AttachmentsWriteLatch.AddCount(1); - Context.WorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, + AttachmentWork.ScheduleWork( + Context.WorkerPool, + [&AttachmentWork, &Context, - &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, @@ -1414,9 +1394,8 @@ namespace remotestore_impl { BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, - Bytes = std::move(BlobBuffer)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { return; } @@ -1525,9 +1504,7 @@ namespace remotestore_impl { ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); return DownloadAndSaveBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -1539,32 +1516,41 @@ namespace remotestore_impl { else { ReportMessage(Context.OptionalJobContext, ErrorString); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {}); - return; + throw RemoteStoreError(ErrorString, + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + {}); } } } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to save block attachment {}", BlockHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to save block attachment {}", BlockHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to download block attachment {}", BlockHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to download block attachment {}", BlockHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; void DownloadPartialBlock(LoadOplogContext& Context, - AsyncRemoteResult& RemoteResult, + std::atomic<bool>& AbortFlag, DownloadInfo& Info, double& DownloadTimeSeconds, const ChunkBlockDescription& BlockDescription, @@ -1593,7 +1579,7 @@ namespace remotestore_impl { while (SubRangeCountComplete < SubBlockRangeCount) { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -1615,7 +1601,7 @@ namespace remotestore_impl { SubRange.first, SubRange.second); DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -1636,7 +1622,7 @@ namespace remotestore_impl { BuildStorageCache::BuildBlobRanges RangeBuffers = Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges); DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -1668,7 +1654,7 @@ namespace remotestore_impl { RemoteProjectStore::LoadAttachmentRangesResult BlockResult = Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges); DownloadTimeSeconds += BlockResult.ElapsedSeconds; - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -1683,8 +1669,7 @@ namespace remotestore_impl { Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - break; + throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text); } } else @@ -1700,7 +1685,7 @@ namespace remotestore_impl { BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes})); - if (RemoteResult.IsError()) + if (AbortFlag.load()) { break; } @@ -1714,13 +1699,12 @@ namespace remotestore_impl { { if (BlockResult.Ranges.size() != SubRanges.size()) { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Range response for block {} contains {} ranges, expected {} ranges", - BlockDescription.BlockHash, - BlockResult.Ranges.size(), - SubRanges.size()), - ""); - break; + throw RemoteStoreError(fmt::format("Range response for block {} contains {} ranges, expected {} ranges", + BlockDescription.BlockHash, + BlockResult.Ranges.size(), + SubRanges.size()), + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + ""); } OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges); } @@ -1731,9 +1715,7 @@ namespace remotestore_impl { } void DownloadAndSavePartialBlock(LoadOplogContext& Context, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, + ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, @@ -1746,12 +1728,10 @@ namespace remotestore_impl { std::span<std::atomic<bool>> ChunkDownloadedFlags, uint32_t RetriesLeft) { - AttachmentsDownloadLatch.AddCount(1); - Context.NetworkWorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, + AttachmentWork.ScheduleWork( + Context.NetworkWorkerPool, + [&AttachmentWork, &Context, - &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, @@ -1762,10 +1742,8 @@ namespace remotestore_impl { BlockRangeCount, &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, - RetriesLeft]() { + RetriesLeft](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockRanges"); - - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); try { uint64_t Unset = (std::uint64_t)-1; @@ -1776,7 +1754,7 @@ namespace remotestore_impl { DownloadPartialBlock( Context, - RemoteResult, + AbortFlag, Info, DownloadElapsedSeconds, BlockDescription, @@ -1793,12 +1771,10 @@ namespace remotestore_impl { Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); Info.AttachmentBlocksRangesDownloaded++; - AttachmentsWriteLatch.AddCount(1); - Context.WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, + AttachmentWork.ScheduleWork( + Context.WorkerPool, + [&AttachmentWork, &Context, - &AttachmentsDownloadLatch, - &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, @@ -1811,8 +1787,8 @@ namespace remotestore_impl { RetriesLeft, BlockPayload = std::move(Buffer), OffsetAndLengths = - std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]( + std::atomic<bool>& AbortFlag) { try { ZEN_ASSERT(BlockPayload.Size() > 0); @@ -1820,7 +1796,7 @@ namespace remotestore_impl { size_t RangeCount = OffsetAndLengths.size(); for (size_t RangeOffset = 0; RangeOffset < RangeCount; RangeOffset++) { - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -1842,7 +1818,7 @@ namespace remotestore_impl { ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; ChunkBlockIndex++) { - if (RemoteResult.IsError()) + if (AbortFlag) { break; } @@ -1904,9 +1880,7 @@ namespace remotestore_impl { ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); return DownloadAndSavePartialBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -1924,9 +1898,9 @@ namespace remotestore_impl { Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), - "Malformed chunk block", - ErrorString); + throw RemoteStoreError("Malformed chunk block", + gsl::narrow<int32_t>(HttpResponseCode::NotFound), + ErrorString); } } else @@ -1978,18 +1952,22 @@ namespace remotestore_impl { } } } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed saving {} ranges from block attachment {}", - OffsetAndLengths.size(), - BlockDescription.BlockHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed saving {} ranges from block attachment {}", + OffsetAndLengths.size(), + BlockDescription.BlockHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }); - if (!RemoteResult.IsError()) + if (!AbortFlag) { ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", BlockRangeCount, @@ -1998,39 +1976,33 @@ namespace remotestore_impl { NiceBytes(DownloadedBytes)); } } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; void DownloadAndSaveAttachment(LoadOplogContext& Context, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, + ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, const IoHash& RawHash) { - AttachmentsDownloadLatch.AddCount(1); - Context.NetworkWorkerPool.ScheduleWork( - [&Context, - &RemoteResult, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - RawHash, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info]() { + AttachmentWork.ScheduleWork( + Context.NetworkWorkerPool, + [&Context, &AttachmentWork, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DownloadAttachment"); - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -2056,7 +2028,7 @@ namespace remotestore_impl { Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + throw RemoteStoreError(AttachmentResult.Reason, AttachmentResult.ErrorCode, AttachmentResult.Text); } return; } @@ -2074,7 +2046,7 @@ namespace remotestore_impl { CompositeBuffer(SharedBuffer(BlobBuffer))); } } - if (RemoteResult.IsError()) + if (AbortFlag) { return; } @@ -2083,42 +2055,36 @@ namespace remotestore_impl { Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); ZEN_ASSERT(BlobBuffer); - AttachmentsWriteLatch.AddCount(1); - Context.WorkerPool.ScheduleWork( - [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() { + AttachmentWork.ScheduleWork( + Context.WorkerPool, + [&Context, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("WriteAttachment"); ZEN_ASSERT(Bytes); - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) + if (AbortFlag) { return; } - try - { - CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } - Info.ChunksCompleteCount++; - } - catch (const std::exception& Ex) + CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Saving attachment {} failed", RawHash), - Ex.what()); + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); } + Info.ChunksCompleteCount++; }, WorkerThreadPool::EMode::EnableBacklog); } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Loading attachment {} failed", RawHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Loading attachment {} failed", RawHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); @@ -2342,7 +2308,6 @@ namespace remotestore_impl { RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block)); if (Result.ErrorCode) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, @@ -2456,7 +2421,6 @@ namespace remotestore_impl { RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, fmt::format("Failed to save attachments with {} chunks ({}): {}", NeededChunks.size(), @@ -3872,10 +3836,10 @@ LoadOplog(LoadOplogContext&& Context) NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); - remotestore_impl::AsyncRemoteResult RemoteResult; - Latch AttachmentsDownloadLatch(1); - Latch AttachmentsWriteLatch(1); - std::atomic_size_t AttachmentCount = 0; + std::atomic<bool> AbortFlag(false); + std::atomic<bool> PauseFlag(false); + ParallelWork AttachmentWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::atomic_size_t AttachmentCount = 0; Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; @@ -3901,17 +3865,16 @@ LoadOplog(LoadOplogContext&& Context) std::vector<NeededBlockDownload> NeededBlockDownloads; auto OnNeedBlock = [&Context, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, + &AttachmentWork, + &AbortFlag, &AttachmentCount, - &RemoteResult, &BlockCountToDownload, &Info, &LoadAttachmentsTimer, &DownloadStartMS, &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } @@ -3921,9 +3884,7 @@ LoadOplog(LoadOplogContext&& Context) if (ThinBlockDescription.BlockHash == IoHash::Zero) { DownloadAndSaveBlockChunks(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -3939,12 +3900,12 @@ LoadOplog(LoadOplogContext&& Context) std::vector<IoHash> AttachmentsToDownload; - auto OnNeedAttachment = [&AttachmentsToDownload, &RemoteResult, &Attachments, &AttachmentCount](const IoHash& RawHash) { + auto OnNeedAttachment = [&AttachmentsToDownload, &AbortFlag, &Attachments, &AttachmentCount](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; } - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } @@ -3961,6 +3922,7 @@ LoadOplog(LoadOplogContext&& Context) Context.Oplog.EnableUpdateCapture(); auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); }); + RemoteProjectStore::Result LoadResult; CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, OnReferencedAttachments, @@ -3972,7 +3934,8 @@ LoadOplog(LoadOplogContext&& Context) Context.OptionalJobContext); if (Result.ErrorCode != 0) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + AbortFlag = true; + LoadResult = {.ErrorCode = Result.ErrorCode, .Reason = Result.Reason, .Text = Result.Text}; } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", @@ -4023,9 +3986,7 @@ LoadOplog(LoadOplogContext&& Context) { // Fall back to full download as we can't get enough information about the block DownloadAndSaveBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -4153,9 +4114,7 @@ LoadOplog(LoadOplogContext&& Context) for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) { DownloadAndSaveBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -4177,9 +4136,7 @@ LoadOplog(LoadOplogContext&& Context) } DownloadAndSavePartialBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, + AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, @@ -4197,27 +4154,16 @@ LoadOplog(LoadOplogContext&& Context) for (const IoHash& AttachmentToDownload : AttachmentsToDownload) { - DownloadAndSaveAttachment(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - AttachmentToDownload); + DownloadAndSaveAttachment(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, AttachmentToDownload); } uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size(); - AttachmentsDownloadLatch.CountDown(); + try { - while (!AttachmentsDownloadLatch.Wait(1000)) - { - if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) + AttachmentWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t /*Pending*/) { + if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } + AbortFlag = true; } uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; if (DownloadStartMS != (uint64_t)-1) @@ -4246,45 +4192,26 @@ LoadOplog(LoadOplogContext&& Context) TotalChunksToDownload, TotalChunksToDownload - CompletedChunkCount, AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); - } + }); } - if (DownloadStartMS != (uint64_t)-1) + catch (const RemoteStoreError& Ex) { - TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + if (!LoadResult.ErrorCode) + { + LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())}; + } } - - AttachmentsWriteLatch.CountDown(); + catch (const std::exception& Ex) { - while (!AttachmentsWriteLatch.Wait(1000)) + if (!LoadResult.ErrorCode) { - if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - } - - uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); - - uint64_t AttachmentsDownloaded = - Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); - uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + - Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); - - remotestore_impl::ReportProgress(Context.OptionalJobContext, - "Loading attachments"sv, - fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded.", - CompletedChunkCount, - TotalChunksToDownload, - NiceBytes(Info.AttachmentBytesStored.load()), - AttachmentsDownloaded, - NiceBytes(AttachmentBytesDownloaded)), - TotalChunksToDownload, - TotalChunksToDownload - CompletedChunkCount, - AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); + LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()}; } } + if (DownloadStartMS != (uint64_t)-1) + { + TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + } if (AttachmentCount.load() > 0) { @@ -4295,25 +4222,24 @@ LoadOplog(LoadOplogContext&& Context) 0, AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } - - if (Result.ErrorCode == 0) + if (LoadResult.ErrorCode == 0) { if (!FilesToDechunk.empty()) { remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); - Latch DechunkLatch(1); + ParallelWork DechunkWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::filesystem::path TempFilePath = Context.Oplog.TempPath(); for (size_t ChunkedIndex = 0; ChunkedIndex < FilesToDechunk.size(); ChunkedIndex++) { const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); - DechunkLatch.AddCount(1); - Context.WorkerPool.ScheduleWork( - [&Context, &DechunkLatch, TempFileName, &FilesToDechunk, ChunkedIndex, &RemoteResult, &Info]() { + DechunkWork.ScheduleWork( + Context.WorkerPool, + [&Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic<bool>& AbortFlag) { ZEN_TRACE_CPU("DechunkAttachment"); - auto _ = MakeGuard([&DechunkLatch, &TempFileName] { + auto _ = MakeGuard([&TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) { @@ -4323,13 +4249,12 @@ LoadOplog(LoadOplogContext&& Context) ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); } } - DechunkLatch.CountDown(); }); const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; try { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } @@ -4342,12 +4267,11 @@ LoadOplog(LoadOplogContext&& Context) TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate, Ec); if (Ec) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - "Write error", - fmt::format("Failed to open temp file {} for chunked attachment {}", - TempFileName, - Chunked.RawHash)); - return; + throw RemoteStoreError("Write error", + gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to open temp file {} for chunked attachment {}", + TempFileName, + Chunked.RawHash)); } else { @@ -4357,7 +4281,7 @@ LoadOplog(LoadOplogContext&& Context) BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { - if (RemoteResult.IsError()) + if (AbortFlag.load()) { return; } @@ -4374,9 +4298,9 @@ LoadOplog(LoadOplogContext&& Context) Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), + throw RemoteStoreError( "Missing chunk", + gsl::narrow<int>(HttpResponseCode::NotFound), fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); } return; @@ -4403,9 +4327,9 @@ LoadOplog(LoadOplogContext&& Context) Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", - Message); + throw RemoteStoreError("Missing chunk", + gsl::narrow<int>(HttpResponseCode::NotFound), + Message); } return; } @@ -4443,9 +4367,9 @@ LoadOplog(LoadOplogContext&& Context) Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), + throw RemoteStoreError( "Missing chunk", + gsl::narrow<int>(HttpResponseCode::NotFound), fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); @@ -4477,37 +4401,49 @@ LoadOplog(LoadOplogContext&& Context) NiceBytes(Chunked.RawSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } + catch (const RemoteStoreError&) + { + throw; + } catch (const std::exception& Ex) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to dechunk file {}", Chunked.RawHash), - Ex.what()); + throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash), + gsl::narrow<int>(HttpResponseCode::InternalServerError), + Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); } Stopwatch DechunkProgressTimer; - DechunkLatch.CountDown(); - while (!DechunkLatch.Wait(1000)) + try { - ptrdiff_t Remaining = DechunkLatch.Remaining(); - if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) - { - if (!RemoteResult.IsError()) + DechunkWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t Remaining) { + if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - Context.OptionalJobContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + AbortFlag = true; } + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Dechunking attachments"sv, + fmt::format("{} remaining...", Remaining), + FilesToDechunk.size(), + Remaining, + DechunkProgressTimer.GetElapsedTimeMs()); + }); + } + catch (const RemoteStoreError& Ex) + { + if (!LoadResult.ErrorCode) + { + LoadResult = {.ErrorCode = Ex.GetErrorCode(), .Reason = Ex.what(), .Text = std::string(Ex.GetText())}; + } + } + catch (const std::exception& Ex) + { + if (!LoadResult.ErrorCode) + { + LoadResult = {.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .Reason = Ex.what()}; } - remotestore_impl::ReportProgress(Context.OptionalJobContext, - "Dechunking attachments"sv, - fmt::format("{} remaining...", Remaining), - FilesToDechunk.size(), - Remaining, - DechunkProgressTimer.GetElapsedTimeMs()); } remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, @@ -4516,8 +4452,8 @@ LoadOplog(LoadOplogContext&& Context) 0, DechunkProgressTimer.GetElapsedTimeMs()); } - Result = RemoteResult.ConvertResult(); } + Result = LoadResult; if (Result.ErrorCode == 0) { @@ -4541,7 +4477,6 @@ LoadOplog(LoadOplogContext&& Context) Result = remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); if (Result.ErrorCode) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); } @@ -6659,6 +6594,156 @@ TEST_CASE("project.store.embed_loose_files_already_resolved") /*ForceDisableBlocks=*/false); } +TEST_CASE("project.store.import.error.missing_attachment") +{ + // Export a small oplog (blocks disabled to avoid pre-existing ZEN_ASSERT), delete one + // attachment file from the remote store, then verify that LoadOplog returns a non-zero + // error code when IgnoreMissingAttachments=false. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_missing_att", {}); + REQUIRE(Oplog); + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000}))); + + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); + + std::shared_ptr<RemoteProjectStore> RemoteStore; + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_missing_att", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/false, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/true, + &RemoteStore); + + // Find and delete one .blob attachment file from the remote store directory. + std::filesystem::path DeletedBlob; + for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path())) + { + if (Entry.path().extension() == ".blob") + { + DeletedBlob = Entry.path(); + break; + } + } + REQUIRE(!DeletedBlob.empty()); + std::error_code Ec; + std::filesystem::remove(DeletedBlob, Ec); + REQUIRE(!Ec); + + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_missing_att_import", {}); + REQUIRE(ImportOplog); + + CapturingJobContext Ctx; + RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}); + CHECK(ImportResult.ErrorCode != 0); +} + +TEST_CASE("project.store.import.ignore_missing_attachment") +{ + // Same setup as project.store.import.error.missing_attachment, but with + // IgnoreMissingAttachments=true. LoadOplog should succeed (ErrorCode == 0) + // despite the missing attachment file. + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + GcManager Gc; + CidStore CidStore(Gc); + std::unique_ptr<ProjectStore> ProjectStoreDummy; + Ref<ProjectStore::Project> Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); + + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog_ignore_missing", {}); + REQUIRE(Oplog); + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{512, 1024}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{2048, 3000}))); + + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); + + std::shared_ptr<RemoteProjectStore> RemoteStore; + RunSaveOplog(CidStore, + *Project, + *Oplog, + NetworkPool, + WorkerPool, + ExportDir.Path(), + "oplog_ignore_missing", + 64u * 1024u, + 1000, + 32u * 1024u, + /*EmbedLooseFiles=*/false, + /*ForceUpload=*/false, + /*IgnoreMissingAttachments=*/false, + /*OptionalContext=*/nullptr, + /*ForceDisableBlocks=*/true, + &RemoteStore); + + // Find and delete one .blob attachment file. + std::filesystem::path DeletedBlob; + for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path())) + { + if (Entry.path().extension() == ".blob") + { + DeletedBlob = Entry.path(); + break; + } + } + REQUIRE(!DeletedBlob.empty()); + std::error_code Ec; + std::filesystem::remove(DeletedBlob, Ec); + REQUIRE(!Ec); + + Ref<ProjectStore::Oplog> ImportOplog = Project->NewOplog("oplog_ignore_missing_import", {}); + REQUIRE(ImportOplog); + + CapturingJobContext Ctx; + RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &Ctx}); + CHECK(ImportResult.ErrorCode == 0); +} + TEST_CASE("project.store.blockcomposer.path_a_standalone_block") { // Path A: a single op with exactly MaxChunksPerBlock (4) chunks. |