diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-31 10:31:00 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-31 10:31:00 +0100 |
| commit | 78968c2e97a5c407a65088aa9861052d80498053 (patch) | |
| tree | 4fa02e7b9fd0b321b8ed7e4adaa7e06c6527929c /src | |
| parent | Update README.md (diff) | |
| download | zen-78968c2e97a5c407a65088aa9861052d80498053.tar.xz zen-78968c2e97a5c407a65088aa9861052d80498053.zip | |
improve oplog export logging (#644)
- Improvement: More details in oplog import/export logs
- Improvement: Switch from Download to Get when fetching Refs from Jupiter as they can't be resumed anyway and streaming to disk is redundant
- Bugfix: Make sure we clear read callback when doing Put in HttpClient to avoid timeout due to not sending data when reusing sessions
- Bugfix: Respect `--ignore-missing-attachments` in `oplog-export` command when loose file is missing on disk
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 18 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 303 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 11 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 6 |
9 files changed, 232 insertions, 126 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index c9005b1cf..cc8a3f033 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -210,10 +210,15 @@ struct HttpClient::Impl : public RefCounted ZEN_TRACE("HEAD {}", Result); return Result; } - inline cpr::Response Put() + inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {}) { + if (Read) + { + CprSession->SetReadCallback(std::move(Read.value())); + } cpr::Response Result = CprSession->Put(); ZEN_TRACE("PUT {}", Result); + CprSession->SetReadCallback({}); return Result; } inline cpr::Response Post() @@ -790,12 +795,9 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue Offset += size; return true; }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - else - { - Sess->SetBody(AsCprBody(Payload)); + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); } + Sess->SetBody(AsCprBody(Payload)); return Sess.Put(); }, m_ConnectionSettings.RetryCount)); @@ -821,9 +823,7 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont SizeLeft -= size; return true; }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - - return Sess.Put(); + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); } diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 8029d02de..defa7bf80 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -39,6 +39,8 @@ public: return { .CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, + .ContainerName = m_Name, + .BaseContainerName = m_OptionalBaseName, .Description = fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)}; } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index c9f1f5f6f..418c2aa84 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -44,6 +44,8 @@ public: { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, + .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), + .BaseContainerName = m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, @@ -167,7 +169,7 @@ private: LoadContainerResult LoadContainer(const IoHash& Key) { CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath); + CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); if (GetResult.ErrorCode || !GetResult.Success) { LoadContainerResult Result{ConvertResult(GetResult)}; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index caf405066..6bb543d63 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3140,8 +3140,6 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op MaxBlockSize, MaxChunkEmbedSize, EmbedLooseFile, - CreateBlocks = StoreInfo.CreateBlocks, - UseTempBlockFiles = StoreInfo.UseTempBlockFiles, Force, IgnoreMissingAttachments](JobContext& Context) { RemoteProjectStore::Result Result = SaveOplog(m_CidStore, @@ -3151,8 +3149,6 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op MaxBlockSize, MaxChunkEmbedSize, EmbedLooseFile, - CreateBlocks, - UseTempBlockFiles, Force, IgnoreMissingAttachments, &Context); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index c85cd1825..445179983 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -288,7 +288,28 @@ BuildContainer(CidStore& ChunkStore, std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = Project.RootDir / ServerPath; BasicFile DataFile; - DataFile.Open(FilePath, BasicFile::Mode::kRead); + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + if (Ec) + { + ExtendableStringBuilder<1024> Sb; + Sb.Append("Failed to find attachment '"); + Sb.Append(FilePath.string()); + Sb.Append("' for op: \n"); + Op.ToJson(Sb); + + ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView())); + if (IgnoreMissingAttachments) + { + continue; + } + else + { + throw std::system_error( + Ec, + fmt::format("failed to open file '{}', mode: {:x}", FilePath, uint32_t(BasicFile::Mode::kRead))); + } + } IoBuffer FileIoBuffer = DataFile.ReadAll(); DataFile.Close(); @@ -402,30 +423,42 @@ BuildContainer(CidStore& ChunkStore, ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); + Stopwatch Timer; tsl::robin_map<int, std::string> OpLSNToKey; - Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { - if (RemoteResult.IsError()) - { - return; - } - std::string_view Key = Op["key"sv].AsString(); - OpLSNToKey.insert({LSN, std::string(Key)}); - Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); - if (OutLooseAttachments != nullptr) - { - RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); - } - else - { - SectionOpsWriter << Op; - } - OpCount++; - if (IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - }); + { + Stopwatch RewriteOplogTimer; + Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { + if (RemoteResult.IsError()) + { + return; + } + std::string_view Key = Op["key"sv].AsString(); + OpLSNToKey.insert({LSN, std::string(Key)}); + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); + if (OutLooseAttachments != nullptr) + { + RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); + } + else + { + SectionOpsWriter << Op; + } + OpCount++; + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + if (OpCount % 100000 == 0) + { + ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount)); + } + }); + ReportMessage(OptionalContext, + fmt::format("Rewrote {} ops to new oplog in {}", + OpCount, + NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); + } if (IsCancelled(OptionalContext)) { @@ -436,8 +469,9 @@ BuildContainer(CidStore& ChunkStore, if (!Attachments.empty() && !KnownBlocks.empty()) { ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); + Stopwatch ReuseTimer; - size_t ReusedBlockCount = 0; + size_t SkippedAttachmentCount = 0; for (const Block& KnownBlock : KnownBlocks) { size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); @@ -465,10 +499,10 @@ BuildContainer(CidStore& ChunkStore, for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { Attachments.erase(KnownHash); + SkippedAttachmentCount++; } - BlocksLock.WithExclusiveLock([&]() { Blocks.push_back(KnownBlock); }); - ReusedBlockCount++; + Blocks.push_back(KnownBlock); } else if (FoundAttachmentCount > 0) { @@ -478,33 +512,48 @@ BuildContainer(CidStore& ChunkStore, ReusePercent); } } - ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size())); + ReportMessage(OptionalContext, + fmt::format("Reusing {} out of {} known blocks, skipping upload of {} attachments, completed in {}", + Blocks.size(), + KnownBlocks.size(), + SkippedAttachmentCount, + NiceTimeSpanMs(static_cast<uint64_t>(ReuseTimer.GetElapsedTimeMs())))); + } + + if (IsCancelled(OptionalContext)) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + return {}; } ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); // Sort attachments so we get predictable blocks for the same oplog upload std::vector<IoHash> SortedAttachments; - SortedAttachments.reserve(Attachments.size()); - for (const auto& It : Attachments) { - SortedAttachments.push_back(It.first); - } - std::sort(SortedAttachments.begin(), SortedAttachments.end(), [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) { - auto LhsLNSIt = Attachments.find(Lhs); - ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end()); - auto RhsLNSIt = Attachments.find(Rhs); - ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end()); - if (LhsLNSIt->second == RhsLNSIt->second) + SortedAttachments.reserve(Attachments.size()); + for (const auto& It : Attachments) { - return Lhs < Rhs; + SortedAttachments.push_back(It.first); } - auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second); - ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end()); - auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second); - ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end()); - return LhsKeyIt->second < RhsKeyIt->second; - }); + std::sort(SortedAttachments.begin(), + SortedAttachments.end(), + [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) { + auto LhsLNSIt = Attachments.find(Lhs); + ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end()); + auto RhsLNSIt = Attachments.find(Rhs); + ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end()); + if (LhsLNSIt->second == RhsLNSIt->second) + { + return Lhs < Rhs; + } + auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second); + ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end()); + auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second); + ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end()); + return LhsKeyIt->second < RhsKeyIt->second; + }); + } if (IsCancelled(OptionalContext)) { @@ -527,12 +576,12 @@ BuildContainer(CidStore& ChunkStore, } return ChunkStore.FindChunkByCid(AttachmentHash); }; - - int LastLSNOp = -1; + Latch BlockCreateLatch(1); size_t GeneratedBlockCount = 0; size_t LargeAttachmentCount = 0; - Latch BlockCreateLatch(1); + int LastLSNOp = -1; + for (const IoHash& AttachmentHash : SortedAttachments) { if (IsCancelled(OptionalContext)) @@ -682,11 +731,12 @@ BuildContainer(CidStore& ChunkStore, return {}; } ReportMessage(OptionalContext, - fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", SortedAttachments.size(), OpLSNToKey.size(), GeneratedBlockCount, - LargeAttachmentCount)); + LargeAttachmentCount, + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); if (IsCancelled(OptionalContext)) @@ -831,6 +881,15 @@ BuildContainer(CidStore& ChunkStore, return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } +struct UploadInfo +{ + uint64_t OplogSizeBytes = 0; + std::atomic<uint64_t> AttachmentsUploaded = 0; + std::atomic<uint64_t> AttachmentBlocksUploaded = 0; + std::atomic<uint64_t> AttachmentBytesUploaded = 0; + std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; +}; + void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, @@ -841,6 +900,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments, const std::unordered_set<IoHash, IoHash::Hasher>& Needs, bool ForceAll, + UploadInfo& Info, AsyncRemoteResult& RemoteResult, JobContext* OptionalContext) { @@ -851,7 +911,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, return; } - ReportMessage(OptionalContext, "Filtering needed attachments..."); + ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; @@ -900,7 +960,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", - fmt::format("Upload requested of unknown attachment '{}'", Needed)); + fmt::format("Upload requested an unknown attachment '{}'", Needed)); ReportMessage( OptionalContext, fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -963,6 +1023,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RawHash, &CreatedBlocks, TempPayload = std::move(Payload), + &Info, OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -991,6 +1052,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.GetErrorReason())); return; } + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(Payload.GetSize()); ZEN_INFO("Saved large attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), @@ -1023,34 +1086,42 @@ UploadAttachments(WorkerThreadPool& WorkerPool, ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + Payload = std::move(Payload), + RawHash, + &Info, + OptionalContext]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(Payload.GetSize())); + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): {}", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); return; - }); + } + + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(Payload.GetSize()); + + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(Payload.GetSize())); + return; + }); } if (IsCancelled(OptionalContext)) @@ -1092,6 +1163,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &Chunks, NeededChunks = std::move(NeededChunks), &BulkAttachmentCountToUpload, + &Info, OptionalContext]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); size_t ChunksSize = 0; @@ -1122,6 +1194,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.GetErrorReason())); return; } + Info.AttachmentsUploaded.fetch_add(NeededChunks.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + ZEN_INFO("Saved {} bulk attachments in {} ({})", Chunks.size(), NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), @@ -1161,8 +1236,6 @@ SaveOplog(CidStore& ChunkStore, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool EmbedLooseFiles, - bool BuildBlocks, - bool UseTempBlocks, bool ForceUpload, bool IgnoreMissingAttachments, JobContext* OptionalContext) @@ -1171,10 +1244,14 @@ SaveOplog(CidStore& ChunkStore, Stopwatch Timer; + UploadInfo Info; + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); + const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); + std::filesystem::path AttachmentTempPath; - if (UseTempBlocks) + if (RemoteStoreInfo.UseTempBlockFiles) { AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1234,7 +1311,7 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); if (Result.ErrorCode) { @@ -1243,6 +1320,8 @@ SaveOplog(CidStore& ChunkStore, fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize()); ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); }; @@ -1261,7 +1340,7 @@ SaveOplog(CidStore& ChunkStore, }; std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; - if (UseTempBlocks) + if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; } @@ -1272,21 +1351,24 @@ SaveOplog(CidStore& ChunkStore, std::vector<Block> KnownBlocks; - if (BuildBlocks) + if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty()) { - ReportMessage(OptionalContext, "Loading oplog base container"); + ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName)); RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent)) { if (BaseContainerResult.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container ({}): {}, uploading all attachments", + fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", + RemoteStoreInfo.BaseContainerName, BaseContainerResult.ErrorCode, BaseContainerResult.Reason)); } else { + ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); + CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); KnownBlocks.reserve(BlocksArray.Num()); for (CbFieldView BlockField : BlocksArray) @@ -1309,7 +1391,6 @@ SaveOplog(CidStore& ChunkStore, KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); }; } - ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); } } @@ -1319,7 +1400,7 @@ SaveOplog(CidStore& ChunkStore, Oplog, MaxBlockSize, MaxChunkEmbedSize, - BuildBlocks, + RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, KnownBlocks, WorkerPool, @@ -1331,6 +1412,8 @@ SaveOplog(CidStore& ChunkStore, /* out */ RemoteResult); if (!RemoteResult.IsError()) { + Info.OplogSizeBytes = OplogContainerObject.GetSize(); + if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, @@ -1341,7 +1424,11 @@ SaveOplog(CidStore& ChunkStore, uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount)); + ReportMessage(OptionalContext, + fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", + RemoteStoreInfo.ContainerName, + ChunkCount, + BlockCount)); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); if (ContainerSaveResult.ErrorCode) @@ -1352,7 +1439,10 @@ SaveOplog(CidStore& ChunkStore, } else { - ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); + ReportMessage(OptionalContext, + fmt::format("Saved container '{}' in {}", + RemoteStoreInfo.ContainerName, + NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)))); } UploadAttachments(WorkerPool, @@ -1364,6 +1454,7 @@ SaveOplog(CidStore& ChunkStore, TempAttachments, ContainerSaveResult.Needs, ForceUpload, + Info, RemoteResult, OptionalContext); @@ -1390,7 +1481,11 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); return Result; } - ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); + ReportMessage(OptionalContext, + fmt::format("Finalized container '{}' in {}", + RemoteStoreInfo.ContainerName, + NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)))); + if (ContainerFinalizeResult.Needs.empty()) { break; @@ -1405,7 +1500,9 @@ SaveOplog(CidStore& ChunkStore, } ReportMessage(OptionalContext, - fmt::format("Finalize reported {} missing attachments...", ContainerFinalizeResult.Needs.size())); + fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements.", + RemoteStoreInfo.ContainerName, + ContainerFinalizeResult.Needs.size())); UploadAttachments(WorkerPool, ChunkStore, @@ -1416,6 +1513,7 @@ SaveOplog(CidStore& ChunkStore, TempAttachments, ContainerFinalizeResult.Needs, false, + Info, RemoteResult, OptionalContext); } @@ -1425,9 +1523,18 @@ SaveOplog(CidStore& ChunkStore, } RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - ZEN_INFO("Saved oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + + ReportMessage(OptionalContext, + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})", + RemoteStoreInfo.ContainerName, + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksUploaded.load(), + NiceBytes(Info.AttachmentBlockBytesUploaded.load()), + Info.AttachmentsUploaded.load(), + NiceBytes(Info.AttachmentBytesUploaded.load()))); + return Result; }; @@ -1587,6 +1694,9 @@ LoadOplog(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> Attachments; std::vector<std::vector<IoHash>> ChunksInBlocks; + RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); + ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); + RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); if (LoadContainerResult.ErrorCode) { @@ -1828,7 +1938,7 @@ LoadOplog(CidStore& ChunkStore, RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } ReportMessage(OptionalContext, - fmt::format("Loaded oplog container in {}, found {} attachments to download", + fmt::format("Wrote oplog in {}, found {} attachments to download", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), Attachments.size())); @@ -1856,7 +1966,8 @@ LoadOplog(CidStore& ChunkStore, Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, - fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", + RemoteStoreInfo.ContainerName, RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), NiceBytes(Info.OplogSizeBytes), diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index f4df78f8c..7f0cb0ebb 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -61,6 +61,8 @@ public: { bool CreateBlocks; bool UseTempBlockFiles; + std::string ContainerName; + std::string BaseContainerName; std::string Description; }; @@ -116,8 +118,6 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool EmbedLooseFiles, - bool BuildBlocks, - bool UseTempBlocks, bool ForceUpload, bool IgnoreMissingAttachments, JobContext* OptionalContext); diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 7823010b5..95fcc9e21 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -38,7 +38,11 @@ public: virtual RemoteStoreInfo GetInfo() const override { - return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; + return {.CreateBlocks = false, + .UseTempBlockFiles = false, + .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog), + .BaseContainerName = "", + .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index fe4d31604..c77834657 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -71,18 +71,13 @@ CloudCacheSession::Authenticate() } CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - ZenContentType RefType, - std::filesystem::path TempFolderPath) +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::GetRef"); HttpClient::Response Response = - m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - TempFolderPath, - {HttpClient::Accept(RefType)}); + m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + {HttpClient::Accept(RefType)}); return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); } diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index 93f2cc883..cfe8f6186 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -94,11 +94,7 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetRef(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - ZenContentType RefType, - std::filesystem::path TempFolderPath = {}); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); |