aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-11 16:01:29 +0100
committerGitHub Enterprise <[email protected]>2026-03-11 16:01:29 +0100
commit57816d04b61f6bdc1403583201246abd5883c457 (patch)
tree6148371fcc7d98702f0b121a3f40efe4e6bc08d4 /src
parentblock scavenge of other downloads that uses an older state file (#822) (diff)
downloadzen-57816d04b61f6bdc1403583201246abd5883c457.tar.xz
zen-57816d04b61f6bdc1403583201246abd5883c457.zip
improved oplog import progress reporting (#825)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp18
-rw-r--r--src/zen/progressbar.cpp2
-rw-r--r--src/zen/progressbar.h5
-rw-r--r--src/zencore/include/zencore/jobqueue.h7
-rw-r--r--src/zencore/jobqueue.cpp56
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp317
-rw-r--r--src/zenserver/storage/admin/admin.cpp4
7 files changed, 251 insertions, 158 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index db931e49a..c0780c7e8 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -163,10 +163,11 @@ namespace projectstore_impl {
if (Status == "Running")
{
- std::string_view CurrentOp = StatusObject["Op"sv].AsString();
- std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString();
- uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64();
- uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64();
+ std::string_view CurrentOp = StatusObject["Op"sv].AsString();
+ std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString();
+ uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64();
+ uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64();
+ uint64_t ProgressElapsedTimeMs = StatusObject["ProgressElapsedTimeMs"sv].AsUInt64((uint64_t)-1);
if (!ProgressBar.IsSameTask(CurrentOp))
{
@@ -179,10 +180,11 @@ namespace projectstore_impl {
MessagesDone = true;
}
- ProgressBar.UpdateState({.Task = std::string(CurrentOp),
- .Details = std::string(CurrentOpDetails),
- .TotalCount = TotalCount,
- .RemainingCount = RemainingCount},
+ ProgressBar.UpdateState({.Task = std::string(CurrentOp),
+ .Details = std::string(CurrentOpDetails),
+ .TotalCount = TotalCount,
+ .RemainingCount = RemainingCount,
+ .OptionalElapsedTime = ProgressElapsedTimeMs},
false);
}
diff --git a/src/zen/progressbar.cpp b/src/zen/progressbar.cpp
index b758c061b..961375c0b 100644
--- a/src/zen/progressbar.cpp
+++ b/src/zen/progressbar.cpp
@@ -156,7 +156,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
return;
}
- uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs();
+ uint64_t ElapsedTimeMS = NewState.OptionalElapsedTime == (uint64_t)-1 ? m_SW.GetElapsedTimeMs() : NewState.OptionalElapsedTime;
if (m_LastUpdateMS != (uint64_t)-1)
{
if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) &&
diff --git a/src/zen/progressbar.h b/src/zen/progressbar.h
index cb1c7023b..b54c009e1 100644
--- a/src/zen/progressbar.h
+++ b/src/zen/progressbar.h
@@ -19,8 +19,9 @@ public:
bool operator==(const State&) const = default;
std::string Task;
std::string Details;
- uint64_t TotalCount = 0;
- uint64_t RemainingCount = 0;
+ uint64_t TotalCount = 0;
+ uint64_t RemainingCount = 0;
+ uint64_t OptionalElapsedTime = (uint64_t)-1;
enum class EStatus
{
Running,
diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h
index d348bd021..6631e5766 100644
--- a/src/zencore/include/zencore/jobqueue.h
+++ b/src/zencore/include/zencore/jobqueue.h
@@ -25,7 +25,11 @@ public:
virtual bool IsCancelled() const = 0;
virtual void ReportMessage(std::string_view Message) = 0;
// virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0;
- virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) = 0;
+ virtual void ReportProgress(std::string_view CurrentOp,
+ std::string_view Details,
+ ptrdiff_t TotalCount,
+ ptrdiff_t RemainingCount,
+ uint64_t ElapsedTimeMs) = 0;
};
class JobError : public std::runtime_error
@@ -62,6 +66,7 @@ public:
std::string CurrentOpDetails;
ptrdiff_t TotalCount;
ptrdiff_t RemainingCount;
+ uint64_t ProgressElapsedTimeMs;
std::vector<std::string> Messages;
std::string AbortReason;
};
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index d6a8a6479..3e58fb97d 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -57,9 +57,10 @@ public:
virtual void ReportProgress(std::string_view CurrentOp,
std::string_view Details,
ptrdiff_t TotalCount,
- ptrdiff_t RemainingCount) override
+ ptrdiff_t RemainingCount,
+ uint64_t ElapsedTimeMs) override
{
- Queue->ReportProgress(Id, CurrentOp, Details, TotalCount, RemainingCount);
+ Queue->ReportProgress(Id, CurrentOp, Details, TotalCount, RemainingCount, ElapsedTimeMs);
}
};
@@ -265,21 +266,20 @@ public:
virtual std::optional<JobDetails> Get(JobId Id) override
{
auto Convert = [](JobStatus Status, Job& Job) -> JobDetails {
- return JobDetails{
- .Name = Job.Name,
- .Status = Status,
- .State = {.CurrentOp = Job.State.CurrentOp,
- .CurrentOpDetails = Job.State.CurrentOpDetails,
- .TotalCount = Job.State.TotalCount,
- .RemainingCount = Job.State.RemainingCount,
- // .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete,
- .Messages = std::move(Job.State.Messages),
- .AbortReason = Job.State.AbortReason},
- .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
- .StartTime = JobClock::TimePointFromTick(Job.StartTick),
- .EndTime = JobClock::TimePointFromTick(Job.EndTick),
- .WorkerThreadId = Job.WorkerThreadId,
- .ReturnCode = Job.ReturnCode};
+ return JobDetails{.Name = Job.Name,
+ .Status = Status,
+ .State = {.CurrentOp = Job.State.CurrentOp,
+ .CurrentOpDetails = Job.State.CurrentOpDetails,
+ .TotalCount = Job.State.TotalCount,
+ .RemainingCount = Job.State.RemainingCount,
+ .ProgressElapsedTimeMs = Job.State.ProgressElapsedTimeMs,
+ .Messages = std::move(Job.State.Messages),
+ .AbortReason = Job.State.AbortReason},
+ .CreateTime = JobClock::TimePointFromTick(Job.CreateTick),
+ .StartTime = JobClock::TimePointFromTick(Job.StartTick),
+ .EndTime = JobClock::TimePointFromTick(Job.EndTick),
+ .WorkerThreadId = Job.WorkerThreadId,
+ .ReturnCode = Job.ReturnCode};
};
std::optional<JobDetails> Result;
@@ -320,15 +320,21 @@ public:
});
}
- void ReportProgress(JobId Id, std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount)
+ void ReportProgress(JobId Id,
+ std::string_view CurrentOp,
+ std::string_view Details,
+ ptrdiff_t TotalCount,
+ ptrdiff_t RemainingCount,
+ uint64_t ElapsedTimeMs)
{
QueueLock.WithExclusiveLock([&]() {
auto It = RunningJobs.find(Id.Id);
ZEN_ASSERT(It != RunningJobs.end());
- It->second->State.CurrentOp = CurrentOp;
- It->second->State.CurrentOpDetails = Details;
- It->second->State.TotalCount = TotalCount;
- It->second->State.RemainingCount = RemainingCount;
+ It->second->State.CurrentOp = CurrentOp;
+ It->second->State.CurrentOpDetails = Details;
+ It->second->State.TotalCount = TotalCount;
+ It->second->State.RemainingCount = RemainingCount;
+ It->second->State.ProgressElapsedTimeMs = ElapsedTimeMs;
});
}
@@ -476,13 +482,13 @@ TEST_CASE("JobQueue")
{
return;
}
- Context.ReportProgress("going to sleep", "", 100, 100);
+ Context.ReportProgress("going to sleep", "", 100, 100, (uint64_t)-1);
Sleep(5);
if (Context.IsCancelled())
{
return;
}
- Context.ReportProgress("going to sleep again", "", 100, 50);
+ Context.ReportProgress("going to sleep again", "", 100, 50, (uint64_t)-1);
if ((I & 0xFF) == 0x10)
{
zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
@@ -492,7 +498,7 @@ TEST_CASE("JobQueue")
{
return;
}
- Context.ReportProgress("done", "", 100, 0);
+ Context.ReportProgress("done", "", 100, 0, (uint64_t)-1);
});
ZEN_UNUSED(Id);
},
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 247bd6cb9..c44b06305 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -98,12 +98,13 @@ namespace remotestore_impl {
std::string_view CurrentOp,
std::string_view Details,
ptrdiff_t Total,
- ptrdiff_t Remaining)
+ ptrdiff_t Remaining,
+ uint64_t ElapsedTimeMS)
{
if (OptionalContext)
{
ZEN_ASSERT(Total > 0);
- OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining);
+ OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining, ElapsedTimeMS);
}
}
@@ -202,7 +203,8 @@ namespace remotestore_impl {
"Writing oplog"sv,
fmt::format("{} remaining...", OpCount - OpsCompleteCount),
OpCount,
- OpCount - OpsCompleteCount);
+ OpCount - OpsCompleteCount,
+ Timer.GetElapsedTimeMs());
};
BinaryWriter Writer;
@@ -226,7 +228,7 @@ namespace remotestore_impl {
if (OpCount > 0)
{
- ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
+ ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0, Timer.GetElapsedTimeMs());
}
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
@@ -244,6 +246,8 @@ namespace remotestore_impl {
std::atomic<uint64_t> AttachmentsStored = 0;
std::atomic<uint64_t> AttachmentBytesStored = 0;
std::atomic_size_t MissingAttachmentCount = 0;
+
+ std::atomic<uint64_t> ChunksCompleteCount = 0;
};
class JobContextLogOutput : public OperationLogOutput
@@ -580,6 +584,7 @@ namespace remotestore_impl {
UsedSize += WriteAttachmentBuffers[Index].GetSize();
}
}
+ Info.ChunksCompleteCount += WriteAttachmentBuffers.size();
if (UsedSize < BlockSize)
{
ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)",
@@ -1036,6 +1041,7 @@ namespace remotestore_impl {
UsedSize += WriteAttachmentBuffers[Index].GetSize();
}
}
+ Info.ChunksCompleteCount += WriteAttachmentBuffers.size();
if (UsedSize < BlockPartSize)
{
ZEN_DEBUG(
@@ -1174,6 +1180,7 @@ namespace remotestore_impl {
Info.AttachmentBytesStored.fetch_add(AttachmentSize);
Info.AttachmentsStored.fetch_add(1);
}
+ Info.ChunksCompleteCount++;
}
catch (const std::exception& Ex)
{
@@ -1568,6 +1575,7 @@ namespace remotestore_impl {
}
}
+ Stopwatch SaveAttachmentsProgressTimer;
SaveAttachmentsLatch.CountDown();
while (!SaveAttachmentsLatch.Wait(1000))
{
@@ -1588,7 +1596,8 @@ namespace remotestore_impl {
Remaining,
GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)),
AttachmentsToSave,
- Remaining);
+ Remaining,
+ SaveAttachmentsProgressTimer.GetElapsedTimeMs());
}
uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
if (AttachmentsToSave > 0)
@@ -1597,7 +1606,8 @@ namespace remotestore_impl {
"Saving attachments"sv,
fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)),
AttachmentsToSave,
- 0);
+ 0,
+ SaveAttachmentsProgressTimer.GetElapsedTimeMs());
}
ReportMessage(OptionalContext,
fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
@@ -1834,6 +1844,7 @@ BuildContainer(CidStore& ChunkStore,
CbObjectWriter SectionOpsWriter;
SectionOpsWriter.BeginArray("ops"sv);
{
+ Stopwatch BuildingOplogProgressTimer;
Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
if (RemoteResult.IsError())
{
@@ -1865,7 +1876,8 @@ BuildContainer(CidStore& ChunkStore,
"Building oplog"sv,
fmt::format("{} ops processed", OpCount),
TotalOpCount,
- TotalOpCount - OpCount);
+ TotalOpCount - OpCount,
+ BuildingOplogProgressTimer.GetElapsedTimeMs());
}
});
if (RemoteResult.IsError())
@@ -1886,7 +1898,8 @@ BuildContainer(CidStore& ChunkStore,
"Building oplog"sv,
fmt::format("{} ops processed", OpCount),
TotalOpCount,
- 0);
+ 0,
+ BuildingOplogProgressTimer.GetElapsedTimeMs());
}
}
SectionOpsWriter.EndArray(); // "ops"
@@ -2216,6 +2229,7 @@ BuildContainer(CidStore& ChunkStore,
ResolveAttachmentsLatch.CountDown();
{
+ Stopwatch ResolveAttachmentsProgressTimer;
ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining();
while (!ResolveAttachmentsLatch.Wait(1000))
{
@@ -2233,9 +2247,15 @@ BuildContainer(CidStore& ChunkStore,
"Resolving attachments"sv,
fmt::format("Aborting, {} attachments remaining...", Remaining),
UploadAttachments.size(),
- Remaining);
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ "Aborted"sv,
+ UploadAttachments.size(),
+ 0,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
return {};
}
AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
@@ -2243,14 +2263,19 @@ BuildContainer(CidStore& ChunkStore,
"Resolving attachments"sv,
fmt::format("{} remaining...", Remaining),
AttachmentCountToUseForProgress,
- Remaining);
+ Remaining,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
+ }
+ if (UploadAttachments.size() > 0)
+ {
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ ""sv,
+ UploadAttachments.size(),
+ 0,
+ ResolveAttachmentsProgressTimer.GetElapsedTimeMs());
}
}
- if (UploadAttachments.size() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0);
- }
-
if (remotestore_impl::IsCancelled(OptionalContext))
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
@@ -2469,6 +2494,8 @@ BuildContainer(CidStore& ChunkStore,
GeneratedBlockCount++;
};
+ Stopwatch AssembleBlocksProgressTimer;
+ uint64_t LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
{
if (remotestore_impl::IsCancelled(OptionalContext))
@@ -2479,14 +2506,16 @@ BuildContainer(CidStore& ChunkStore,
fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
break;
}
- if (ChunksAssembled % 1000 == 0)
+ if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200)
{
remotestore_impl::ReportProgress(
OptionalContext,
"Assembling blocks"sv,
fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
+ ChunkAssembleCount - ChunksAssembled,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
}
const IoHash& RawHash(HashIt->first);
const Oid CurrentOpKey = HashIt->second;
@@ -2572,14 +2601,16 @@ BuildContainer(CidStore& ChunkStore,
fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
break;
}
- if (ChunksAssembled % 1000 == 0)
+ if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200)
{
remotestore_impl::ReportProgress(
OptionalContext,
"Assembling blocks"sv,
fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
+ ChunkAssembleCount - ChunksAssembled,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
+ LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs();
}
const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
@@ -2628,7 +2659,8 @@ BuildContainer(CidStore& ChunkStore,
"Assembling blocks"sv,
fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
ChunkAssembleCount,
- 0);
+ 0,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
}
remotestore_impl::ReportMessage(
@@ -2653,7 +2685,8 @@ BuildContainer(CidStore& ChunkStore,
"Assembling blocks"sv,
fmt::format("Aborting, {} blocks remaining...", Remaining),
GeneratedBlockCount,
- Remaining);
+ Remaining,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
}
if (GeneratedBlockCount > 0)
{
@@ -2661,7 +2694,8 @@ BuildContainer(CidStore& ChunkStore,
"Assembling blocks"sv,
fmt::format("Aborting, {} blocks remaining...", 0),
GeneratedBlockCount,
- 0);
+ 0,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
}
return {};
}
@@ -2676,6 +2710,7 @@ BuildContainer(CidStore& ChunkStore,
throw;
}
+ Stopwatch BlockCreateProgressTimer;
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
@@ -2692,22 +2727,34 @@ BuildContainer(CidStore& ChunkStore,
"Creating blocks"sv,
fmt::format("Aborting, {} blocks remaining...", Remaining),
GeneratedBlockCount,
- Remaining);
+ Remaining,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
}
- remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ "Aborted"sv,
+ GeneratedBlockCount,
+ 0,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
return {};
}
remotestore_impl::ReportProgress(OptionalContext,
"Creating blocks"sv,
fmt::format("{} remaining...", Remaining),
GeneratedBlockCount,
- Remaining);
+ Remaining,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
}
if (GeneratedBlockCount > 0)
{
uint64_t NowMS = Timer.GetElapsedTimeMs();
- remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Creating blocks"sv,
+ ""sv,
+ GeneratedBlockCount,
+ 0,
+ BlockCreateProgressTimer.GetElapsedTimeMs());
remotestore_impl::ReportMessage(
OptionalContext,
fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
@@ -3250,6 +3297,8 @@ ParseOplogContainer(
remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount));
+ Stopwatch ScanOplogProgressTimer;
+ uint64_t LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs();
for (CbFieldView OpEntry : OpsArray)
{
OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); });
@@ -3260,21 +3309,24 @@ ParseOplogContainer(
.Reason = "Operation cancelled"};
}
OpsCompleteCount++;
- if ((OpsCompleteCount & 4095) == 0)
+ if (ScanOplogProgressTimer.GetElapsedTimeMs() - LastReportTimeMs > 200)
{
remotestore_impl::ReportProgress(
OptionalContext,
"Scanning oplog"sv,
fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount),
OpCount,
- OpCount - OpsCompleteCount);
+ OpCount - OpsCompleteCount,
+ ScanOplogProgressTimer.GetElapsedTimeMs());
+ LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs();
}
}
remotestore_impl::ReportProgress(OptionalContext,
"Scanning oplog"sv,
fmt::format("{} attachments found", NeededAttachments.size()),
OpCount,
- OpCount - OpsCompleteCount);
+ OpCount - OpsCompleteCount,
+ ScanOplogProgressTimer.GetElapsedTimeMs());
}
{
std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end());
@@ -3549,15 +3601,9 @@ LoadOplog(LoadOplogContext&& Context)
}
};
- auto OnNeedAttachment = [&Context,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- &Attachments,
- &AttachmentCount,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info](const IoHash& RawHash) {
+ std::vector<IoHash> AttachmentsToDownload;
+
+ auto OnNeedAttachment = [&AttachmentsToDownload, &RemoteResult, &Attachments, &AttachmentCount](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
@@ -3567,14 +3613,7 @@ LoadOplog(LoadOplogContext&& Context)
return;
}
AttachmentCount.fetch_add(1);
- DownloadAndSaveAttachment(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- RawHash);
+ AttachmentsToDownload.push_back(RawHash);
};
std::vector<ChunkedInfo> FilesToDechunk;
@@ -3681,10 +3720,12 @@ LoadOplog(LoadOplogContext&& Context)
}
}
}
+
+ std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false);
+
if (!AllNeededChunkHashes.empty())
{
std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
- std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false);
if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
@@ -3770,59 +3811,71 @@ LoadOplog(LoadOplogContext&& Context)
[&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; });
PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes);
+ }
+
+ Stopwatch AttachmentsDownloadProgressTimer;
+ for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
+ {
+ DownloadAndSaveBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[FullBlockIndex].BlockHash,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ 3);
+ }
- for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();)
+ {
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
{
- DownloadAndSaveBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockDescriptions.Blocks[FullBlockIndex].BlockHash,
- AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags,
- 3);
+ RangeCount++;
}
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();)
- {
- size_t RangeCount = 1;
- size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex;
- const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex];
- while (RangeCount < RangesLeft &&
- CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
- {
- RangeCount++;
- }
+ DownloadAndSavePartialBlock(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
+ BlockExistsInCache[CurrentBlockRange.BlockIndex],
+ PartialBlocksResult.BlockRanges,
+ BlockRangeIndex,
+ RangeCount,
+ AllNeededPartialChunkHashesLookup,
+ ChunkDownloadedFlags,
+ /* RetriesLeft*/ 3);
+
+ BlockRangeIndex += RangeCount;
+ }
- DownloadAndSavePartialBlock(Context,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex],
- BlockExistsInCache[CurrentBlockRange.BlockIndex],
- PartialBlocksResult.BlockRanges,
- BlockRangeIndex,
- RangeCount,
- AllNeededPartialChunkHashesLookup,
- ChunkDownloadedFlags,
- /* RetriesLeft*/ 3);
-
- BlockRangeIndex += RangeCount;
- }
+ for (const IoHash& AttachmentToDownload : AttachmentsToDownload)
+ {
+ DownloadAndSaveAttachment(Context,
+ AttachmentsDownloadLatch,
+ AttachmentsWriteLatch,
+ RemoteResult,
+ Info,
+ LoadAttachmentsTimer,
+ DownloadStartMS,
+ AttachmentToDownload);
}
+ uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size();
AttachmentsDownloadLatch.CountDown();
{
- ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining();
while (!AttachmentsDownloadLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
if (!RemoteResult.IsError())
@@ -3836,25 +3889,27 @@ LoadOplog(LoadOplogContext&& Context)
PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
+ uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load();
+
uint64_t AttachmentsDownloaded =
Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() +
Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
- AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
remotestore_impl::ReportProgress(
Context.OptionalJobContext,
"Loading attachments"sv,
fmt::format(
- "{} ({}) downloaded, {} ({}) stored, {} remaining. {}",
+ "{}/{} ({}) chunks. {} ({}) blobs downloaded. {}",
+ CompletedChunkCount,
+ TotalChunksToDownload,
+ NiceBytes(Info.AttachmentBytesStored.load()),
AttachmentsDownloaded,
NiceBytes(AttachmentBytesDownloaded),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Remaining,
remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)),
- AttachmentCountToUseForProgress,
- Remaining);
+ TotalChunksToDownload,
+ TotalChunksToDownload - CompletedChunkCount,
+ AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
}
}
if (DownloadStartMS != (uint64_t)-1)
@@ -3862,22 +3917,10 @@ LoadOplog(LoadOplogContext&& Context)
TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
- if (AttachmentCount.load() > 0)
- {
- remotestore_impl::ReportProgress(
- Context.OptionalJobContext,
- "Loading attachments"sv,
- fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)),
- AttachmentCount.load(),
- 0);
- }
-
AttachmentsWriteLatch.CountDown();
{
- ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining();
while (!AttachmentsWriteLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
if (!RemoteResult.IsError())
@@ -3885,21 +3928,36 @@ LoadOplog(LoadOplogContext&& Context)
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
}
}
- AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+
+ uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load();
+
+ uint64_t AttachmentsDownloaded =
+ Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
remotestore_impl::ReportProgress(Context.OptionalJobContext,
- "Writing attachments"sv,
- fmt::format("{} ({}), {} remaining.",
- Info.AttachmentsStored.load(),
+ "Loading attachments"sv,
+ fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded.",
+ CompletedChunkCount,
+ TotalChunksToDownload,
NiceBytes(Info.AttachmentBytesStored.load()),
- Remaining),
- AttachmentCountToUseForProgress,
- Remaining);
+ AttachmentsDownloaded,
+ NiceBytes(AttachmentBytesDownloaded)),
+ TotalChunksToDownload,
+ TotalChunksToDownload - CompletedChunkCount,
+ AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
}
}
if (AttachmentCount.load() > 0)
{
- remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
+ "Loading attachments",
+ ""sv,
+ AttachmentCount.load(),
+ 0,
+ AttachmentsDownloadProgressTimer.GetElapsedTimeMs());
}
if (Result.ErrorCode == 0)
@@ -3948,6 +4006,11 @@ LoadOplog(LoadOplogContext&& Context)
BLAKE3Stream HashingStream;
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash);
if (!Chunk)
@@ -4073,8 +4136,9 @@ LoadOplog(LoadOplogContext&& Context)
},
WorkerThreadPool::EMode::EnableBacklog);
}
- DechunkLatch.CountDown();
+ Stopwatch DechunkProgressTimer;
+ DechunkLatch.CountDown();
while (!DechunkLatch.Wait(1000))
{
ptrdiff_t Remaining = DechunkLatch.Remaining();
@@ -4092,9 +4156,15 @@ LoadOplog(LoadOplogContext&& Context)
"Dechunking attachments"sv,
fmt::format("{} remaining...", Remaining),
FilesToDechunk.size(),
- Remaining);
+ Remaining,
+ DechunkProgressTimer.GetElapsedTimeMs());
}
- remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
+ "Dechunking attachments"sv,
+ ""sv,
+ FilesToDechunk.size(),
+ 0,
+ DechunkProgressTimer.GetElapsedTimeMs());
}
Result = RemoteResult.ConvertResult();
}
@@ -4281,8 +4351,13 @@ namespace projectstore_testutils {
explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {}
virtual bool IsCancelled() const { return false; }
virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); }
- virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount)
+ virtual void ReportProgress(std::string_view CurrentOp,
+ std::string_view Details,
+ ptrdiff_t TotalCount,
+ ptrdiff_t RemainingCount,
+ uint64_t ElapsedTimeMs)
{
+ ZEN_UNUSED(ElapsedTimeMs);
ZEN_INFO("Job {}: Op '{}'{} {}/{}",
m_OpIndex,
CurrentOp,
diff --git a/src/zenserver/storage/admin/admin.cpp b/src/zenserver/storage/admin/admin.cpp
index c9f999c69..6e78a6179 100644
--- a/src/zenserver/storage/admin/admin.cpp
+++ b/src/zenserver/storage/admin/admin.cpp
@@ -184,6 +184,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
State.TotalCount > 0
? gsl::narrow<uint32_t>((100 * (State.TotalCount - State.RemainingCount)) / State.TotalCount)
: 0);
+ if (State.ProgressElapsedTimeMs != (uint64_t)-1)
+ {
+ Obj.AddInteger("ProgressElapsedTimeMs"sv, gsl::narrow<uint64_t>(State.ProgressElapsedTimeMs));
+ }
}
if (!State.Messages.empty())
{