diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-23 10:21:03 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-23 10:21:03 +0100 |
| commit | ff8d2e7c432c58114b528bfc9670eba7e387843c (patch) | |
| tree | 5dbb80fbab73835047794a56a76d45220b33571c /src | |
| parent | add --ignore-missing-attachments to oplog-import command (#637) (diff) | |
| download | zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.tar.xz zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.zip | |
oplog import/export improvements (#634)
* improve feedback from oplog import/export
* improve oplog save performance
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/stream.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 116 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 12 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 464 |
4 files changed, 371 insertions, 226 deletions
diff --git a/src/zencore/stream.cpp b/src/zencore/stream.cpp index ee97a53c4..13c90fd92 100644 --- a/src/zencore/stream.cpp +++ b/src/zencore/stream.cpp @@ -25,8 +25,9 @@ BinaryWriter::Write(std::initializer_list<const MemoryView> Buffers) } for (const MemoryView& View : Buffers) { - memcpy(m_Buffer.data() + m_Offset, View.GetData(), View.GetSize()); - m_Offset += View.GetSize(); + size_t Size = View.GetSize(); + memcpy(m_Buffer.data() + m_Offset, View.GetData(), Size); + m_Offset += Size; } } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e37fb26f4..42af9b79b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -228,7 +228,7 @@ namespace { return {static_cast<HttpResponseCode>(Result.ErrorCode), Result.Reason.empty() ? Result.Text : Result.Text.empty() ? Result.Reason - : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + : fmt::format("{}: {}", Result.Reason, Result.Text)}; } } // namespace @@ -448,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash) + OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); @@ -765,7 +765,7 @@ ProjectStore::Oplog::ReplayLog() } IoBuffer -ProjectStore::Oplog::FindChunk(Oid ChunkId) +ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) @@ -1044,8 +1044,8 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, - Oid FileId, - IoHash Hash, + const Oid& FileId, + const IoHash& Hash, std::string_view ServerPath, std::string_view ClientPath) { @@ -1066,13 +1066,13 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, } void -ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_ChunkMap.insert_or_assign(ChunkId, Hash); } void -ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash) { m_MetaMap.insert_or_assign(ChunkId, Hash); } @@ -1244,7 +1244,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) } uint32_t -ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) { ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry"); @@ -1252,7 +1252,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) OplogEntryMapping Mapping = GetMapping(Core); - SharedBuffer Buffer = Core.GetBuffer(); + MemoryView Buffer = Core.GetView(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); @@ -3129,37 +3129,36 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, - Force, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } @@ -3186,25 +3185,24 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - Force, - IgnoreMissingAttachments](JobContext& Context) { - RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + Force, + IgnoreMissingAttachments](JobContext& Context) { + RemoteProjectStore::Result Result = + LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 5ebcd420c..f9611653b 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -99,7 +99,7 @@ public: int GetOpIndexByKey(const Oid& Key); int GetMaxOpIndex() const; - IoBuffer FindChunk(Oid ChunkId); + IoBuffer FindChunk(const Oid& ChunkId); inline static const uint32_t kInvalidOp = ~0u; @@ -109,7 +109,7 @@ public: */ uint32_t AppendNewOplogEntry(CbPackage Op); - uint32_t AppendNewOplogEntry(CbObject Core); + uint32_t AppendNewOplogEntry(CbObjectView Core); enum UpdateType { @@ -202,12 +202,12 @@ public: uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry); void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, - Oid FileId, - IoHash Hash, + const Oid& FileId, + const IoHash& Hash, std::string_view ServerPath, std::string_view ClientPath); - void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); - void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); + void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); + void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); friend class ProjectStoreReferenceChecker; }; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index a8f4c5106..ddab7432d 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -82,7 +82,6 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_ ZEN_ASSERT(Total > 0); OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total)); } - ZEN_INFO("{}", CurrentOp); } void @@ -319,7 +318,7 @@ BuildContainer(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temp attachment '{}', reason: '{}', retries left: {}.", + ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.", AttachmentPath, Ec.message(), RetriesLeft); @@ -560,7 +559,8 @@ BuildContainer(CidStore& ChunkStore, Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -822,6 +822,7 @@ BuildContainer(CidStore& ChunkStore, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, @@ -892,10 +893,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); - ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'", - Needed, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to upload attachment '{}'. ({}): '{}'", + Needed, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } } @@ -948,44 +950,47 @@ UploadAttachments(WorkerThreadPool& WorkerPool, SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}). Reason: '{}'", - RawHash, - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + TempPayload = std::move(Payload), + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; - }); + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1012,31 +1017,34 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1077,7 +1085,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &RemoteResult, &Chunks, NeededChunks = std::move(NeededChunks), - &BulkAttachmentCountToUpload]() { + &BulkAttachmentCountToUpload, + OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); std::vector<SharedBuffer> ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); @@ -1098,10 +1107,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool, if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved {} bulk attachments in {}", @@ -1182,10 +1192,7 @@ SaveOplog(CidStore& ChunkStore, { return false; } - ZEN_WARN("Failed to create temporary oplog block '{}', reason: '{}', retries left: {}.", - BlockPath, - Ec.message(), - RetriesLeft); + ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; @@ -1217,12 +1224,13 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); @@ -1262,9 +1270,10 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog base container, reason: '{}', error code: {}", - BaseContainerResult.Reason, - BaseContainerResult.ErrorCode); + ReportMessage(OptionalContext, + fmt::format("Failed to load oplog base container: '{}', error code: {}", + BaseContainerResult.Reason, + BaseContainerResult.ErrorCode)); } else { @@ -1327,7 +1336,8 @@ SaveOplog(CidStore& ChunkStore, if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - ZEN_WARN("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + ReportMessage(OptionalContext, + fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { @@ -1361,10 +1371,13 @@ SaveOplog(CidStore& ChunkStore, if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - ZEN_WARN("Failed to finalize oplog container {} ({}). Reason: '{}'", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + ReportMessage(OptionalContext, + fmt::format("Failed to finalize oplog container {} ({}): '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + return Result; } ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); if (ContainerFinalizeResult.Needs.empty()) @@ -1429,8 +1442,9 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, continue; } OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; }; - ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); + ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); @@ -1470,6 +1484,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, } OnNeedBlock(BlockHash, {}); + NeedBlockCount++; break; } }; @@ -1482,33 +1497,51 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); if (!SectionObject) { - ZEN_WARN("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Section has unexpected data type", "Failed to save oplog container"}; } - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); + uint64_t OpsLoaded = 0; ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); + BinaryWriter Writer; for (CbFieldView OpEntry : OpsArray) { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + CbObjectView TypedOp(Writer.GetData()); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp); + Writer.Reset(); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { + ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded)); return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Failed saving op", "Failed to save oplog container"}; } ZEN_DEBUG("oplog entry #{}", OpLsn); + if (OpCount > 100000) + { + if (IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK), + Timer.GetElapsedTimeMs() / 1000.500, + "Operation cancelled", + ""}; + } + if (OpsLoaded % 10000 == 0) + { + ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded); + } + } + OpsLoaded++; } + ReportProgress(OptionalContext, "Writing oplog", OpCount, 0); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; } @@ -1522,6 +1555,20 @@ LoadOplog(CidStore& ChunkStore, { using namespace std::literals; + struct DownloadInfo + { + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsDownloaded = 0; + std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; + std::atomic<uint64_t> AttachmentBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; + std::atomic<uint64_t> AttachmentsStored = 0; + std::atomic<uint64_t> AttachmentBytesStored = 0; + std::atomic_size_t MissingAttachmentCount = 0; + }; + + DownloadInfo Info; + Stopwatch Timer; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); @@ -1532,14 +1579,19 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); if (LoadContainerResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + ReportMessage( + OptionalContext, + fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } ReportMessage(OptionalContext, - fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)))); + fmt::format("Loaded container in {} ({})", + NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), + NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); + Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); AsyncRemoteResult RemoteResult; Latch AttachmentsWorkLatch(1); @@ -1555,7 +1607,9 @@ LoadOplog(CidStore& ChunkStore, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, - IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { if (RemoteResult.IsError()) { return; @@ -1564,39 +1618,70 @@ LoadOplog(CidStore& ChunkStore, { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork( - [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &AttachmentsWorkLatch, + &RemoteResult, + Chunks = std::move(Chunks), + &Info, + IgnoreMissingAttachments, + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) { - return; + Info.MissingAttachmentCount.fetch_add(1); } - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) + else { - ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - ZEN_DEBUG("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - for (const auto& It : Result.Chunks) - { + return; + } + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } + for (const auto& It : Result.Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); } - }); + } + }); return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() { + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, + &ChunkStore, + &RemoteStore, + BlockHash, + &RemoteResult, + &Info, + IgnoreMissingAttachments, + OptionalContext]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1605,29 +1690,53 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { - ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - if (!IgnoreMissingAttachments) + ReportMessage(OptionalContext, + fmt::format("Failed to download block attachment {} ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); } return; } - ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {}", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + if (RemoteResult.IsError()) + { + return; + } - if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - })) + bool StoreChunksOK = + IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + uint64_t ChunkSize = Chunk.GetCompressedSize(); + Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); + + if (!StoreChunksOK) { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), fmt::format("Invalid format for block {}", BlockHash), {}); - ZEN_WARN("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); return; } }); @@ -1640,44 +1749,74 @@ LoadOplog(CidStore& ChunkStore, &RemoteResult, &Attachments, &AttachmentCount, - IgnoreMissingAttachments](const IoHash& RawHash) { + &Info, + IgnoreMissingAttachments, + OptionalContext](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; } + if (RemoteResult.IsError()) + { + return; + } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}", + WorkerPool.ScheduleWork( + [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + if (IgnoreMissingAttachments) + { + Info.MissingAttachmentCount.fetch_add(1); + } + else + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + ZEN_INFO("Loaded large attachment '{}' in {}", RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - if (!IgnoreMissingAttachments) + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + Info.AttachmentsDownloaded.fetch_add(1); + if (RemoteResult.IsError()) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + return; } - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); + uint64_t ChunkSize = AttachmentResult.Bytes.GetSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentsStored.fetch_add(1); + } + }); }; RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); - if (!Attachments.empty()) + if (Result.ErrorCode != 0) { - ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } + ReportMessage(OptionalContext, + fmt::format("Loaded oplog container in {}, found {} attachments to download", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + Attachments.size())); AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) @@ -1703,10 +1842,17 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, - fmt::format("Loaded oplog {} in {}", + fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)))); - + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + NiceBytes(Info.AttachmentsStored.load()), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load())); return Result; } |