diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-23 10:21:03 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-23 10:21:03 +0100 |
| commit | ff8d2e7c432c58114b528bfc9670eba7e387843c (patch) | |
| tree | 5dbb80fbab73835047794a56a76d45220b33571c | |
| parent | add --ignore-missing-attachments to oplog-import command (#637) (diff) | |
| download | zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.tar.xz zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.zip | |
oplog import/export improvements (#634)
* improve feedback from oplog import/export
* improve oplog save performance
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zencore/stream.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 116 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 12 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 464 |
5 files changed, 373 insertions, 226 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index ee270caea..de0977f95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ - Improvement: Removed use of <random> in stats, for better performance (runtime as well as build) - Improvement: Separated cache RPC handling code from general structured cache HTTP code - Improvement: Get more detailed information on Jupiter upstream errors +- Improvement: Improved performance when saving oplog via oplog import command +- Improvement: Add more feedback and progress information when executing oplog import/export - Bugfix: RPC recording would not release memory as early as intended which resulted in memory buildup during long recording sessions. Previously certain memory was only released when recording stopped, now it gets released immediately when a segment is complete and written to disk. - Bugfix: File log format now contains dates again (PR #631) - Bugfix: Jobqueue - Allow multiple threads to report progress/messages (oplog import/export) diff --git a/src/zencore/stream.cpp b/src/zencore/stream.cpp index ee97a53c4..13c90fd92 100644 --- a/src/zencore/stream.cpp +++ b/src/zencore/stream.cpp @@ -25,8 +25,9 @@ BinaryWriter::Write(std::initializer_list<const MemoryView> Buffers) } for (const MemoryView& View : Buffers) { - memcpy(m_Buffer.data() + m_Offset, View.GetData(), View.GetSize()); - m_Offset += View.GetSize(); + size_t Size = View.GetSize(); + memcpy(m_Buffer.data() + m_Offset, View.GetData(), Size); + m_Offset += Size; } } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e37fb26f4..42af9b79b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -228,7 +228,7 @@ namespace { return {static_cast<HttpResponseCode>(Result.ErrorCode), Result.Reason.empty() ? Result.Text : Result.Text.empty() ? Result.Reason - : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + : fmt::format("{}: {}", Result.Reason, Result.Text)}; } } // namespace @@ -448,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash) + OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); @@ -765,7 +765,7 @@ ProjectStore::Oplog::ReplayLog() } IoBuffer -ProjectStore::Oplog::FindChunk(Oid ChunkId) +ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) @@ -1044,8 +1044,8 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, - Oid FileId, - IoHash Hash, + const Oid& FileId, + const IoHash& Hash, std::string_view ServerPath, std::string_view ClientPath) { @@ -1066,13 +1066,13 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, } void -ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_ChunkMap.insert_or_assign(ChunkId, Hash); } void -ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_MetaMap.insert_or_assign(ChunkId, Hash); } @@ -1244,7 +1244,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) } uint32_t -ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); @@ -1252,7 +1252,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) OplogEntryMapping Mapping = GetMapping(Core); - SharedBuffer Buffer = Core.GetBuffer(); + MemoryView Buffer = Core.GetView(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); @@ -3129,37 +3129,36 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, - Force, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } @@ -3186,25 +3185,24 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - Force, - IgnoreMissingAttachments](JobContext& Context) { - RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + Force, + IgnoreMissingAttachments](JobContext& Context) { + RemoteProjectStore::Result Result = + LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 5ebcd420c..f9611653b 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -99,7 +99,7 @@ public: int GetOpIndexByKey(const Oid& Key); int GetMaxOpIndex() const; - IoBuffer FindChunk(Oid ChunkId); + IoBuffer FindChunk(const Oid& ChunkId); inline static const uint32_t kInvalidOp = ~0u; @@ -109,7 +109,7 @@ public: */ uint32_t AppendNewOplogEntry(CbPackage Op); - uint32_t AppendNewOplogEntry(CbObject Core); + uint32_t AppendNewOplogEntry(CbObjectView Core); enum UpdateType { @@ -202,12 +202,12 @@ public: uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry); void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, - Oid FileId, - IoHash Hash, + const Oid& FileId, + const IoHash& Hash, std::string_view ServerPath, std::string_view ClientPath); - void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); - void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); + void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); + void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); friend class ProjectStoreReferenceChecker; }; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index a8f4c5106..ddab7432d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -82,7 +82,6 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_ ZEN_ASSERT(Total > 0); OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); } - ZEN_INFO("{}", CurrentOp); } void @@ -319,7 +318,7 @@ BuildContainer(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temp attachment '{}', reason: '{}', retries left: {}.", + ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.", AttachmentPath, Ec.message(), RetriesLeft); @@ -560,7 +559,8 @@ BuildContainer(CidStore& ChunkStore, Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -822,6 +822,7 @@ BuildContainer(CidStore& ChunkStore, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, @@ -892,10 +893,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); - ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'", - Needed, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to upload attachment '{}'. ({}): '{}'", + Needed, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } } @@ -948,44 +950,47 @@ UploadAttachments(WorkerThreadPool& WorkerPool, SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}). Reason: '{}'", - RawHash, - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + TempPayload = std::move(Payload), + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; - }); + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1012,31 +1017,34 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1077,7 +1085,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &RemoteResult, &Chunks, NeededChunks = std::move(NeededChunks), - &BulkAttachmentCountToUpload]() { + &BulkAttachmentCountToUpload, + OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); std::vector<SharedBuffer> ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); @@ -1098,10 +1107,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved {} bulk attachments in {}", @@ -1182,10 +1192,7 @@ SaveOplog(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temporary oplog block '{}', reason: '{}', retries left: {}.", - BlockPath, - Ec.message(), - RetriesLeft); + ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; @@ -1217,12 +1224,13 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); @@ -1262,9 +1270,10 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog base container, reason: '{}', error code: {}", - BaseContainerResult.Reason, - BaseContainerResult.ErrorCode); + ReportMessage(OptionalContext, + fmt::format("Failed to load oplog base container: '{}', error code: {}", + BaseContainerResult.Reason, + BaseContainerResult.ErrorCode)); } else { @@ -1327,7 +1336,8 @@ SaveOplog(CidStore& ChunkStore, if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - ZEN_WARN("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { @@ -1361,10 +1371,13 @@ SaveOplog(CidStore& ChunkStore, if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - ZEN_WARN("Failed to finalize oplog container {} ({}). Reason: '{}'", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container {} ({}): '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + return Result; } ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); if (ContainerFinalizeResult.Needs.empty()) @@ -1429,8 +1442,9 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, continue; } OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); @@ -1470,6 +1484,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedBlock(BlockHash, {}); + NeedBlockCount++; break; } }; @@ -1482,33 +1497,51 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); if (!SectionObject) { - ZEN_WARN("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Section has unexpected data type", "Failed to save oplog container"}; } - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); + uint64_t OpsLoaded = 0; ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); + BinaryWriter Writer; for (CbFieldView OpEntry : OpsArray) { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + CbObjectView TypedOp(Writer.GetData()); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp); + Writer.Reset(); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { + ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded)); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Failed saving op", "Failed to save oplog container"}; } ZEN_DEBUG("oplog entry #{}", OpLsn); + if (OpCount > 100000) + { + if (IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK), + Timer.GetElapsedTimeMs() / 1000.500, + "Operation cancelled", + ""}; + } + if (OpsLoaded % 10000 == 0) + { + ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded); + } + } + OpsLoaded++; } + ReportProgress(OptionalContext, "Writing oplog", OpCount, 0); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; } @@ -1522,6 +1555,20 @@ LoadOplog(CidStore& ChunkStore, { using namespace std::literals; + struct DownloadInfo + { + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; + }; + + DownloadInfo Info; + Stopwatch Timer; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); @@ -1532,14 +1579,19 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); if (LoadContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + ReportMessage( + OptionalContext, + fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } ReportMessage(OptionalContext, - fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); + fmt::format("Loaded container in {} ({})", + NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), + NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); + Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); AsyncRemoteResult RemoteResult; Latch AttachmentsWorkLatch(1); @@ -1555,7 +1607,9 @@ LoadOplog(CidStore& ChunkStore, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, - IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (RemoteResult.IsError()) { return; @@ -1564,39 +1618,70 @@ LoadOplog(CidStore& ChunkStore, { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork( - [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &AttachmentsWorkLatch, + &RemoteResult, + Chunks = std::move(Chunks), + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) { - return; + Info.MissingAttachmentCount.fetch_add(1); } - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) + else { - ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - ZEN_DEBUG("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - for (const auto& It : Result.Chunks) - { + return; + } + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); } - }); + } + }); return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() { + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, + &ChunkStore, + &RemoteStore, + BlockHash, + &RemoteResult, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1605,29 +1690,53 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); } return; } - ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {}", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } - if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - })) + bool StoreChunksOK = + IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + uint64_t ChunkSize = Chunk.GetCompressedSize(); + Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); + + if (!StoreChunksOK) { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), fmt::format("Invalid format for block {}", BlockHash), {}); - ZEN_WARN("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); return; } }); @@ -1640,44 +1749,74 @@ LoadOplog(CidStore& ChunkStore, &RemoteResult, &Attachments, &AttachmentCount, - IgnoreMissingAttachments](const IoHash& RawHash) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; } + if (RemoteResult.IsError()) + { + return; + } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}", + WorkerPool.ScheduleWork( + [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + ZEN_INFO("Loaded large attachment '{}' in {}", RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - if (!IgnoreMissingAttachments) + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + return; } - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); + uint64_t ChunkSize = AttachmentResult.Bytes.GetSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); }; RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); - if (!Attachments.empty()) + if (Result.ErrorCode != 0) { - ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } + ReportMessage(OptionalContext, + fmt::format("Loaded oplog container in {}, found {} attachments to download", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + Attachments.size())); AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) @@ -1703,10 +1842,17 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, - fmt::format("Loaded oplog {} in {}", + fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); - + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + NiceBytes(Info.AttachmentsStored.load()), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load())); return Result; } |