diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-11 16:01:29 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-11 16:01:29 +0100 |
| commit | 57816d04b61f6bdc1403583201246abd5883c457 (patch) | |
| tree | 6148371fcc7d98702f0b121a3f40efe4e6bc08d4 /src | |
| parent | block scavenge of other downloads that uses an older state file (#822) (diff) | |
| download | zen-57816d04b61f6bdc1403583201246abd5883c457.tar.xz zen-57816d04b61f6bdc1403583201246abd5883c457.zip | |
improved oplog import progress reporting (#825)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 18 | ||||
| -rw-r--r-- | src/zen/progressbar.cpp | 2 | ||||
| -rw-r--r-- | src/zen/progressbar.h | 5 | ||||
| -rw-r--r-- | src/zencore/include/zencore/jobqueue.h | 7 | ||||
| -rw-r--r-- | src/zencore/jobqueue.cpp | 56 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 317 | ||||
| -rw-r--r-- | src/zenserver/storage/admin/admin.cpp | 4 |
7 files changed, 251 insertions, 158 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index db931e49a..c0780c7e8 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -163,10 +163,11 @@ namespace projectstore_impl { if (Status == "Running") { - std::string_view CurrentOp = StatusObject["Op"sv].AsString(); - std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString(); - uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64(); - uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64(); + std::string_view CurrentOp = StatusObject["Op"sv].AsString(); + std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString(); + uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64(); + uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64(); + uint64_t ProgressElapsedTimeMs = StatusObject["ProgressElapsedTimeMs"sv].AsUInt64((uint64_t)-1); if (!ProgressBar.IsSameTask(CurrentOp)) { @@ -179,10 +180,11 @@ namespace projectstore_impl { MessagesDone = true; } - ProgressBar.UpdateState({.Task = std::string(CurrentOp), - .Details = std::string(CurrentOpDetails), - .TotalCount = TotalCount, - .RemainingCount = RemainingCount}, + ProgressBar.UpdateState({.Task = std::string(CurrentOp), + .Details = std::string(CurrentOpDetails), + .TotalCount = TotalCount, + .RemainingCount = RemainingCount, + .OptionalElapsedTime = ProgressElapsedTimeMs}, false); } diff --git a/src/zen/progressbar.cpp b/src/zen/progressbar.cpp index b758c061b..961375c0b 100644 --- a/src/zen/progressbar.cpp +++ b/src/zen/progressbar.cpp @@ -156,7 +156,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) return; } - uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs(); + uint64_t ElapsedTimeMS = NewState.OptionalElapsedTime == (uint64_t)-1 ? m_SW.GetElapsedTimeMs() : NewState.OptionalElapsedTime; if (m_LastUpdateMS != (uint64_t)-1) { if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) && diff --git a/src/zen/progressbar.h b/src/zen/progressbar.h index cb1c7023b..b54c009e1 100644 --- a/src/zen/progressbar.h +++ b/src/zen/progressbar.h @@ -19,8 +19,9 @@ public: bool operator==(const State&) const = default; std::string Task; std::string Details; - uint64_t TotalCount = 0; - uint64_t RemainingCount = 0; + uint64_t TotalCount = 0; + uint64_t RemainingCount = 0; + uint64_t OptionalElapsedTime = (uint64_t)-1; enum class EStatus { Running, diff --git a/src/zencore/include/zencore/jobqueue.h b/src/zencore/include/zencore/jobqueue.h index d348bd021..6631e5766 100644 --- a/src/zencore/include/zencore/jobqueue.h +++ b/src/zencore/include/zencore/jobqueue.h @@ -25,7 +25,11 @@ public: virtual bool IsCancelled() const = 0; virtual void ReportMessage(std::string_view Message) = 0; // virtual void ReportProgress(std::string_view CurrentOp, uint32_t CurrentOpPercentComplete) = 0; - virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) = 0; + virtual void ReportProgress(std::string_view CurrentOp, + std::string_view Details, + ptrdiff_t TotalCount, + ptrdiff_t RemainingCount, + uint64_t ElapsedTimeMs) = 0; }; class JobError : public std::runtime_error @@ -62,6 +66,7 @@ public: std::string CurrentOpDetails; ptrdiff_t TotalCount; ptrdiff_t RemainingCount; + uint64_t ProgressElapsedTimeMs; std::vector<std::string> Messages; std::string AbortReason; }; diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index d6a8a6479..3e58fb97d 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -57,9 +57,10 @@ public: virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, - ptrdiff_t RemainingCount) override + ptrdiff_t RemainingCount, + uint64_t ElapsedTimeMs) override { - Queue->ReportProgress(Id, CurrentOp, Details, TotalCount, RemainingCount); + Queue->ReportProgress(Id, CurrentOp, Details, TotalCount, RemainingCount, ElapsedTimeMs); } }; @@ -265,21 +266,20 @@ public: virtual std::optional<JobDetails> Get(JobId Id) override { auto Convert = [](JobStatus Status, Job& Job) -> JobDetails { - return JobDetails{ - .Name = Job.Name, - .Status = Status, - .State = {.CurrentOp = Job.State.CurrentOp, - .CurrentOpDetails = Job.State.CurrentOpDetails, - .TotalCount = Job.State.TotalCount, - .RemainingCount = Job.State.RemainingCount, - // .CurrentOpPercentComplete = Job.State.CurrentOpPercentComplete, - .Messages = std::move(Job.State.Messages), - .AbortReason = Job.State.AbortReason}, - .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), - .StartTime = JobClock::TimePointFromTick(Job.StartTick), - .EndTime = JobClock::TimePointFromTick(Job.EndTick), - .WorkerThreadId = Job.WorkerThreadId, - .ReturnCode = Job.ReturnCode}; + return JobDetails{.Name = Job.Name, + .Status = Status, + .State = {.CurrentOp = Job.State.CurrentOp, + .CurrentOpDetails = Job.State.CurrentOpDetails, + .TotalCount = Job.State.TotalCount, + .RemainingCount = Job.State.RemainingCount, + .ProgressElapsedTimeMs = Job.State.ProgressElapsedTimeMs, + .Messages = std::move(Job.State.Messages), + .AbortReason = Job.State.AbortReason}, + .CreateTime = JobClock::TimePointFromTick(Job.CreateTick), + .StartTime = JobClock::TimePointFromTick(Job.StartTick), + .EndTime = JobClock::TimePointFromTick(Job.EndTick), + .WorkerThreadId = Job.WorkerThreadId, + .ReturnCode = Job.ReturnCode}; }; std::optional<JobDetails> Result; @@ -320,15 +320,21 @@ public: }); } - void ReportProgress(JobId Id, std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) + void ReportProgress(JobId Id, + std::string_view CurrentOp, + std::string_view Details, + ptrdiff_t TotalCount, + ptrdiff_t RemainingCount, + uint64_t ElapsedTimeMs) { QueueLock.WithExclusiveLock([&]() { auto It = RunningJobs.find(Id.Id); ZEN_ASSERT(It != RunningJobs.end()); - It->second->State.CurrentOp = CurrentOp; - It->second->State.CurrentOpDetails = Details; - It->second->State.TotalCount = TotalCount; - It->second->State.RemainingCount = RemainingCount; + It->second->State.CurrentOp = CurrentOp; + It->second->State.CurrentOpDetails = Details; + It->second->State.TotalCount = TotalCount; + It->second->State.RemainingCount = RemainingCount; + It->second->State.ProgressElapsedTimeMs = ElapsedTimeMs; }); } @@ -476,13 +482,13 @@ TEST_CASE("JobQueue") { return; } - Context.ReportProgress("going to sleep", "", 100, 100); + Context.ReportProgress("going to sleep", "", 100, 100, (uint64_t)-1); Sleep(5); if (Context.IsCancelled()) { return; } - Context.ReportProgress("going to sleep again", "", 100, 50); + Context.ReportProgress("going to sleep again", "", 100, 50, (uint64_t)-1); if ((I & 0xFF) == 0x10) { zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I)); @@ -492,7 +498,7 @@ TEST_CASE("JobQueue") { return; } - Context.ReportProgress("done", "", 100, 0); + Context.ReportProgress("done", "", 100, 0, (uint64_t)-1); }); ZEN_UNUSED(Id); }, diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 247bd6cb9..c44b06305 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -98,12 +98,13 @@ namespace remotestore_impl { std::string_view CurrentOp, std::string_view Details, ptrdiff_t Total, - ptrdiff_t Remaining) + ptrdiff_t Remaining, + uint64_t ElapsedTimeMS) { if (OptionalContext) { ZEN_ASSERT(Total > 0); - OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining); + OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining, ElapsedTimeMS); } } @@ -202,7 +203,8 @@ namespace remotestore_impl { "Writing oplog"sv, fmt::format("{} remaining...", OpCount - OpsCompleteCount), OpCount, - OpCount - OpsCompleteCount); + OpCount - OpsCompleteCount, + Timer.GetElapsedTimeMs()); }; BinaryWriter Writer; @@ -226,7 +228,7 @@ namespace remotestore_impl { if (OpCount > 0) { - ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); + ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0, Timer.GetElapsedTimeMs()); } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; @@ -244,6 +246,8 @@ namespace remotestore_impl { std::atomic<uint64_t> AttachmentsStored = 0; std::atomic<uint64_t> AttachmentBytesStored = 0; std::atomic_size_t MissingAttachmentCount = 0; + + std::atomic<uint64_t> ChunksCompleteCount = 0; }; class JobContextLogOutput : public OperationLogOutput @@ -580,6 +584,7 @@ namespace remotestore_impl { UsedSize += WriteAttachmentBuffers[Index].GetSize(); } } + Info.ChunksCompleteCount += WriteAttachmentBuffers.size(); if (UsedSize < BlockSize) { ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) (use of matching {}%)", @@ -1036,6 +1041,7 @@ namespace remotestore_impl { UsedSize += WriteAttachmentBuffers[Index].GetSize(); } } + Info.ChunksCompleteCount += WriteAttachmentBuffers.size(); if (UsedSize < BlockPartSize) { ZEN_DEBUG( @@ -1174,6 +1180,7 @@ namespace remotestore_impl { Info.AttachmentBytesStored.fetch_add(AttachmentSize); Info.AttachmentsStored.fetch_add(1); } + Info.ChunksCompleteCount++; } catch (const std::exception& Ex) { @@ -1568,6 +1575,7 @@ namespace remotestore_impl { } } + Stopwatch SaveAttachmentsProgressTimer; SaveAttachmentsLatch.CountDown(); while (!SaveAttachmentsLatch.Wait(1000)) { @@ -1588,7 +1596,8 @@ namespace remotestore_impl { Remaining, GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)), AttachmentsToSave, - Remaining); + Remaining, + SaveAttachmentsProgressTimer.GetElapsedTimeMs()); } uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); if (AttachmentsToSave > 0) @@ -1597,7 +1606,8 @@ namespace remotestore_impl { "Saving attachments"sv, fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)), AttachmentsToSave, - 0); + 0, + SaveAttachmentsProgressTimer.GetElapsedTimeMs()); } ReportMessage(OptionalContext, fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", @@ -1834,6 +1844,7 @@ BuildContainer(CidStore& ChunkStore, CbObjectWriter SectionOpsWriter; SectionOpsWriter.BeginArray("ops"sv); { + Stopwatch BuildingOplogProgressTimer; Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { if (RemoteResult.IsError()) { @@ -1865,7 +1876,8 @@ BuildContainer(CidStore& ChunkStore, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, - TotalOpCount - OpCount); + TotalOpCount - OpCount, + BuildingOplogProgressTimer.GetElapsedTimeMs()); } }); if (RemoteResult.IsError()) @@ -1886,7 +1898,8 @@ BuildContainer(CidStore& ChunkStore, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, - 0); + 0, + BuildingOplogProgressTimer.GetElapsedTimeMs()); } } SectionOpsWriter.EndArray(); // "ops" @@ -2216,6 +2229,7 @@ BuildContainer(CidStore& ChunkStore, ResolveAttachmentsLatch.CountDown(); { + Stopwatch ResolveAttachmentsProgressTimer; ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining(); while (!ResolveAttachmentsLatch.Wait(1000)) { @@ -2233,9 +2247,15 @@ BuildContainer(CidStore& ChunkStore, "Resolving attachments"sv, fmt::format("Aborting, {} attachments remaining...", Remaining), UploadAttachments.size(), - Remaining); + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + "Aborted"sv, + UploadAttachments.size(), + 0, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); return {}; } AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); @@ -2243,14 +2263,19 @@ BuildContainer(CidStore& ChunkStore, "Resolving attachments"sv, fmt::format("{} remaining...", Remaining), AttachmentCountToUseForProgress, - Remaining); + Remaining, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); + } + if (UploadAttachments.size() > 0) + { + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + ""sv, + UploadAttachments.size(), + 0, + ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); } } - if (UploadAttachments.size() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0); - } - if (remotestore_impl::IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); @@ -2469,6 +2494,8 @@ BuildContainer(CidStore& ChunkStore, GeneratedBlockCount++; }; + Stopwatch AssembleBlocksProgressTimer; + uint64_t LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++) { if (remotestore_impl::IsCancelled(OptionalContext)) @@ -2479,14 +2506,16 @@ BuildContainer(CidStore& ChunkStore, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); break; } - if (ChunksAssembled % 1000 == 0) + if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200) { remotestore_impl::ReportProgress( OptionalContext, "Assembling blocks"sv, fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); + ChunkAssembleCount - ChunksAssembled, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); } const IoHash& RawHash(HashIt->first); const Oid CurrentOpKey = HashIt->second; @@ -2572,14 +2601,16 @@ BuildContainer(CidStore& ChunkStore, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); break; } - if (ChunksAssembled % 1000 == 0) + if (AssembleBlocksProgressTimer.GetElapsedTimeMs() - LastAssembleBlocksProgressUpdateMs > 200) { remotestore_impl::ReportProgress( OptionalContext, "Assembling blocks"sv, fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); + ChunkAssembleCount - ChunksAssembled, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); + LastAssembleBlocksProgressUpdateMs = AssembleBlocksProgressTimer.GetElapsedTimeMs(); } const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) @@ -2628,7 +2659,8 @@ BuildContainer(CidStore& ChunkStore, "Assembling blocks"sv, fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), ChunkAssembleCount, - 0); + 0, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); } remotestore_impl::ReportMessage( @@ -2653,7 +2685,8 @@ BuildContainer(CidStore& ChunkStore, "Assembling blocks"sv, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, - Remaining); + Remaining, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); } if (GeneratedBlockCount > 0) { @@ -2661,7 +2694,8 @@ BuildContainer(CidStore& ChunkStore, "Assembling blocks"sv, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, - 0); + 0, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); } return {}; } @@ -2676,6 +2710,7 @@ BuildContainer(CidStore& ChunkStore, throw; } + Stopwatch BlockCreateProgressTimer; BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { @@ -2692,22 +2727,34 @@ BuildContainer(CidStore& ChunkStore, "Creating blocks"sv, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, - Remaining); + Remaining, + BlockCreateProgressTimer.GetElapsedTimeMs()); } - remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0); + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + "Aborted"sv, + GeneratedBlockCount, + 0, + BlockCreateProgressTimer.GetElapsedTimeMs()); return {}; } remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, fmt::format("{} remaining...", Remaining), GeneratedBlockCount, - Remaining); + Remaining, + BlockCreateProgressTimer.GetElapsedTimeMs()); } if (GeneratedBlockCount > 0) { uint64_t NowMS = Timer.GetElapsedTimeMs(); - remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0); + remotestore_impl::ReportProgress(OptionalContext, + "Creating blocks"sv, + ""sv, + GeneratedBlockCount, + 0, + BlockCreateProgressTimer.GetElapsedTimeMs()); remotestore_impl::ReportMessage( OptionalContext, fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); @@ -3250,6 +3297,8 @@ ParseOplogContainer( remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount)); + Stopwatch ScanOplogProgressTimer; + uint64_t LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs(); for (CbFieldView OpEntry : OpsArray) { OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); @@ -3260,21 +3309,24 @@ ParseOplogContainer( .Reason = "Operation cancelled"}; } OpsCompleteCount++; - if ((OpsCompleteCount & 4095) == 0) + if (ScanOplogProgressTimer.GetElapsedTimeMs() - LastReportTimeMs > 200) { remotestore_impl::ReportProgress( OptionalContext, "Scanning oplog"sv, fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount), OpCount, - OpCount - OpsCompleteCount); + OpCount - OpsCompleteCount, + ScanOplogProgressTimer.GetElapsedTimeMs()); + LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs(); } } remotestore_impl::ReportProgress(OptionalContext, "Scanning oplog"sv, fmt::format("{} attachments found", NeededAttachments.size()), OpCount, - OpCount - OpsCompleteCount); + OpCount - OpsCompleteCount, + ScanOplogProgressTimer.GetElapsedTimeMs()); } { std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); @@ -3549,15 +3601,9 @@ LoadOplog(LoadOplogContext&& Context) } }; - auto OnNeedAttachment = [&Context, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - &Attachments, - &AttachmentCount, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info](const IoHash& RawHash) { + std::vector<IoHash> AttachmentsToDownload; + + auto OnNeedAttachment = [&AttachmentsToDownload, &RemoteResult, &Attachments, &AttachmentCount](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; @@ -3567,14 +3613,7 @@ LoadOplog(LoadOplogContext&& Context) return; } AttachmentCount.fetch_add(1); - DownloadAndSaveAttachment(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - RawHash); + AttachmentsToDownload.push_back(RawHash); }; std::vector<ChunkedInfo> FilesToDechunk; @@ -3681,10 +3720,12 @@ LoadOplog(LoadOplogContext&& Context) } } } + + std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false); + if (!AllNeededChunkHashes.empty()) { std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes; - std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false); if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { @@ -3770,59 +3811,71 @@ LoadOplog(LoadOplogContext&& Context) [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; }); PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); + } + + Stopwatch AttachmentsDownloadProgressTimer; + for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) + { + DownloadAndSaveBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[FullBlockIndex].BlockHash, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + 3); + } - for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) + { + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) { - DownloadAndSaveBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockDescriptions.Blocks[FullBlockIndex].BlockHash, - AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags, - 3); + RangeCount++; } - for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) - { - size_t RangeCount = 1; - size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; - const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; - while (RangeCount < RangesLeft && - CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) - { - RangeCount++; - } + DownloadAndSavePartialBlock(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], + BlockExistsInCache[CurrentBlockRange.BlockIndex], + PartialBlocksResult.BlockRanges, + BlockRangeIndex, + RangeCount, + AllNeededPartialChunkHashesLookup, + ChunkDownloadedFlags, + /* RetriesLeft*/ 3); + + BlockRangeIndex += RangeCount; + } - DownloadAndSavePartialBlock(Context, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], - BlockExistsInCache[CurrentBlockRange.BlockIndex], - PartialBlocksResult.BlockRanges, - BlockRangeIndex, - RangeCount, - AllNeededPartialChunkHashesLookup, - ChunkDownloadedFlags, - /* RetriesLeft*/ 3); - - BlockRangeIndex += RangeCount; - } + for (const IoHash& AttachmentToDownload : AttachmentsToDownload) + { + DownloadAndSaveAttachment(Context, + AttachmentsDownloadLatch, + AttachmentsWriteLatch, + RemoteResult, + Info, + LoadAttachmentsTimer, + DownloadStartMS, + AttachmentToDownload); } + uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size(); AttachmentsDownloadLatch.CountDown(); { - ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining(); while (!AttachmentsDownloadLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { if (!RemoteResult.IsError()) @@ -3836,25 +3889,27 @@ LoadOplog(LoadOplogContext&& Context) PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } + uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); + uint64_t AttachmentsDownloaded = Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); - AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); remotestore_impl::ReportProgress( Context.OptionalJobContext, "Loading attachments"sv, fmt::format( - "{} ({}) downloaded, {} ({}) stored, {} remaining. {}", + "{}/{} ({}) chunks. {} ({}) blobs downloaded. {}", + CompletedChunkCount, + TotalChunksToDownload, + NiceBytes(Info.AttachmentBytesStored.load()), AttachmentsDownloaded, NiceBytes(AttachmentBytesDownloaded), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Remaining, remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), - AttachmentCountToUseForProgress, - Remaining); + TotalChunksToDownload, + TotalChunksToDownload - CompletedChunkCount, + AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } } if (DownloadStartMS != (uint64_t)-1) @@ -3862,22 +3917,10 @@ LoadOplog(LoadOplogContext&& Context) TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } - if (AttachmentCount.load() > 0) - { - remotestore_impl::ReportProgress( - Context.OptionalJobContext, - "Loading attachments"sv, - fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)), - AttachmentCount.load(), - 0); - } - AttachmentsWriteLatch.CountDown(); { - ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining(); while (!AttachmentsWriteLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { if (!RemoteResult.IsError()) @@ -3885,21 +3928,36 @@ LoadOplog(LoadOplogContext&& Context) RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } } - AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + + uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); + + uint64_t AttachmentsDownloaded = + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + remotestore_impl::ReportProgress(Context.OptionalJobContext, - "Writing attachments"sv, - fmt::format("{} ({}), {} remaining.", - Info.AttachmentsStored.load(), + "Loading attachments"sv, + fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded.", + CompletedChunkCount, + TotalChunksToDownload, NiceBytes(Info.AttachmentBytesStored.load()), - Remaining), - AttachmentCountToUseForProgress, - Remaining); + AttachmentsDownloaded, + NiceBytes(AttachmentBytesDownloaded)), + TotalChunksToDownload, + TotalChunksToDownload - CompletedChunkCount, + AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } } if (AttachmentCount.load() > 0) { - remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Loading attachments", + ""sv, + AttachmentCount.load(), + 0, + AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } if (Result.ErrorCode == 0) @@ -3948,6 +4006,11 @@ LoadOplog(LoadOplogContext&& Context) BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { + if (RemoteResult.IsError()) + { + return; + } + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) @@ -4073,8 +4136,9 @@ LoadOplog(LoadOplogContext&& Context) }, WorkerThreadPool::EMode::EnableBacklog); } - DechunkLatch.CountDown(); + Stopwatch DechunkProgressTimer; + DechunkLatch.CountDown(); while (!DechunkLatch.Wait(1000)) { ptrdiff_t Remaining = DechunkLatch.Remaining(); @@ -4092,9 +4156,15 @@ LoadOplog(LoadOplogContext&& Context) "Dechunking attachments"sv, fmt::format("{} remaining...", Remaining), FilesToDechunk.size(), - Remaining); + Remaining, + DechunkProgressTimer.GetElapsedTimeMs()); } - remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Dechunking attachments"sv, + ""sv, + FilesToDechunk.size(), + 0, + DechunkProgressTimer.GetElapsedTimeMs()); } Result = RemoteResult.ConvertResult(); } @@ -4281,8 +4351,13 @@ namespace projectstore_testutils { explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {} virtual bool IsCancelled() const { return false; } virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); } - virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) + virtual void ReportProgress(std::string_view CurrentOp, + std::string_view Details, + ptrdiff_t TotalCount, + ptrdiff_t RemainingCount, + uint64_t ElapsedTimeMs) { + ZEN_UNUSED(ElapsedTimeMs); ZEN_INFO("Job {}: Op '{}'{} {}/{}", m_OpIndex, CurrentOp, diff --git a/src/zenserver/storage/admin/admin.cpp b/src/zenserver/storage/admin/admin.cpp index c9f999c69..6e78a6179 100644 --- a/src/zenserver/storage/admin/admin.cpp +++ b/src/zenserver/storage/admin/admin.cpp @@ -184,6 +184,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, State.TotalCount > 0 ? gsl::narrow<uint32_t>((100 * (State.TotalCount - State.RemainingCount)) / State.TotalCount) : 0); + if (State.ProgressElapsedTimeMs != (uint64_t)-1) + { + Obj.AddInteger("ProgressElapsedTimeMs"sv, gsl::narrow<uint64_t>(State.ProgressElapsedTimeMs)); + } } if (!State.Messages.empty()) { |