diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-25 14:00:42 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-25 14:00:42 +0100 |
| commit | bccde0a980b38d9e2291f75ef932a978e78774b4 (patch) | |
| tree | ca48ab1536d0f7a3cf9aa8743dda35abc163d553 | |
| parent | 0.2.39-pre2 (diff) | |
| download | zen-bccde0a980b38d9e2291f75ef932a978e78774b4.tar.xz zen-bccde0a980b38d9e2291f75ef932a978e78774b4.zip | |
add ignore-missing-attachments option to oplog export (debugging tool) (#641)
* add ignore-missing-attachments option to oplog export (debugging tool)
* add more status codes that trigger a retry in http client
* add missing X-Jupiter-IoHash header for jupiter PutRef
* reduce oplog block size to reduce amount of redundant chunks to download
* improved logging
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 10 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.h | 5 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 26 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 55 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 4 |
8 files changed, 70 insertions, 39 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index de449e164..3b336ffd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## - Feature: add `--ignore-missing-attachments` to `oplog-import` command +- Feature: add `--ignore-missing-attachments` to `oplog-export` command - Improvement: Removed use of <random> in stats, for better performance (runtime as well as build) - Improvement: Separated cache RPC handling code from general structured cache HTTP code - Improvement: Get more detailed information on Jupiter upstream errors @@ -10,6 +11,7 @@ - Improvement: Added authentication support to HttpClient - Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore - Improvement: Size details in oplog import logging +- Improvement: Reduce oplog block size to 64MB to reduce amount of redundant chunks to download - Bugfix: RPC recording would not release memory as early as intended which resulted in memory buildup during long recording sessions. Previously certain memory was only released when recording stopped, now it gets released immediately when a segment is complete and written to disk. 
- Bugfix: File log format now contains dates again (PR #631) - Bugfix: Jobqueue - Allow multiple threads to report progress/messages (oplog import/export) diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 9ef38a924..510e700d3 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -599,6 +599,12 @@ ExportOplogCommand::ExportOplogCommand() m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>"); m_Options.add_option("", "", + "ignore-missing-attachments", + "Continue importing oplog even if attachments are missing", + cxxopts::value(m_IgnoreMissingAttachments), + "<ignore>"); + m_Options.add_option("", + "", "disableblocks", "Disable block creation and save all attachments individually (applies to file and cloud target)", cxxopts::value(m_DisableBlocks), @@ -816,6 +822,10 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("force"sv, true); } + if (m_IgnoreMissingAttachments) + { + Writer.AddBool("ignoremissingattachments"sv, true); + } Writer.AddBool("async"sv, true); if (!m_FileDirectoryPath.empty()) { diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 3a7aee609..d7a4ef4d8 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -141,8 +141,9 @@ private: std::string m_CloudOpenIdProvider; std::string m_CloudAccessToken; std::string m_CloudAccessTokenEnv; - bool m_CloudAssumeHttp2 = false; - bool m_CloudDisableTempBlocks = false; + bool m_CloudAssumeHttp2 = false; + bool m_CloudDisableTempBlocks = false; + bool m_IgnoreMissingAttachments = false; std::string m_ZenUrl; std::string m_ZenProjectName; diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 8182ac68f..07f631dac 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -140,8 +140,11 @@ ShouldRetry(const cpr::Response& Response) } 
switch ((HttpResponseCode)Response.status_code) { - case HttpResponseCode::GatewayTimeout: case HttpResponseCode::RequestTimeout: + case HttpResponseCode::TooManyRequests: + case HttpResponseCode::InternalServerError: + case HttpResponseCode::ServiceUnavailable: + case HttpResponseCode::GatewayTimeout: return true; default: return false; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index f117a4203..caf405066 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2755,7 +2755,8 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, *Oplog, MaxBlockSize, MaxChunkEmbedSize, - false, + /* BuildBlocks */ false, + /* IgnoreMissingAttachemnts */ false, [](CompressedBuffer&&, const IoHash) {}, [](const IoHash&) {}, [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, @@ -3108,10 +3109,11 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); - bool Force = Params["force"sv].AsBool(false); - bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); @@ -3140,7 +3142,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op EmbedLooseFile, CreateBlocks = StoreInfo.CreateBlocks, UseTempBlockFiles = 
StoreInfo.UseTempBlockFiles, - Force](JobContext& Context) { + Force, + IgnoreMissingAttachments](JobContext& Context) { RemoteProjectStore::Result Result = SaveOplog(m_CidStore, *ActualRemoteStore, *Project.Get(), @@ -3151,6 +3154,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op CreateBlocks, UseTempBlockFiles, Force, + IgnoreMissingAttachments, &Context); auto Response = ConvertResult(Result); ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); @@ -3170,7 +3174,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, using namespace std::literals; - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(64u * 1024u * 1024u); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); @@ -3187,13 +3191,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); JobId JobId = m_JobQueue.QueueJob( fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, - ActualRemoteStore = std::move(RemoteStore), - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - Force, - IgnoreMissingAttachments](JobContext& Context) { + [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments](JobContext& Context) { RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); auto Response = ConvertResult(Result); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 83cec4725..099b9ceb3 100644 --- 
a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -230,6 +230,7 @@ BuildContainer(CidStore& ChunkStore, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, + bool IgnoreMissingAttachments, const std::vector<Block>& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, @@ -399,7 +400,7 @@ BuildContainer(CidStore& ChunkStore, CB(RewrittenOp); }; - ReportMessage(OptionalContext, "Building exported oplog and fetching attachments"); + ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); tsl::robin_map<int, std::string> OpLSNToKey; @@ -558,17 +559,22 @@ BuildContainer(CidStore& ChunkStore, Sb.Append("' for op: \n"); Op.value().ToJson(Sb); - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - ReportMessage(OptionalContext, - fmt::format("Failed to build container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", AttachmentHash, Sb.ToView())); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) + if (IgnoreMissingAttachments) { - ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + continue; + } + else + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); + } + return {}; } - - return {}; } uint64_t PayloadSize = Payload.GetSize(); @@ -798,6 +804,7 @@ BuildContainer(CidStore& ChunkStore, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, + bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&)>& OnLargeAttachment, const 
std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, @@ -812,6 +819,7 @@ BuildContainer(CidStore& ChunkStore, MaxBlockSize, MaxChunkEmbedSize, BuildBlocks, + IgnoreMissingAttachments, {}, WorkerPool, AsyncOnBlock, @@ -983,10 +991,10 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.GetErrorReason())); return; } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(Payload.GetSize())); return; }); } @@ -1037,10 +1045,10 @@ UploadAttachments(WorkerThreadPool& WorkerPool, return; } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(Payload.GetSize())); return; }); } @@ -1085,7 +1093,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool, NeededChunks = std::move(NeededChunks), &BulkAttachmentCountToUpload, OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + size_t ChunksSize = 0; std::vector<SharedBuffer> ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); for (const IoHash& Chunk : NeededChunks) @@ -1099,6 +1108,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ChunkBuffers.clear(); break; } + ChunksSize += ChunkPayload.GetSize(); ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); @@ -1112,9 +1122,10 @@ UploadAttachments(WorkerThreadPool& WorkerPool, 
RemoteResult.GetErrorReason())); return; } - ZEN_DEBUG("Saved {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + ZEN_INFO("Saved {} bulk attachments in {} ({})", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); BulkAttachmentCountToUpload.fetch_sub(Chunks.size()); }); } @@ -1153,6 +1164,7 @@ SaveOplog(CidStore& ChunkStore, bool BuildBlocks, bool UseTempBlocks, bool ForceUpload, + bool IgnoreMissingAttachments, JobContext* OptionalContext) { using namespace std::literals; @@ -1307,6 +1319,7 @@ SaveOplog(CidStore& ChunkStore, MaxBlockSize, MaxChunkEmbedSize, BuildBlocks, + IgnoreMissingAttachments, KnownBlocks, WorkerPool, OnBlock, diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index bb6a11501..f4df78f8c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -93,6 +93,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, + bool IgnoreMissingAttachments, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, @@ -118,6 +119,7 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, bool BuildBlocks, bool UseTempBlocks, bool ForceUpload, + bool IgnoreMissingAttachments, JobContext* OptionalContext); RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index bf2538908..bfe797712 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -159,7 +159,9 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, IoHash Hash = 
IoHash::HashBuffer(Ref.Data(), Ref.Size()); HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref); + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + Ref, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) |