aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-31 10:31:00 +0100
committerGitHub <[email protected]>2024-01-31 10:31:00 +0100
commit78968c2e97a5c407a65088aa9861052d80498053 (patch)
tree4fa02e7b9fd0b321b8ed7e4adaa7e06c6527929c /src
parentUpdate README.md (diff)
downloadzen-78968c2e97a5c407a65088aa9861052d80498053.tar.xz
zen-78968c2e97a5c407a65088aa9861052d80498053.zip
improve oplog export logging (#644)
- Improvement: More details in oplog import/export logs - Improvement: Switch from Download to Get when fetching Refs from Jupiter as they can't be resumed anyway and streaming to disk is redundant - Bugfix: Make sure we clear read callback when doing Put in HttpClient to avoid timeout due to not sending data when reusing sessions - Bugfix: Respect `--ignore-missing-attachments` in `oplog-export` command when loose file is missing on disk
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient.cpp18
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp2
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp4
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp303
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h4
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp6
-rw-r--r--src/zenserver/upstream/jupiter.cpp11
-rw-r--r--src/zenserver/upstream/jupiter.h6
9 files changed, 232 insertions, 126 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index c9005b1cf..cc8a3f033 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -210,10 +210,15 @@ struct HttpClient::Impl : public RefCounted
ZEN_TRACE("HEAD {}", Result);
return Result;
}
- inline cpr::Response Put()
+ inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {})
{
+ if (Read)
+ {
+ CprSession->SetReadCallback(std::move(Read.value()));
+ }
cpr::Response Result = CprSession->Put();
ZEN_TRACE("PUT {}", Result);
+ CprSession->SetReadCallback({});
return Result;
}
inline cpr::Response Post()
@@ -790,12 +795,9 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue
Offset += size;
return true;
};
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- }
- else
- {
- Sess->SetBody(AsCprBody(Payload));
+ return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
}
+ Sess->SetBody(AsCprBody(Payload));
return Sess.Put();
},
m_ConnectionSettings.RetryCount));
@@ -821,9 +823,7 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
SizeLeft -= size;
return true;
};
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
-
- return Sess.Put();
+ return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
},
m_ConnectionSettings.RetryCount));
}
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index 8029d02de..defa7bf80 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -39,6 +39,8 @@ public:
return {
.CreateBlocks = m_EnableBlocks,
.UseTempBlockFiles = m_UseTempBlocks,
+ .ContainerName = m_Name,
+ .BaseContainerName = m_OptionalBaseName,
.Description =
fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)};
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index c9f1f5f6f..418c2aa84 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -44,6 +44,8 @@ public:
{
return {.CreateBlocks = m_EnableBlocks,
.UseTempBlockFiles = m_UseTempBlocks,
+ .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
+ .BaseContainerName = m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
.Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
m_CloudClient->ServiceUrl(),
m_Namespace,
@@ -167,7 +169,7 @@ private:
LoadContainerResult LoadContainer(const IoHash& Key)
{
CloudCacheSession Session(m_CloudClient.Get());
- CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath);
+ CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
if (GetResult.ErrorCode || !GetResult.Success)
{
LoadContainerResult Result{ConvertResult(GetResult)};
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index caf405066..6bb543d63 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3140,8 +3140,6 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
MaxBlockSize,
MaxChunkEmbedSize,
EmbedLooseFile,
- CreateBlocks = StoreInfo.CreateBlocks,
- UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
Force,
IgnoreMissingAttachments](JobContext& Context) {
RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
@@ -3151,8 +3149,6 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
MaxBlockSize,
MaxChunkEmbedSize,
EmbedLooseFile,
- CreateBlocks,
- UseTempBlockFiles,
Force,
IgnoreMissingAttachments,
&Context);
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index c85cd1825..445179983 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -288,7 +288,28 @@ BuildContainer(CidStore& ChunkStore,
std::string_view ServerPath = View["serverpath"sv].AsString();
std::filesystem::path FilePath = Project.RootDir / ServerPath;
BasicFile DataFile;
- DataFile.Open(FilePath, BasicFile::Mode::kRead);
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ if (Ec)
+ {
+ ExtendableStringBuilder<1024> Sb;
+ Sb.Append("Failed to find attachment '");
+ Sb.Append(FilePath.string());
+ Sb.Append("' for op: \n");
+ Op.ToJson(Sb);
+
+ ReportMessage(OptionalContext, fmt::format("Missing attachment '{}': {}", FilePath, Sb.ToView()));
+ if (IgnoreMissingAttachments)
+ {
+ continue;
+ }
+ else
+ {
+ throw std::system_error(
+ Ec,
+ fmt::format("failed to open file '{}', mode: {:x}", FilePath, uint32_t(BasicFile::Mode::kRead)));
+ }
+ }
IoBuffer FileIoBuffer = DataFile.ReadAll();
DataFile.Close();
@@ -402,30 +423,42 @@ BuildContainer(CidStore& ChunkStore,
ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
+ Stopwatch Timer;
tsl::robin_map<int, std::string> OpLSNToKey;
- Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
- if (RemoteResult.IsError())
- {
- return;
- }
- std::string_view Key = Op["key"sv].AsString();
- OpLSNToKey.insert({LSN, std::string(Key)});
- Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
- if (OutLooseAttachments != nullptr)
- {
- RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
- }
- else
- {
- SectionOpsWriter << Op;
- }
- OpCount++;
- if (IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- });
+ {
+ Stopwatch RewriteOplogTimer;
+ Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) {
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ std::string_view Key = Op["key"sv].AsString();
+ OpLSNToKey.insert({LSN, std::string(Key)});
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); });
+ if (OutLooseAttachments != nullptr)
+ {
+ RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
+ }
+ else
+ {
+ SectionOpsWriter << Op;
+ }
+ OpCount++;
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ if (OpCount % 100000 == 0)
+ {
+ ReportMessage(OptionalContext, fmt::format("Building oplog, at op {}...", OpCount));
+ }
+ });
+ ReportMessage(OptionalContext,
+ fmt::format("Rewrote {} ops to new oplog in {}",
+ OpCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
+ }
if (IsCancelled(OptionalContext))
{
@@ -436,8 +469,9 @@ BuildContainer(CidStore& ChunkStore,
if (!Attachments.empty() && !KnownBlocks.empty())
{
ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size()));
+ Stopwatch ReuseTimer;
- size_t ReusedBlockCount = 0;
+ size_t SkippedAttachmentCount = 0;
for (const Block& KnownBlock : KnownBlocks)
{
size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size();
@@ -465,10 +499,10 @@ BuildContainer(CidStore& ChunkStore,
for (const IoHash& KnownHash : KnownBlock.ChunksInBlock)
{
Attachments.erase(KnownHash);
+ SkippedAttachmentCount++;
}
- BlocksLock.WithExclusiveLock([&]() { Blocks.push_back(KnownBlock); });
- ReusedBlockCount++;
+ Blocks.push_back(KnownBlock);
}
else if (FoundAttachmentCount > 0)
{
@@ -478,33 +512,48 @@ BuildContainer(CidStore& ChunkStore,
ReusePercent);
}
}
- ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size()));
+ ReportMessage(OptionalContext,
+ fmt::format("Reusing {} out of {} known blocks, skipping upload of {} attachments, completed in {}",
+ Blocks.size(),
+ KnownBlocks.size(),
+ SkippedAttachmentCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(ReuseTimer.GetElapsedTimeMs()))));
+ }
+
+ if (IsCancelled(OptionalContext))
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ return {};
}
ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size()));
// Sort attachments so we get predictable blocks for the same oplog upload
std::vector<IoHash> SortedAttachments;
- SortedAttachments.reserve(Attachments.size());
- for (const auto& It : Attachments)
{
- SortedAttachments.push_back(It.first);
- }
- std::sort(SortedAttachments.begin(), SortedAttachments.end(), [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) {
- auto LhsLNSIt = Attachments.find(Lhs);
- ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end());
- auto RhsLNSIt = Attachments.find(Rhs);
- ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end());
- if (LhsLNSIt->second == RhsLNSIt->second)
+ SortedAttachments.reserve(Attachments.size());
+ for (const auto& It : Attachments)
{
- return Lhs < Rhs;
+ SortedAttachments.push_back(It.first);
}
- auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second);
- ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end());
- auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second);
- ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end());
- return LhsKeyIt->second < RhsKeyIt->second;
- });
+ std::sort(SortedAttachments.begin(),
+ SortedAttachments.end(),
+ [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) {
+ auto LhsLNSIt = Attachments.find(Lhs);
+ ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end());
+ auto RhsLNSIt = Attachments.find(Rhs);
+ ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end());
+ if (LhsLNSIt->second == RhsLNSIt->second)
+ {
+ return Lhs < Rhs;
+ }
+ auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second);
+ ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end());
+ auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second);
+ ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end());
+ return LhsKeyIt->second < RhsKeyIt->second;
+ });
+ }
if (IsCancelled(OptionalContext))
{
@@ -527,12 +576,12 @@ BuildContainer(CidStore& ChunkStore,
}
return ChunkStore.FindChunkByCid(AttachmentHash);
};
-
- int LastLSNOp = -1;
+ Latch BlockCreateLatch(1);
size_t GeneratedBlockCount = 0;
size_t LargeAttachmentCount = 0;
- Latch BlockCreateLatch(1);
+ int LastLSNOp = -1;
+
for (const IoHash& AttachmentHash : SortedAttachments)
{
if (IsCancelled(OptionalContext))
@@ -682,11 +731,12 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
ReportMessage(OptionalContext,
- fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments",
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
SortedAttachments.size(),
OpLSNToKey.size(),
GeneratedBlockCount,
- LargeAttachmentCount));
+ LargeAttachmentCount,
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
if (IsCancelled(OptionalContext))
@@ -831,6 +881,15 @@ BuildContainer(CidStore& ChunkStore,
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
+struct UploadInfo
+{
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsUploaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksUploaded = 0;
+ std::atomic<uint64_t> AttachmentBytesUploaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
+};
+
void
UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
@@ -841,6 +900,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
const tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>& TempAttachments,
const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
bool ForceAll,
+ UploadInfo& Info,
AsyncRemoteResult& RemoteResult,
JobContext* OptionalContext)
{
@@ -851,7 +911,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
return;
}
- ReportMessage(OptionalContext, "Filtering needed attachments...");
+ ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
@@ -900,7 +960,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
"Invalid attachment",
- fmt::format("Upload requested of unknown attachment '{}'", Needed));
+ fmt::format("Upload requested an unknown attachment '{}'", Needed));
ReportMessage(
OptionalContext,
fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
@@ -963,6 +1023,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RawHash,
&CreatedBlocks,
TempPayload = std::move(Payload),
+ &Info,
OptionalContext]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -991,6 +1052,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.GetErrorReason()));
return;
}
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(Payload.GetSize());
ZEN_INFO("Saved large attachment '{}' in {} ({})",
RawHash,
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
@@ -1023,34 +1086,42 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ZEN_ASSERT(Payload);
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
-
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ Payload = std::move(Payload),
+ RawHash,
+ &Info,
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(Payload.GetSize()));
+ RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
return;
- });
+ }
+
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(Payload.GetSize());
+
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(Payload.GetSize()));
+ return;
+ });
}
if (IsCancelled(OptionalContext))
@@ -1092,6 +1163,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
&Chunks,
NeededChunks = std::move(NeededChunks),
&BulkAttachmentCountToUpload,
+ &Info,
OptionalContext]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
size_t ChunksSize = 0;
@@ -1122,6 +1194,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.GetErrorReason()));
return;
}
+ Info.AttachmentsUploaded.fetch_add(NeededChunks.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
ZEN_INFO("Saved {} bulk attachments in {} ({})",
Chunks.size(),
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
@@ -1161,8 +1236,6 @@ SaveOplog(CidStore& ChunkStore,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
bool EmbedLooseFiles,
- bool BuildBlocks,
- bool UseTempBlocks,
bool ForceUpload,
bool IgnoreMissingAttachments,
JobContext* OptionalContext)
@@ -1171,10 +1244,14 @@ SaveOplog(CidStore& ChunkStore,
Stopwatch Timer;
+ UploadInfo Info;
+
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
+ const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
+
std::filesystem::path AttachmentTempPath;
- if (UseTempBlocks)
+ if (RemoteStoreInfo.UseTempBlockFiles)
{
AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1234,7 +1311,7 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
+ auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
if (Result.ErrorCode)
{
@@ -1243,6 +1320,8 @@ SaveOplog(CidStore& ChunkStore,
fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize());
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
};
@@ -1261,7 +1340,7 @@ SaveOplog(CidStore& ChunkStore,
};
std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
- if (UseTempBlocks)
+ if (RemoteStoreInfo.UseTempBlockFiles)
{
OnBlock = MakeTempBlock;
}
@@ -1272,21 +1351,24 @@ SaveOplog(CidStore& ChunkStore,
std::vector<Block> KnownBlocks;
- if (BuildBlocks)
+ if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
{
- ReportMessage(OptionalContext, "Loading oplog base container");
+ ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
{
if (BaseContainerResult.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container ({}): {}, uploading all attachments",
+ fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments",
+ RemoteStoreInfo.BaseContainerName,
BaseContainerResult.ErrorCode,
BaseContainerResult.Reason));
}
else
{
+ ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
+
CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
KnownBlocks.reserve(BlocksArray.Num());
for (CbFieldView BlockField : BlocksArray)
@@ -1309,7 +1391,6 @@ SaveOplog(CidStore& ChunkStore,
KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
};
}
- ReportMessage(OptionalContext, fmt::format("Loaded oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds));
}
}
@@ -1319,7 +1400,7 @@ SaveOplog(CidStore& ChunkStore,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
- BuildBlocks,
+ RemoteStoreInfo.CreateBlocks,
IgnoreMissingAttachments,
KnownBlocks,
WorkerPool,
@@ -1331,6 +1412,8 @@ SaveOplog(CidStore& ChunkStore,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
{
+ Info.OplogSizeBytes = OplogContainerObject.GetSize();
+
if (IsCancelled(OptionalContext))
{
RemoteProjectStore::Result Result = {.ErrorCode = 0,
@@ -1341,7 +1424,11 @@ SaveOplog(CidStore& ChunkStore,
uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount));
+ ReportMessage(OptionalContext,
+ fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...",
+ RemoteStoreInfo.ContainerName,
+ ChunkCount,
+ BlockCount));
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
if (ContainerSaveResult.ErrorCode)
@@ -1352,7 +1439,10 @@ SaveOplog(CidStore& ChunkStore,
}
else
{
- ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
+ ReportMessage(OptionalContext,
+ fmt::format("Saved container '{}' in {}",
+ RemoteStoreInfo.ContainerName,
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))));
}
UploadAttachments(WorkerPool,
@@ -1364,6 +1454,7 @@ SaveOplog(CidStore& ChunkStore,
TempAttachments,
ContainerSaveResult.Needs,
ForceUpload,
+ Info,
RemoteResult,
OptionalContext);
@@ -1390,7 +1481,11 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
return Result;
}
- ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
+ ReportMessage(OptionalContext,
+ fmt::format("Finalized container '{}' in {}",
+ RemoteStoreInfo.ContainerName,
+ NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))));
+
if (ContainerFinalizeResult.Needs.empty())
{
break;
@@ -1405,7 +1500,9 @@ SaveOplog(CidStore& ChunkStore,
}
ReportMessage(OptionalContext,
- fmt::format("Finalize reported {} missing attachments...", ContainerFinalizeResult.Needs.size()));
+ fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements.",
+ RemoteStoreInfo.ContainerName,
+ ContainerFinalizeResult.Needs.size()));
UploadAttachments(WorkerPool,
ChunkStore,
@@ -1416,6 +1513,7 @@ SaveOplog(CidStore& ChunkStore,
TempAttachments,
ContainerFinalizeResult.Needs,
false,
+ Info,
RemoteResult,
OptionalContext);
}
@@ -1425,9 +1523,18 @@ SaveOplog(CidStore& ChunkStore,
}
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
- ZEN_INFO("Saved oplog {} in {}",
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+
+ ReportMessage(OptionalContext,
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})",
+ RemoteStoreInfo.ContainerName,
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksUploaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
+ Info.AttachmentsUploaded.load(),
+ NiceBytes(Info.AttachmentBytesUploaded.load())));
+
return Result;
};
@@ -1587,6 +1694,9 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
std::vector<std::vector<IoHash>> ChunksInBlocks;
+ RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
+ ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
+
RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
if (LoadContainerResult.ErrorCode)
{
@@ -1828,7 +1938,7 @@ LoadOplog(CidStore& ChunkStore,
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
ReportMessage(OptionalContext,
- fmt::format("Loaded oplog container in {}, found {} attachments to download",
+ fmt::format("Wrote oplog in {}, found {} attachments to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
Attachments.size()));
@@ -1856,7 +1966,8 @@ LoadOplog(CidStore& ChunkStore,
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
ReportMessage(OptionalContext,
- fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
+ RemoteStoreInfo.ContainerName,
RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
NiceBytes(Info.OplogSizeBytes),
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index f4df78f8c..7f0cb0ebb 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -61,6 +61,8 @@ public:
{
bool CreateBlocks;
bool UseTempBlockFiles;
+ std::string ContainerName;
+ std::string BaseContainerName;
std::string Description;
};
@@ -116,8 +118,6 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
bool EmbedLooseFiles,
- bool BuildBlocks,
- bool UseTempBlocks,
bool ForceUpload,
bool IgnoreMissingAttachments,
JobContext* OptionalContext);
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 7823010b5..95fcc9e21 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -38,7 +38,11 @@ public:
virtual RemoteStoreInfo GetInfo() const override
{
- return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
+ return {.CreateBlocks = false,
+ .UseTempBlockFiles = false,
+ .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog),
+ .BaseContainerName = "",
+ .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
}
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index fe4d31604..c77834657 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -71,18 +71,13 @@ CloudCacheSession::Authenticate()
}
CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- ZenContentType RefType,
- std::filesystem::path TempFolderPath)
+CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
{
ZEN_TRACE_CPU("JupiterClient::GetRef");
HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- TempFolderPath,
- {HttpClient::Accept(RefType)});
+ m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ {HttpClient::Accept(RefType)});
return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
}
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index 93f2cc883..cfe8f6186 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -94,11 +94,7 @@ public:
~CloudCacheSession();
CloudCacheResult Authenticate();
- CloudCacheResult GetRef(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- ZenContentType RefType,
- std::filesystem::path TempFolderPath = {});
+ CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);