diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-20 13:22:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-20 13:22:05 +0200 |
| commit | aa0b0d3cbfc6c4561591df856396703f7177292e (patch) | |
| tree | 6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src/zenserver | |
| parent | 5.4.5-pre0 (diff) | |
| download | zen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip | |
import oplog improvements (#54)
* report down/up transfer speed during progress
* add disk buffering in http client
* offload block decoding and chunk writing form network worker pool threads
add block hash verification for blocks recevied at oplog import
* separate download-latch from write-latch to get more accurate download speed
* check headers when downloading with http client to go directly to file writing for large payloads
* we must clear write callback even if we only provide it as an argument to the Download() call
* make timeout optional in AddSponsorProcess
* check return codes when creating windows threadpool
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/main.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 33 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 285 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 4 |
5 files changed, 202 insertions, 129 deletions
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 8715f5447..b96118484 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -123,7 +123,7 @@ ZenEntryPoint::Run() Entry->Pid.load(), m_ServerOptions.OwnerPid); - if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid)) + if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 2000)) { std::exit(0); } @@ -193,7 +193,8 @@ ZenEntryPoint::Run() if (m_ServerOptions.OwnerPid) { - Entry->AddSponsorProcess(m_ServerOptions.OwnerPid); + // We are adding a sponsor process to our own entry, can't wait for pick since the code is not run until later + Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 0); } ZenServer Server; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e452c658e..3a7922aaf 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3210,37 +3210,6 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, return ConvertResult(ContainerResult); } -std::pair<HttpResponseCode, std::string> -ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) -{ - ZEN_TRACE_CPU("Store::WriteBlock"); - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - if (!Oplog) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); - - if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); - m_CidStore.AddChunk(Compressed, AttachmentRawHash); - ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); - })) - { - return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; - } - - return {HttpResponseCode::OK, {}}; -} - bool ProjectStore::Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, @@ -4736,7 +4705,7 @@ TEST_CASE("project.store.block") } CompressedBuffer Block = GenerateBlock(std::move(Chunks)); IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); - CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); + CHECK(IterateBlock(Block.DecodeRawHash(), std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); } #endif diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 75844f84e..269fe7336 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -369,10 +369,6 @@ public: const HttpServerRequest::QueryParams& Params, CbObject& OutResponse); - std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload); - bool Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, const std::string_view OplogId, diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 65ef099e4..d8402366d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -113,21 +113,19 @@ IsCancelled(JobContext* OptionalContext) return OptionalContext->IsCancelled(); } +std::string +GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) +{ + return fmt::format("Sent: {} ({}/s) Recv: {} ({}/s)", + NiceBytes(Stats.m_SentBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), + NiceBytes(Stats.m_ReceivedBytes), + NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)); +} + void -ReportRemoteStoreStats(JobContext* OptionalContext, - const RemoteProjectStore::RemoteStoreInfo& RemoteStoreInfo, - const RemoteProjectStore::Stats& Stats, - uint64_t ElapsedWallTimeMS) +LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) { - ReportMessage( - OptionalContext, - fmt::format("Remote store '{}': " - "Sent: {} ({}/s) Recv: {} ({}/s)", - RemoteStoreInfo.Description, - NiceBytes(Stats.m_SentBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u))); ZEN_INFO("Oplog request count: {}. Average request size: {}. Peak request speed: {}", Stats.m_RequestCount, NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), @@ -144,9 +142,19 @@ ReportRemoteStoreStats(JobContext* OptionalContext, } bool -IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +IterateBlock(const IoHash& BlockHash, + IoBuffer&& CompressedBlock, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) { - IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); + IoHash RawHash; + uint64_t RawSize; + IoBuffer BlockPayload = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(CompressedBlock)), RawHash, RawSize).Decompress().AsIoBuffer(); + if (RawHash != BlockHash) + { + ZEN_WARN("Header rawhash for downloaded block {} does not match, got {}", BlockHash, RawHash); + return false; + } MemoryView BlockView = BlockPayload.GetView(); const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); @@ -1711,19 +1719,29 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining); + uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); + ReportProgress( + OptionalContext, + fmt::format("Saving attachments, {} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentsToSave, + Remaining); } + uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); if (AttachmentsToSave > 0) { - ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); + ReportProgress(OptionalContext, + fmt::format("Saving attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), + AttachmentsToSave, + 0); } ReportMessage(OptionalContext, - fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}", + fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, BulkAttachmentCountToUpload, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + NiceTimeSpanMs(ElapsedTimeMS), + GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); } RemoteProjectStore::Result @@ -2077,10 +2095,10 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); + LogRemoteStoreStatsDetails(RemoteStore.GetStats()); ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})", + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", RemoteStoreInfo.ContainerName, RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), @@ -2088,7 +2106,8 @@ SaveOplog(CidStore& ChunkStore, Info.AttachmentBlocksUploaded.load(), NiceBytes(Info.AttachmentBlockBytesUploaded.load()), Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()))); + NiceBytes(Info.AttachmentBytesUploaded.load()), + GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; }; @@ -2360,7 +2379,8 @@ LoadOplog(CidStore& ChunkStore, Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); + Latch AttachmentsDownloadLatch(1); + Latch AttachmentsWriteLatch(1); std::atomic_size_t AttachmentCount = 0; Stopwatch LoadAttachmentsTimer; @@ -2381,7 +2401,9 @@ LoadOplog(CidStore& ChunkStore, &Oplog, &ChunkStore, &NetworkWorkerPool, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &AttachmentCount, &RemoteResult, &BlockCountToDownload, @@ -2400,11 +2422,13 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload++; if (BlockHash == IoHash::Zero) { - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &RemoteResult, Chunks = std::move(Chunks), &Info, @@ -2412,7 +2436,7 @@ LoadOplog(CidStore& ChunkStore, &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2442,26 +2466,37 @@ LoadOplog(CidStore& ChunkStore, { return; } - for (const auto& It : Result.Chunks) - { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); - if (InsertResult.New) + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + return; } - } + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), + It.first, + CidStore::InsertMode::kCopyOnly); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + } + }); }); return; } - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); - NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch, + NetworkWorkerPool.ScheduleWork([&AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &ChunkStore, &RemoteStore, + &WorkerPool, BlockHash, &RemoteResult, Chunks = std::move(Chunks), @@ -2470,7 +2505,7 @@ LoadOplog(CidStore& ChunkStore, &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2503,38 +2538,56 @@ LoadOplog(CidStore& ChunkStore, NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - bool StoreChunksOK = - IterateBlock(std::move(BlockResult.Bytes), - [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - if (InsertResult.New) + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + BlockHash, + Chunks = std::move(Chunks), + Bytes = std::move(BlockResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + bool StoreChunksOK = + IterateBlock(BlockHash, + IoBuffer(Bytes), + [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); - Info.AttachmentsStored.fetch_add(1); + uint64_t ChunkSize = Chunk.GetCompressedSize(); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + WantedChunks.erase(AttachmentRawHash); } - WantedChunks.erase(AttachmentRawHash); - } - }); - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - ZEN_ASSERT(WantedChunks.empty()); + }); + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } + ZEN_ASSERT(WantedChunks.empty()); + }); }); }; @@ -2542,7 +2595,9 @@ LoadOplog(CidStore& ChunkStore, &Oplog, &ChunkStore, &NetworkWorkerPool, - &AttachmentsWorkLatch, + &WorkerPool, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, &RemoteResult, &Attachments, &AttachmentCount, @@ -2562,19 +2617,21 @@ LoadOplog(CidStore& ChunkStore, Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); - AttachmentsWorkLatch.AddCount(1); + AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, + &WorkerPool, &RemoteResult, - &AttachmentsWorkLatch, + &AttachmentsDownloadLatch, + &AttachmentsWriteLatch, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info, IgnoreMissingAttachments, OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { return; @@ -2607,12 +2664,28 @@ LoadOplog(CidStore& ChunkStore, return; } Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + }); }); }; @@ -2646,10 +2719,10 @@ LoadOplog(CidStore& ChunkStore, BlockCountToDownload, FilesToDechunk.size())); - AttachmentsWorkLatch.CountDown(); - while (!AttachmentsWorkLatch.Wait(1000)) + AttachmentsDownloadLatch.CountDown(); + while (!AttachmentsDownloadLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); + ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) @@ -2657,16 +2730,47 @@ LoadOplog(CidStore& ChunkStore, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); } } - ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); + uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; + if (DownloadStartMS != (uint64_t)-1) + { + PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + } + ReportProgress( + OptionalContext, + fmt::format("Loading attachments, {} remaining. {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + AttachmentCount.load(), + Remaining); + } + if (DownloadStartMS != (uint64_t)-1) + { + TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } + if (AttachmentCount.load() > 0) { - ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); + ReportProgress(OptionalContext, + fmt::format("Loading attachments, {} remaining. {}", 0, GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), + AttachmentCount.load(), + 0); } - if (DownloadStartMS != (uint64_t)-1) + AttachmentsWriteLatch.CountDown(); + while (!AttachmentsWriteLatch.Wait(1000)) { - TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); + ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); + if (IsCancelled(OptionalContext)) + { + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", Remaining), AttachmentCount.load(), Remaining); + } + + if (AttachmentCount.load() > 0) + { + ReportProgress(OptionalContext, fmt::format("Writing attachments, {} remaining.", 0), AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) @@ -2811,10 +2915,10 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); + LogRemoteStoreStatsDetails(RemoteStore.GetStats()); ReportMessage(OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", RemoteStoreInfo.ContainerName, Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), @@ -2825,7 +2929,8 @@ LoadOplog(CidStore& ChunkStore, NiceBytes(Info.AttachmentBytesDownloaded.load()), Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load())); + Info.MissingAttachmentCount.load(), + GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); return Result; } diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index b00aa231f..d4ccd8c7b 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -162,6 +162,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); -bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); +bool IterateBlock(const IoHash& BlockHash, + IoBuffer&& CompressedBlock, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); } // namespace zen |