aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-22 13:38:44 +0100
committerGitHub <[email protected]>2024-01-22 13:38:44 +0100
commit295b3312a0bb37cf68985454a2a1bd6d7a27b2d4 (patch)
tree3917ccb145812668398e203e7f5c772836a0cd73 /src
parentimproved errors from jupiter upstream (#636) (diff)
downloadzen-295b3312a0bb37cf68985454a2a1bd6d7a27b2d4.tar.xz
zen-295b3312a0bb37cf68985454a2a1bd6d7a27b2d4.zip
add --ignore-missing-attachments to oplog-import command (#637)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp10
-rw-r--r--src/zen/cmds/projectstore_cmd.h9
-rw-r--r--src/zenserver/projectstore/projectstore.cpp38
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp93
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h1
5 files changed, 98 insertions, 53 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 723b45cda..9ef38a924 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -956,6 +956,12 @@ ImportOplogCommand::ImportOplogCommand()
"<chunksize>");
m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>");
m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>");
+ m_Options.add_option("",
+ "",
+ "ignore-missing-attachments",
+ "Continue importing oplog even if attachments are missing",
+ cxxopts::value(m_IgnoreMissingAttachments),
+ "<ignore>");
m_Options.add_option("", "", "cloud", "Cloud Storage URL", cxxopts::value(m_CloudUrl), "<url>");
m_Options.add_option("cloud", "", "namespace", "Cloud Storage namespace", cxxopts::value(m_CloudNamespace), "<namespace>");
@@ -1122,6 +1128,10 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("force"sv, true);
}
+ if (m_IgnoreMissingAttachments)
+ {
+ Writer.AddBool("ignoremissingattachments"sv, true);
+ }
if (!m_FileDirectoryPath.empty())
{
Writer.BeginObject("file"sv);
diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h
index 8891fdaf4..3a7aee609 100644
--- a/src/zen/cmds/projectstore_cmd.h
+++ b/src/zen/cmds/projectstore_cmd.h
@@ -170,10 +170,11 @@ private:
std::string m_HostName;
std::string m_ProjectName;
std::string m_OplogName;
- size_t m_MaxBlockSize = 0;
- size_t m_MaxChunkEmbedSize = 0;
- bool m_Force = false;
- bool m_Async = false;
+ size_t m_MaxBlockSize = 0;
+ size_t m_MaxChunkEmbedSize = 0;
+ bool m_Force = false;
+ bool m_Async = false;
+ bool m_IgnoreMissingAttachments = false;
std::string m_CloudUrl;
std::string m_CloudNamespace;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 038a6db47..e37fb26f4 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3171,9 +3171,10 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
using namespace std::literals;
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
- bool Force = Params["force"sv].AsBool(false);
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
@@ -3185,18 +3186,25 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- JobId JobId = m_JobQueue.QueueJob(
- fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
- [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, MaxBlockSize, MaxChunkEmbedSize, Force](JobContext& Context) {
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw std::runtime_error(
- fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
- }
- });
+ JobId JobId =
+ m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ Force,
+ IgnoreMissingAttachments](JobContext& Context) {
+ RemoteProjectStore::Result Result =
+ LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(
+ fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
+ }
+ });
return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 826c8ff51..a8f4c5106 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1517,6 +1517,7 @@ LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
bool ForceDownload,
+ bool IgnoreMissingAttachments,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1547,43 +1548,55 @@ LoadOplog(CidStore& ChunkStore,
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
};
- auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult](
- const IoHash& BlockHash,
- std::vector<IoHash>&& Chunks) {
+ auto OnNeedBlock = [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &ChunksInBlocks,
+ &AttachmentsWorkLatch,
+ &AttachmentCount,
+ &RemoteResult,
+ IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
if (BlockHash == IoHash::Zero)
{
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
-
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- for (const auto& It : Result.Chunks)
- {
- ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
- }
- });
+ WorkerPool.ScheduleWork(
+ [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
+ {
+ ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ }
+ return;
+ }
+ ZEN_DEBUG("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ for (const auto& It : Result.Chunks)
+ {
+ ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
+ }
+ });
return;
}
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
+ WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1592,11 +1605,14 @@ LoadOplog(CidStore& ChunkStore,
RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
if (BlockResult.ErrorCode)
{
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'",
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason());
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
return;
}
ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
@@ -1617,8 +1633,14 @@ LoadOplog(CidStore& ChunkStore,
});
};
- auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount](
- const IoHash& RawHash) {
+ auto OnNeedAttachment = [&RemoteStore,
+ &ChunkStore,
+ &WorkerPool,
+ &AttachmentsWorkLatch,
+ &RemoteResult,
+ &Attachments,
+ &AttachmentCount,
+ IgnoreMissingAttachments](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
@@ -1626,7 +1648,7 @@ LoadOplog(CidStore& ChunkStore,
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1635,11 +1657,14 @@ LoadOplog(CidStore& ChunkStore,
RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
if (AttachmentResult.ErrorCode)
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}",
RawHash,
AttachmentResult.Reason,
AttachmentResult.ErrorCode);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
return;
}
ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index be086084c..bb6a11501 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -124,6 +124,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
bool ForceDownload,
+ bool IgnoreMissingAttachments,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);