diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-13 16:13:30 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-13 22:13:30 +0200 |
| commit | b2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch) | |
| tree | e9085a92e9499bca55dfda9b63779be94218409f /src/zenserver/projectstore/remoteprojectstore.cpp | |
| parent | scan oplog object for fields (#397) (diff) | |
| download | zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip | |
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status
- Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status
- GET will return a response detailing status, pending messages and progress status
- DELETE will mark the job for cancelling and return without waiting for completion
- If the status returned is "Complete" or "Aborted", the jobid will be removed from the server and cannot be queried again
- Feature: New zen command `jobs` to list, get info about and cancel background jobs
- If no options are given it will display a list of active background jobs
- `--jobid` accepts an id (returned, for example, by `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job
- `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job
- Feature: oplog import and export http rpc requests are now async operations that will run in the background
- Feature: `oplog-export` and `oplog-import` now report progress to the console as work progresses by default
- Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C
- Feature: `oplog-export` and `oplog-import` have a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 306 |
1 files changed, 241 insertions, 65 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 235166659..aca9410a2 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -71,6 +71,27 @@ private: std::string m_ErrorText; }; +void +ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining) +{ + if (OptionalContext) + { + ZEN_ASSERT(Total > 0); + OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); + } + ZEN_INFO("{}", CurrentOp); +} + +void +ReportMessage(JobContext* OptionalContext, std::string_view Message) +{ + if (OptionalContext) + { + OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message); + } + ZEN_INFO("{}", Message); +} + bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { @@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(const IoHash&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments, + JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; @@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore, std::vector<Block> Blocks; CompressedBuffer OpsBuffer; - Latch BlockCreateLatch(1); - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; size_t BlockSize = 0; @@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; - ZEN_INFO("Building exported oplog and fetching attachments"); + ReportMessage(OptionalContext, "Building exported oplog and fetching attachments"); tsl::robin_map<int, std::string> OpLSNToKey; Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) { + if (RemoteResult.IsError()) + { + return; + 
} std::string_view Key = Op["key"sv].AsString(); OpLSNToKey.insert({LSN, std::string(Key)}); Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); @@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore, SectionOpsWriter << Op; } OpCount++; + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } }); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; + } + if (!Attachments.empty() && !KnownBlocks.empty()) { + ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); + size_t ReusedBlockCount = 0; - ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size()); for (const Block& KnownBlock : KnownBlocks) { size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); @@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore, ReusePercent); } } - ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()); + ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size())); } - ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()); + ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); // Sort attachments so we get predictable blocks for the same oplog upload std::vector<IoHash> SortedAttachments; @@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore, return LhsKeyIt->second < RhsKeyIt->second; }); - ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size()); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + 
return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments", + SortedAttachments.size(), + OpLSNToKey.size())); auto GetPayload = [&](const IoHash& AttachmentHash) { if (OutLooseAttachments != nullptr) @@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore, size_t GeneratedBlockCount = 0; size_t LargeAttachmentCount = 0; + Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + auto It = Attachments.find(AttachmentHash); ZEN_ASSERT(It != Attachments.end()); IoBuffer Payload = GetPayload(AttachmentHash); @@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore, } SectionOpsWriter.EndArray(); // "ops" - ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", - SortedAttachments.size(), - OpLSNToKey.size(), - GeneratedBlockCount, - LargeAttachmentCount); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", + SortedAttachments.size(), + OpLSNToKey.size(), + GeneratedBlockCount, + LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), 
NiceBytes(CompressedOpsSection.GetCompressedSize())); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); + } + return {}; + } + ReportMessage(OptionalContext, + fmt::format("Added oplog section {}, {}", + CompressedOpsSection.DecodeRawHash(), + NiceBytes(CompressedOpsSection.GetCompressedSize()))); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { - ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + ptrdiff_t Remaining = BlockCreateLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + while (!BlockCreateLatch.Wait(1000)) + { + Remaining = BlockCreateLatch.Remaining(); + ReportProgress(OptionalContext, + fmt::format("Aborting, {} blocks remaining...", Remaining), + GeneratedBlockCount, + Remaining); + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); + return {}; + } + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining); + } + if (GeneratedBlockCount > 0) + { + ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); } if (!RemoteResult.IsError()) @@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, OutOptionalTempAttachments, + nullptr, RemoteResult); return 
RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } @@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, - bool ForceUpload) + bool ForceUpload, + JobContext* OptionalContext) { using namespace std::literals; @@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore, if (BuildBlocks) { - ZEN_INFO("Loading oplog base container"); + ReportMessage(OptionalContext, "Loading oplog base container"); RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) { @@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore, KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); }; } + ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); } } @@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles ? 
&TempAttachments : nullptr, + OptionalContext, /* out */ RemoteResult); if (!RemoteResult.IsError()) { + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount)); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); if (ContainerSaveResult.ErrorCode) @@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore, if (!ContainerSaveResult.Needs.empty() || ForceUpload) { - ZEN_INFO("Filtering needed attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + + ReportMessage(OptionalContext, "Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; NeededLargeAttachments.reserve(LargeAttachments.size()); @@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore, } } - Latch SaveAttachmentsLatch(1); + ptrdiff_t AttachmentsToSave(0); + Latch SaveAttachmentsLatch(1); if (!NeededLargeAttachments.empty()) { - ZEN_INFO("Saving large attachments..."); + if (OptionalContext && OptionalContext->CancelFlag) + { + RemoteProjectStore::Result Result = {.ErrorCode = 0, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Text = "Operation cancelled"}; + return Result; + } + ReportMessage(OptionalContext, 
"Saving large attachments..."); for (const IoHash& RawHash : NeededLargeAttachments) { if (RemoteResult.IsError()) @@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore, } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, @@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore, if (!CreatedBlocks.empty()) { - ZEN_INFO("Saving created block attachments..."); + ReportMessage(OptionalContext, "Saving created block attachments..."); for (auto& It : CreatedBlocks) { if (RemoteResult.IsError()) @@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore, IoBuffer Payload = It.second; ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork( [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); @@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore, if (!BlockChunks.empty()) { - ZEN_INFO("Saving chunk block attachments..."); + ReportMessage(OptionalContext, "Saving chunk block attachments..."); for (const std::vector<IoHash>& Chunks : BlockChunks) { if (RemoteResult.IsError()) @@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore, } } SaveAttachmentsLatch.AddCount(1); + AttachmentsToSave++; WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, @@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore, SaveAttachmentsLatch.CountDown(); while (!SaveAttachmentsLatch.Wait(1000)) { - ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, + fmt::format("Saving attachments, {} remaining...", Remaining), + 
AttachmentsToSave, + Remaining); + } + if (AttachmentsToSave > 0) + { + ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); } - SaveAttachmentsLatch.Wait(); } if (!RemoteResult.IsError()) { - ZEN_INFO("Finalizing oplog container..."); + ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { @@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + size_t NeedAttachmentCount = 0; + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); @@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedAttachment(AttachmentHash); }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + size_t NeedBlockCount = 0; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); @@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, break; } }; + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, 
BlocksArray.Num())); MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); @@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); for (CbFieldView OpEntry : OpsArray) { CbObjectView Core = OpEntry.AsObjectView(); @@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +LoadOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + bool ForceDownload, + JobContext* OptionalContext) { using namespace std::literals; @@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + ReportMessage(OptionalContext, + fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); - AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + std::atomic_size_t AttachmentCount = 0; auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { return !ForceDownload && ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult]( const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (BlockHash == IoHash::Zero) { 
AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O return; } AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O }); }; - auto OnNeedAttachment = - [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { - if (!Attachments.insert(RawHash).second) + auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount]( + const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + AttachmentCount.fetch_add(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) { return; } - - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, 
AttachmentResult.Reason, AttachmentResult.Text); - ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); - }; + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; RemoteProjectStore::Result Result = - SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); if (!Attachments.empty()) { - ZEN_INFO("Found {} attachments to download", Attachments.size()); + ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) { - ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); + if (OptionalContext && OptionalContext->CancelFlag) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), 
Remaining); + } + if (AttachmentCount.load() > 0) + { + ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); } - AttachmentsWorkLatch.Wait(); if (Result.ErrorCode == 0) { Result = RemoteResult.ConvertResult(); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - ZEN_INFO("Loaded oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + ReportMessage(OptionalContext, + fmt::format("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); return Result; } |