diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-23 10:21:03 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-23 10:21:03 +0100 |
| commit | ff8d2e7c432c58114b528bfc9670eba7e387843c (patch) | |
| tree | 5dbb80fbab73835047794a56a76d45220b33571c /src/zenserver/projectstore/remoteprojectstore.cpp | |
| parent | add --ignore-missing-attachments to oplog-import command (#637) (diff) | |
| download | zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.tar.xz zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.zip | |
oplog import/export improvements (#634)
* improve feedback from oplog import/export
* improve oplog save performance
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 464 |
1 files changed, 305 insertions, 159 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index a8f4c5106..ddab7432d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -82,7 +82,6 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_ ZEN_ASSERT(Total > 0); OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); } - ZEN_INFO("{}", CurrentOp); } void @@ -319,7 +318,7 @@ BuildContainer(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temp attachment '{}', reason: '{}', retries left: {}.", + ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.", AttachmentPath, Ec.message(), RetriesLeft); @@ -560,7 +559,8 @@ BuildContainer(CidStore& ChunkStore, Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -822,6 +822,7 @@ BuildContainer(CidStore& ChunkStore, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, @@ -892,10 +893,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); - ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'", - Needed, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to upload attachment '{}'. ({}): '{}'", + Needed, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } } @@ -948,44 +950,47 @@ UploadAttachments(WorkerThreadPool& WorkerPool, SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}). Reason: '{}'", - RawHash, - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + TempPayload = std::move(Payload), + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; - }); + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1012,31 +1017,34 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1077,7 +1085,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &RemoteResult, &Chunks, NeededChunks = std::move(NeededChunks), - &BulkAttachmentCountToUpload]() { + &BulkAttachmentCountToUpload, + OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); std::vector<SharedBuffer> ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); @@ -1098,10 +1107,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved {} bulk attachments in {}", @@ -1182,10 +1192,7 @@ SaveOplog(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temporary oplog block '{}', reason: '{}', retries left: {}.", - BlockPath, - Ec.message(), - RetriesLeft); + ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; @@ -1217,12 +1224,13 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); @@ -1262,9 +1270,10 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog base container, reason: '{}', error code: {}", - BaseContainerResult.Reason, - BaseContainerResult.ErrorCode); + ReportMessage(OptionalContext, + fmt::format("Failed to load oplog base container: '{}', error code: {}", + BaseContainerResult.Reason, + BaseContainerResult.ErrorCode)); } else { @@ -1327,7 +1336,8 @@ SaveOplog(CidStore& ChunkStore, if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - ZEN_WARN("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { @@ -1361,10 +1371,13 @@ SaveOplog(CidStore& ChunkStore, if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - ZEN_WARN("Failed to finalize oplog container {} ({}). Reason: '{}'", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container {} ({}): '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + return Result; } ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); if (ContainerFinalizeResult.Needs.empty()) @@ -1429,8 +1442,9 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, continue; } OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); @@ -1470,6 +1484,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedBlock(BlockHash, {}); + NeedBlockCount++; break; } }; @@ -1482,33 +1497,51 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); if (!SectionObject) { - ZEN_WARN("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Section has unexpected data type", "Failed to save oplog container"}; } - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); + uint64_t OpsLoaded = 0; ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); + BinaryWriter Writer; for (CbFieldView OpEntry : OpsArray) { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + CbObjectView TypedOp(Writer.GetData()); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp); + Writer.Reset(); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { + ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded)); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Failed saving op", "Failed to save oplog container"}; } ZEN_DEBUG("oplog entry #{}", OpLsn); + if (OpCount > 100000) + { + if (IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK), + Timer.GetElapsedTimeMs() / 1000.500, + "Operation cancelled", + ""}; + } + if (OpsLoaded % 10000 == 0) + { + ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded); + } + } + OpsLoaded++; } + ReportProgress(OptionalContext, "Writing oplog", OpCount, 0); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; } @@ -1522,6 +1555,20 @@ LoadOplog(CidStore& ChunkStore, { using namespace std::literals; + struct DownloadInfo + { + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; + }; + + DownloadInfo Info; + Stopwatch Timer; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); @@ -1532,14 +1579,19 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); if (LoadContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + ReportMessage( + OptionalContext, + fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } ReportMessage(OptionalContext, - fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); + fmt::format("Loaded container in {} ({})", + NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), + NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); + Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); AsyncRemoteResult RemoteResult; Latch AttachmentsWorkLatch(1); @@ -1555,7 +1607,9 @@ LoadOplog(CidStore& ChunkStore, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, - IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (RemoteResult.IsError()) { return; @@ -1564,39 +1618,70 @@ LoadOplog(CidStore& ChunkStore, { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork( - [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &AttachmentsWorkLatch, + &RemoteResult, + Chunks = std::move(Chunks), + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) { - return; + Info.MissingAttachmentCount.fetch_add(1); } - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) + else { - ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - ZEN_DEBUG("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - for (const auto& It : Result.Chunks) - { + return; + } + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); } - }); + } + }); return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() { + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, + &ChunkStore, + &RemoteStore, + BlockHash, + &RemoteResult, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1605,29 +1690,53 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); } return; } - ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {}", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } - if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - })) + bool StoreChunksOK = + IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + uint64_t ChunkSize = Chunk.GetCompressedSize(); + Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); + + if (!StoreChunksOK) { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), fmt::format("Invalid format for block {}", BlockHash), {}); - ZEN_WARN("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); return; } }); @@ -1640,44 +1749,74 @@ LoadOplog(CidStore& ChunkStore, &RemoteResult, &Attachments, &AttachmentCount, - IgnoreMissingAttachments](const IoHash& RawHash) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; } + if (RemoteResult.IsError()) + { + return; + } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}", + WorkerPool.ScheduleWork( + [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + ZEN_INFO("Loaded large attachment '{}' in {}", RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - if (!IgnoreMissingAttachments) + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + return; } - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); + uint64_t ChunkSize = AttachmentResult.Bytes.GetSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); }; RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); - if (!Attachments.empty()) + if (Result.ErrorCode != 0) { - ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } + ReportMessage(OptionalContext, + fmt::format("Loaded oplog container in {}, found {} attachments to download", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + Attachments.size())); AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) @@ -1703,10 +1842,17 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, - fmt::format("Loaded oplog {} in {}", + fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); - + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + NiceBytes(Info.AttachmentsStored.load()), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load())); return Result; } |