From ff8d2e7c432c58114b528bfc9670eba7e387843c Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 23 Jan 2024 10:21:03 +0100 Subject: oplog import/export improvements (#634) * improve feedback from oplog import/export * improve oplog save performance --- src/zenserver/projectstore/projectstore.cpp | 116 ++++++++++++++-------------- 1 file changed, 57 insertions(+), 59 deletions(-) (limited to 'src/zenserver/projectstore/projectstore.cpp') diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e37fb26f4..42af9b79b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -228,7 +228,7 @@ namespace { return {static_cast(Result.ErrorCode), Result.Reason.empty() ? Result.Text : Result.Text.empty() ? Result.Reason - : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + : fmt::format("{}: {}", Result.Reason, Result.Text)}; } } // namespace @@ -448,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash) + OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash) { ZEN_TRACE_CPU("Store::OplogStorage::AppendOp"); @@ -765,7 +765,7 @@ ProjectStore::Oplog::ReplayLog() } IoBuffer -ProjectStore::Oplog::FindChunk(Oid ChunkId) +ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) @@ -1044,8 +1044,8 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map Project, ProjectStore::Oplog& Op NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize)); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunkEmbedSize, - EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, - Force, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks = StoreInfo.CreateBlocks, + UseTempBlockFiles = StoreInfo.UseTempBlockFiles, + Force](JobContext& Context) { + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project.Get(), + *OplogPtr, + MaxBlockSize, + MaxChunkEmbedSize, + EmbedLooseFile, + CreateBlocks, + UseTempBlockFiles, + Force, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } @@ -3186,25 +3185,24 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - JobId JobId = - m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - Force, - IgnoreMissingAttachments](JobContext& Context) { - RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + Force, + IgnoreMissingAttachments](JobContext& Context) { + RemoteProjectStore::Result Result = + LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } -- cgit v1.2.3