aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-13 16:13:30 -0400
committerGitHub <[email protected]>2023-09-13 22:13:30 +0200
commitb2cef5900b6e251bed4bc0a02161fd90646d37f0 (patch)
treee9085a92e9499bca55dfda9b63779be94218409f /src/zenserver/projectstore/remoteprojectstore.cpp
parentscan oplog object for fields (#397) (diff)
downloadzen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.tar.xz
zen-b2cef5900b6e251bed4bc0a02161fd90646d37f0.zip
job queue and async oplog-import/export (#395)
- Feature: New http endpoint for background jobs `/admin/jobs/status` which will return a response listing the currently active background jobs and their status - Feature: New http endpoint for background jobs information `/admin/jobs/status/{jobid}` which will return a response detailing status, pending messages and progress status - GET will return a response detailing status, pending messages and progress status - DELETE will mark the job for cancelling and return without waiting for completion - If status returned is "Complete" or "Aborted" the jobid will be removed from the server and can not be queried again - Feature: New zen command `jobs` to list, get info about and cancel background jobs - If no options are given it will display a list of active background jobs - `--jobid` accepts an id (returned from for example `oplog-export` with `--async`) and will return a response detailing status, pending messages and progress status for that job - `--cancel` can be added when `--jobid` is given which will request zenserver to cancel the background job - Feature: oplog import and export http rpc requests are now async operations that will run in the background - Feature: `oplog-export` and `oplog-import` now reports progress to the console as work progress by default - Feature: `oplog-export` and `oplog-import` can now be cancelled using Ctrl+C - Feature: `oplog-export` and `oplog-import` has a new option `--async` which will only trigger the work and report a background job id back
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp306
1 files changed, 241 insertions, 65 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 235166659..aca9410a2 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -71,6 +71,27 @@ private:
std::string m_ErrorText;
};
+void
+ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining)
+{
+ if (OptionalContext)
+ {
+ ZEN_ASSERT(Total > 0);
+ OptionalContext->Queue.ReportProgress(OptionalContext->Id, CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
+ }
+ ZEN_INFO("{}", CurrentOp);
+}
+
+void
+ReportMessage(JobContext* OptionalContext, std::string_view Message)
+{
+ if (OptionalContext)
+ {
+ OptionalContext->Queue.ReportMessage(OptionalContext->Id, Message);
+ }
+ ZEN_INFO("{}", Message);
+}
+
bool
IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
{
@@ -201,6 +222,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(const IoHash&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
+ JobContext* OptionalContext,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
@@ -217,8 +239,6 @@ BuildContainer(CidStore& ChunkStore,
std::vector<Block> Blocks;
CompressedBuffer OpsBuffer;
- Latch BlockCreateLatch(1);
-
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
size_t BlockSize = 0;
@@ -365,11 +385,15 @@ BuildContainer(CidStore& ChunkStore,
CB(RewrittenOp);
};
- ZEN_INFO("Building exported oplog and fetching attachments");
+ ReportMessage(OptionalContext, "Building exported oplog and fetching attachments");
tsl::robin_map<int, std::string> OpLSNToKey;
Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
std::string_view Key = Op["key"sv].AsString();
OpLSNToKey.insert({LSN, std::string(Key)});
Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
@@ -382,12 +406,23 @@ BuildContainer(CidStore& ChunkStore,
SectionOpsWriter << Op;
}
OpCount++;
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
});
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+
if (!Attachments.empty() && !KnownBlocks.empty())
{
+ ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
+
size_t ReusedBlockCount = 0;
- ZEN_INFO("Checking {} known blocks for reuse", KnownBlocks.size());
for (const Block& KnownBlock : KnownBlocks)
{
size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
@@ -428,10 +463,10 @@ BuildContainer(CidStore& ChunkStore,
ReusePercent);
}
}
- ZEN_INFO("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size());
+ ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()));
}
- ZEN_INFO("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size());
+ ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
// Sort attachments so we get predictable blocks for the same oplog upload
std::vector<IoHash> SortedAttachments;
@@ -456,7 +491,15 @@ BuildContainer(CidStore& ChunkStore,
return LhsKeyIt->second < RhsKeyIt->second;
});
- ZEN_INFO("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size());
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size()));
auto GetPayload = [&](const IoHash& AttachmentHash) {
if (OutLooseAttachments != nullptr)
@@ -474,8 +517,20 @@ BuildContainer(CidStore& ChunkStore,
size_t GeneratedBlockCount = 0;
size_t LargeAttachmentCount = 0;
+ Latch BlockCreateLatch(1);
for (const IoHash& AttachmentHash : SortedAttachments)
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+
auto It = Attachments.find(AttachmentHash);
ZEN_ASSERT(It != Attachments.end());
IoBuffer Payload = GetPayload(AttachmentHash);
@@ -595,19 +650,67 @@ BuildContainer(CidStore& ChunkStore,
}
SectionOpsWriter.EndArray(); // "ops"
- ZEN_INFO("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
- SortedAttachments.size(),
- OpLSNToKey.size(),
- GeneratedBlockCount,
- LargeAttachmentCount);
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining());
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
+ SortedAttachments.size(),
+ OpLSNToKey.size(),
+ GeneratedBlockCount,
+ LargeAttachmentCount));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
- ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
+ }
+ return {};
+ }
+ ReportMessage(OptionalContext,
+ fmt::format("Added oplog section {}, {}",
+ CompressedOpsSection.DecodeRawHash(),
+ NiceBytes(CompressedOpsSection.GetCompressedSize())));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
- ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+ ptrdiff_t Remaining = BlockCreateLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ Remaining = BlockCreateLatch.Remaining();
+ ReportProgress(OptionalContext,
+ fmt::format("Aborting, {} blocks remaining...", Remaining),
+ GeneratedBlockCount,
+ Remaining);
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
+ return {};
+ }
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
+ }
+ if (GeneratedBlockCount > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
}
if (!RemoteResult.IsError())
@@ -703,6 +806,7 @@ BuildContainer(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
OutOptionalTempAttachments,
+ nullptr,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
@@ -717,7 +821,8 @@ SaveOplog(CidStore& ChunkStore,
bool EmbedLooseFiles,
bool BuildBlocks,
bool UseTempBlocks,
- bool ForceUpload)
+ bool ForceUpload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -831,7 +936,7 @@ SaveOplog(CidStore& ChunkStore,
if (BuildBlocks)
{
- ZEN_INFO("Loading oplog base container");
+ ReportMessage(OptionalContext, "Loading oplog base container");
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
{
@@ -864,6 +969,7 @@ SaveOplog(CidStore& ChunkStore,
KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
};
}
+ ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
}
}
@@ -880,13 +986,22 @@ SaveOplog(CidStore& ChunkStore,
OnLargeAttachment,
OnBlockChunks,
EmbedLooseFiles ? &TempAttachments : nullptr,
+ OptionalContext,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+ ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount));
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
if (ContainerSaveResult.ErrorCode)
@@ -901,7 +1016,16 @@ SaveOplog(CidStore& ChunkStore,
if (!ContainerSaveResult.Needs.empty() || ForceUpload)
{
- ZEN_INFO("Filtering needed attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+
+ ReportMessage(OptionalContext, "Filtering needed attachments...");
+
std::vector<IoHash> NeededLargeAttachments;
std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
NeededLargeAttachments.reserve(LargeAttachments.size());
@@ -924,10 +1048,18 @@ SaveOplog(CidStore& ChunkStore,
}
}
- Latch SaveAttachmentsLatch(1);
+ ptrdiff_t AttachmentsToSave(0);
+ Latch SaveAttachmentsLatch(1);
if (!NeededLargeAttachments.empty())
{
- ZEN_INFO("Saving large attachments...");
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ RemoteProjectStore::Result Result = {.ErrorCode = 0,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Text = "Operation cancelled"};
+ return Result;
+ }
+ ReportMessage(OptionalContext, "Saving large attachments...");
for (const IoHash& RawHash : NeededLargeAttachments)
{
if (RemoteResult.IsError())
@@ -946,6 +1078,7 @@ SaveOplog(CidStore& ChunkStore,
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&ChunkStore,
&RemoteStore,
&SaveAttachmentsLatch,
@@ -994,7 +1127,7 @@ SaveOplog(CidStore& ChunkStore,
if (!CreatedBlocks.empty())
{
- ZEN_INFO("Saving created block attachments...");
+ ReportMessage(OptionalContext, "Saving created block attachments...");
for (auto& It : CreatedBlocks)
{
if (RemoteResult.IsError())
@@ -1007,6 +1140,7 @@ SaveOplog(CidStore& ChunkStore,
IoBuffer Payload = It.second;
ZEN_ASSERT(Payload);
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork(
[&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
@@ -1041,7 +1175,7 @@ SaveOplog(CidStore& ChunkStore,
if (!BlockChunks.empty())
{
- ZEN_INFO("Saving chunk block attachments...");
+ ReportMessage(OptionalContext, "Saving chunk block attachments...");
for (const std::vector<IoHash>& Chunks : BlockChunks)
{
if (RemoteResult.IsError())
@@ -1069,6 +1203,7 @@ SaveOplog(CidStore& ChunkStore,
}
}
SaveAttachmentsLatch.AddCount(1);
+ AttachmentsToSave++;
WorkerPool.ScheduleWork([&RemoteStore,
&ChunkStore,
&SaveAttachmentsLatch,
@@ -1111,14 +1246,28 @@ SaveOplog(CidStore& ChunkStore,
SaveAttachmentsLatch.CountDown();
while (!SaveAttachmentsLatch.Wait(1000))
{
- ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+ ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext,
+ fmt::format("Saving attachments, {} remaining...", Remaining),
+ AttachmentsToSave,
+ Remaining);
+ }
+ if (AttachmentsToSave > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
}
- SaveAttachmentsLatch.Wait();
}
if (!RemoteResult.IsError())
{
- ZEN_INFO("Finalizing oplog container...");
+ ReportMessage(OptionalContext, "Finalizing oplog container...");
RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
if (ContainerFinalizeResult.ErrorCode)
{
@@ -1145,13 +1294,15 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
const std::function<bool(const IoHash& RawHash)>& HasAttachment,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ JobContext* OptionalContext)
{
using namespace std::literals;
Stopwatch Timer;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ size_t NeedAttachmentCount = 0;
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
for (CbFieldView LargeChunksField : LargeChunksArray)
{
IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
@@ -1161,8 +1312,10 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
OnNeedAttachment(AttachmentHash);
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num()));
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ size_t NeedBlockCount = 0;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
for (CbFieldView BlockField : BlocksArray)
{
CbObjectView BlockView = BlockField.AsObjectView();
@@ -1202,6 +1355,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
break;
}
};
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
@@ -1218,6 +1372,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
for (CbFieldView OpEntry : OpsArray)
{
CbObjectView Core = OpEntry.AsObjectView();
@@ -1240,7 +1395,11 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+LoadOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ bool ForceDownload,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1263,20 +1422,23 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))));
- AsyncRemoteResult RemoteResult;
- Latch AttachmentsWorkLatch(1);
+ AsyncRemoteResult RemoteResult;
+ Latch AttachmentsWorkLatch(1);
+ std::atomic_size_t AttachmentCount = 0;
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
};
- auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult](
+ auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult](
const IoHash& BlockHash,
std::vector<IoHash>&& Chunks) {
if (BlockHash == IoHash::Zero)
{
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1305,6 +1467,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
return;
}
AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -1339,57 +1502,70 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
});
};
- auto OnNeedAttachment =
- [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
- if (!Attachments.insert(RawHash).second)
+ auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount](
+ const IoHash& RawHash) {
+ if (!Attachments.insert(RawHash).second)
+ {
+ return;
+ }
+
+ AttachmentsWorkLatch.AddCount(1);
+ AttachmentCount.fetch_add(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
{
return;
}
-
- AttachmentsWorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode);
- return;
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
- ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- });
- };
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode);
+ return;
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ });
+ };
RemoteProjectStore::Result Result =
- SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
if (!Attachments.empty())
{
- ZEN_INFO("Found {} attachments to download", Attachments.size());
+ ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size()));
}
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
{
- ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+ ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining();
+ if (OptionalContext && OptionalContext->CancelFlag)
+ {
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining);
+ }
+ if (AttachmentCount.load() > 0)
+ {
+ ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
}
- AttachmentsWorkLatch.Wait();
if (Result.ErrorCode == 0)
{
Result = RemoteResult.ConvertResult();
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
- ZEN_INFO("Loaded oplog {} in {}",
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))));
return Result;
}