diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-22 13:38:44 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-22 13:38:44 +0100 |
| commit | 295b3312a0bb37cf68985454a2a1bd6d7a27b2d4 (patch) | |
| tree | 3917ccb145812668398e203e7f5c772836a0cd73 /src | |
| parent | improved errors from jupiter upstream (#636) (diff) | |
| download | zen-295b3312a0bb37cf68985454a2a1bd6d7a27b2d4.tar.xz zen-295b3312a0bb37cf68985454a2a1bd6d7a27b2d4.zip | |
add --ignore-missing-attachments to oplog-import command (#637)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 10 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.h | 9 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 38 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 93 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 1 |
5 files changed, 98 insertions, 53 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 723b45cda..9ef38a924 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -956,6 +956,12 @@ ImportOplogCommand::ImportOplogCommand() "<chunksize>"); m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>"); + m_Options.add_option("", + "", + "ignore-missing-attachments", + "Continue importing oplog even if attachments are missing", + cxxopts::value(m_IgnoreMissingAttachments), + "<ignore>"); m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>"); m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>"); @@ -1122,6 +1128,10 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("force"sv, true); } + if (m_IgnoreMissingAttachments) + { + Writer.AddBool("ignoremissingattachments"sv, true); + } if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 8891fdaf4..3a7aee609 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -170,10 +170,11 @@ private: std::string m_HostName; std::string m_ProjectName; std::string m_OplogName; - size_t m_MaxBlockSize = 0; - size_t m_MaxChunkEmbedSize = 0; - bool m_Force = false; - bool m_Async = false; + size_t m_MaxBlockSize = 0; + size_t m_MaxChunkEmbedSize = 0; + bool m_Force = false; + bool m_Async = false; + bool m_IgnoreMissingAttachments = false; std::string m_CloudUrl; std::string m_CloudNamespace; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 038a6db47..e37fb26f4 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3171,9 +3171,10 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); - bool Force = Params["force"sv].AsBool(false); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); @@ -3185,18 +3186,25 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - JobId JobId = m_JobQueue.QueueJob( - fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) { - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw std::runtime_error( - fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); - } - }); + JobId JobId = + m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), + [this, + ActualRemoteStore = std::move(RemoteStore), + OplogPtr = &Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + Force, + IgnoreMissingAttachments](JobContext& Context) { + RemoteProjectStore::Result Result = + LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw std::runtime_error( + fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second)); + } + }); return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; } diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 826c8ff51..a8f4c5106 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1517,6 +1517,7 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, + bool IgnoreMissingAttachments, JobContext* OptionalContext) { using namespace std::literals; @@ -1547,43 +1548,55 @@ LoadOplog(CidStore& ChunkStore, auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { return !ForceDownload && ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult]( - const IoHash& BlockHash, - std::vector<IoHash>&& Chunks) { + auto OnNeedBlock = [&RemoteStore, + &ChunkStore, + &WorkerPool, + &ChunksInBlocks, + &AttachmentsWorkLatch, + &AttachmentCount, + &RemoteResult, + IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { + if (RemoteResult.IsError()) + { + return; + } if (BlockHash == IoHash::Zero) { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - for (const auto& It : Result.Chunks) - { - ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); - } - }); + WorkerPool.ScheduleWork( + [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + } + return; + } + ZEN_DEBUG("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + for (const auto& It : Result.Chunks) + { + ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + } + }); return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1592,11 +1605,14 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } return; } ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); @@ -1617,8 +1633,14 @@ LoadOplog(CidStore& ChunkStore, }); }; - auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount]( - const IoHash& RawHash) { + auto OnNeedAttachment = [&RemoteStore, + &ChunkStore, + &WorkerPool, + &AttachmentsWorkLatch, + &RemoteResult, + &Attachments, + &AttachmentCount, + IgnoreMissingAttachments](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; @@ -1626,7 +1648,7 @@ LoadOplog(CidStore& ChunkStore, AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1635,11 +1657,14 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); if (AttachmentResult.ErrorCode) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}", RawHash, AttachmentResult.Reason, AttachmentResult.ErrorCode); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } return; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index be086084c..bb6a11501 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -124,6 +124,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, + bool IgnoreMissingAttachments, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); |