aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-23 10:21:03 +0100
committerGitHub <[email protected]>2024-01-23 10:21:03 +0100
commitff8d2e7c432c58114b528bfc9670eba7e387843c (patch)
tree5dbb80fbab73835047794a56a76d45220b33571c /src
parentadd --ignore-missing-attachments to oplog-import command (#637) (diff)
downloadzen-ff8d2e7c432c58114b528bfc9670eba7e387843c.tar.xz
zen-ff8d2e7c432c58114b528bfc9670eba7e387843c.zip
oplog import/export improvements (#634)
* improve feedback from oplog import/export * improve oplog save performance
Diffstat (limited to 'src')
-rw-r--r--src/zencore/stream.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp116
-rw-r--r--src/zenserver/projectstore/projectstore.h12
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp464
4 files changed, 371 insertions, 226 deletions
diff --git a/src/zencore/stream.cpp b/src/zencore/stream.cpp
index ee97a53c4..13c90fd92 100644
--- a/src/zencore/stream.cpp
+++ b/src/zencore/stream.cpp
@@ -25,8 +25,9 @@ BinaryWriter::Write(std::initializer_list<const MemoryView> Buffers)
}
for (const MemoryView& View : Buffers)
{
- memcpy(m_Buffer.data() + m_Offset, View.GetData(), View.GetSize());
- m_Offset += View.GetSize();
+ size_t Size = View.GetSize();
+ memcpy(m_Buffer.data() + m_Offset, View.GetData(), Size);
+ m_Offset += Size;
}
}
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index e37fb26f4..42af9b79b 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -228,7 +228,7 @@ namespace {
return {static_cast<HttpResponseCode>(Result.ErrorCode),
Result.Reason.empty() ? Result.Text
: Result.Text.empty() ? Result.Reason
- : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)};
+ : fmt::format("{}: {}", Result.Reason, Result.Text)};
}
} // namespace
@@ -448,7 +448,7 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, Oid KeyHash)
+ OplogEntry AppendOp(MemoryView Buffer, uint32_t OpCoreHash, Oid KeyHash)
{
ZEN_TRACE_CPU("Store::OplogStorage::AppendOp");
@@ -765,7 +765,7 @@ ProjectStore::Oplog::ReplayLog()
}
IoBuffer
-ProjectStore::Oplog::FindChunk(Oid ChunkId)
+ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
{
RwLock::SharedLockScope OplogLock(m_OplogLock);
if (!m_Storage)
@@ -1044,8 +1044,8 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid:
void
ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
- Oid FileId,
- IoHash Hash,
+ const Oid& FileId,
+ const IoHash& Hash,
std::string_view ServerPath,
std::string_view ClientPath)
{
@@ -1066,13 +1066,13 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
}
void
-ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
+ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash)
{
m_ChunkMap.insert_or_assign(ChunkId, Hash);
}
void
-ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
+ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, const Oid& ChunkId, const IoHash& Hash)
{
m_MetaMap.insert_or_assign(ChunkId, Hash);
}
@@ -1244,7 +1244,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
}
uint32_t
-ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
+ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
{
ZEN_TRACE_CPU("Store::Oplog::AppendNewOplogEntry");
@@ -1252,7 +1252,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
OplogEntryMapping Mapping = GetMapping(Core);
- SharedBuffer Buffer = Core.GetBuffer();
+ MemoryView Buffer = Core.GetView();
const uint64_t WriteSize = Buffer.GetSize();
const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
@@ -3129,37 +3129,36 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
NiceBytes(MaxBlockSize),
NiceBytes(MaxChunkEmbedSize));
- JobId JobId =
- m_JobQueue.QueueJob(fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description),
- [this,
- ActualRemoteStore = std::move(RemoteStore),
- Project,
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks = StoreInfo.CreateBlocks,
- UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
- Force](JobContext& Context) {
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *ActualRemoteStore,
- *Project.Get(),
- *OplogPtr,
- MaxBlockSize,
- MaxChunkEmbedSize,
- EmbedLooseFile,
- CreateBlocks,
- UseTempBlockFiles,
- Force,
- &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw std::runtime_error(
- fmt::format("Export failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
- }
- });
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Export oplog '{}/{}' to {}", Project->Identifier, Oplog.OplogId(), StoreInfo.Description),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks = StoreInfo.CreateBlocks,
+ UseTempBlockFiles = StoreInfo.UseTempBlockFiles,
+ Force](JobContext& Context) {
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project.Get(),
+ *OplogPtr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ EmbedLooseFile,
+ CreateBlocks,
+ UseTempBlockFiles,
+ Force,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ }
+ });
return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
@@ -3186,25 +3185,24 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- JobId JobId =
- m_JobQueue.QueueJob(fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
- [this,
- ActualRemoteStore = std::move(RemoteStore),
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- Force,
- IgnoreMissingAttachments](JobContext& Context) {
- RemoteProjectStore::Result Result =
- LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw std::runtime_error(
- fmt::format("Import failed. Status '{}'. Reason: '{}'", ToString(Response.first), Response.second));
- }
- });
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ OplogPtr = &Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ Force,
+ IgnoreMissingAttachments](JobContext& Context) {
+ RemoteProjectStore::Result Result =
+ LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ }
+ });
return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
}
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 5ebcd420c..f9611653b 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -99,7 +99,7 @@ public:
int GetOpIndexByKey(const Oid& Key);
int GetMaxOpIndex() const;
- IoBuffer FindChunk(Oid ChunkId);
+ IoBuffer FindChunk(const Oid& ChunkId);
inline static const uint32_t kInvalidOp = ~0u;
@@ -109,7 +109,7 @@ public:
*/
uint32_t AppendNewOplogEntry(CbPackage Op);
- uint32_t AppendNewOplogEntry(CbObject Core);
+ uint32_t AppendNewOplogEntry(CbObjectView Core);
enum UpdateType
{
@@ -202,12 +202,12 @@ public:
uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry);
void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock,
- Oid FileId,
- IoHash Hash,
+ const Oid& FileId,
+ const IoHash& Hash,
std::string_view ServerPath,
std::string_view ClientPath);
- void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
- void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
+ void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
+ void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
friend class ProjectStoreReferenceChecker;
};
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index a8f4c5106..ddab7432d 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -82,7 +82,6 @@ ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_
ZEN_ASSERT(Total > 0);
OptionalContext->ReportProgress(CurrentOp, gsl::narrow<uint32_t>((100 * (Total - Remaining)) / Total));
}
- ZEN_INFO("{}", CurrentOp);
}
void
@@ -319,7 +318,7 @@ BuildContainer(CidStore& ChunkStore,
{
return false;
}
- ZEN_WARN("Failed to create temp attachment '{}', reason: '{}', retries left: {}.",
+ ZEN_WARN("Failed to create temp attachment '{}': '{}', retries left: {}.",
AttachmentPath,
Ec.message(),
RetriesLeft);
@@ -560,7 +559,8 @@ BuildContainer(CidStore& ChunkStore,
Op.value().ToJson(Sb);
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
@@ -822,6 +822,7 @@ BuildContainer(CidStore& ChunkStore,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
+
void
UploadAttachments(WorkerThreadPool& WorkerPool,
CidStore& ChunkStore,
@@ -892,10 +893,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
"Invalid attachment",
fmt::format("Upload requested of unknown attachment '{}'", Needed));
- ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'",
- Needed,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to upload attachment '{}'. ({}): '{}'",
+ Needed,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
return;
}
}
@@ -948,44 +950,47 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}). Reason: '{}'",
- RawHash,
- RemoteResult.GetErrorReason(),
- RemoteResult.GetError());
- return;
- }
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ TempPayload = std::move(Payload),
+ OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ return;
+ }
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
return;
- });
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
}
if (IsCancelled(OptionalContext))
@@ -1012,31 +1017,34 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
ZEN_ASSERT(Payload);
SaveAttachmentsLatch.AddCount(1);
AttachmentsToSave++;
- WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
+ WorkerPool.ScheduleWork(
+ [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash, OptionalContext]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
- RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- return;
- });
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
}
if (IsCancelled(OptionalContext))
@@ -1077,7 +1085,8 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
&RemoteResult,
&Chunks,
NeededChunks = std::move(NeededChunks),
- &BulkAttachmentCountToUpload]() {
+ &BulkAttachmentCountToUpload,
+ OptionalContext]() {
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
std::vector<SharedBuffer> ChunkBuffers;
ChunkBuffers.reserve(NeededChunks.size());
@@ -1098,10 +1107,11 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_WARN("Failed to save attachments with {} chunks ({}). Reason: '{}'",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
return;
}
ZEN_DEBUG("Saved {} bulk attachments in {}",
@@ -1182,10 +1192,7 @@ SaveOplog(CidStore& ChunkStore,
{
return false;
}
- ZEN_WARN("Failed to create temporary oplog block '{}', reason: '{}', retries left: {}.",
- BlockPath,
- Ec.message(),
- RetriesLeft);
+ ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft);
Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
RetriesLeft--;
return true;
@@ -1217,12 +1224,13 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
+ auto UploadBlock = [&RemoteStore, &RemoteResult, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
if (Result.ErrorCode)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_WARN("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
@@ -1262,9 +1270,10 @@ SaveOplog(CidStore& ChunkStore,
{
if (BaseContainerResult.ErrorCode)
{
- ZEN_WARN("Failed to load oplog base container, reason: '{}', error code: {}",
- BaseContainerResult.Reason,
- BaseContainerResult.ErrorCode);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load oplog base container: '{}', error code: {}",
+ BaseContainerResult.Reason,
+ BaseContainerResult.ErrorCode));
}
else
{
@@ -1327,7 +1336,8 @@ SaveOplog(CidStore& ChunkStore,
if (ContainerSaveResult.ErrorCode)
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
- ZEN_WARN("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
else
{
@@ -1361,10 +1371,13 @@ SaveOplog(CidStore& ChunkStore,
if (ContainerFinalizeResult.ErrorCode)
{
RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
- ZEN_WARN("Failed to finalize oplog container {} ({}). Reason: '{}'",
- ContainerSaveResult.RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to finalize oplog container {} ({}): '{}'",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
+ return Result;
}
ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
if (ContainerFinalizeResult.Needs.empty())
@@ -1429,8 +1442,9 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
continue;
}
OnNeedAttachment(AttachmentHash);
+ NeedAttachmentCount++;
};
- ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num()));
+ ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
size_t NeedBlockCount = 0;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
@@ -1470,6 +1484,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
}
OnNeedBlock(BlockHash, {});
+ NeedBlockCount++;
break;
}
};
@@ -1482,33 +1497,51 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
if (!SectionObject)
{
- ZEN_WARN("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type");
+ ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
Timer.GetElapsedTimeMs() / 1000.500,
"Section has unexpected data type",
"Failed to save oplog container"};
}
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
+ uint64_t OpsLoaded = 0;
ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num()));
+ BinaryWriter Writer;
for (CbFieldView OpEntry : OpsArray)
{
- CbObjectView Core = OpEntry.AsObjectView();
- BinaryWriter Writer;
- Core.CopyTo(Writer);
- MemoryView OpView = Writer.GetView();
- IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
- CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
- const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op);
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ CbObjectView TypedOp(Writer.GetData());
+ const uint32_t OpLsn = Oplog.AppendNewOplogEntry(TypedOp);
+ Writer.Reset();
if (OpLsn == ProjectStore::Oplog::kInvalidOp)
{
+ ReportMessage(OptionalContext, fmt::format("Failed to save op {}", OpsLoaded));
return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
Timer.GetElapsedTimeMs() / 1000.500,
"Failed saving op",
"Failed to save oplog container"};
}
ZEN_DEBUG("oplog entry #{}", OpLsn);
+ if (OpCount > 100000)
+ {
+ if (IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::OK),
+ Timer.GetElapsedTimeMs() / 1000.500,
+ "Operation cancelled",
+ ""};
+ }
+ if (OpsLoaded % 10000 == 0)
+ {
+ ReportProgress(OptionalContext, "Writing oplog", OpCount, OpCount - OpsLoaded);
+ }
+ }
+ OpsLoaded++;
}
+ ReportProgress(OptionalContext, "Writing oplog", OpCount, 0);
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
}
@@ -1522,6 +1555,20 @@ LoadOplog(CidStore& ChunkStore,
{
using namespace std::literals;
+ struct DownloadInfo
+ {
+ uint64_t OplogSizeBytes = 0;
+ std::atomic<uint64_t> AttachmentsDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
+ std::atomic<uint64_t> AttachmentsStored = 0;
+ std::atomic<uint64_t> AttachmentBytesStored = 0;
+ std::atomic_size_t MissingAttachmentCount = 0;
+ };
+
+ DownloadInfo Info;
+
Stopwatch Timer;
WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
@@ -1532,14 +1579,19 @@ LoadOplog(CidStore& ChunkStore,
RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
if (LoadContainerResult.ErrorCode)
{
- ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode);
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
ReportMessage(OptionalContext,
- fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))));
+ fmt::format("Loaded container in {} ({})",
+ NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
+ NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
+ Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
AsyncRemoteResult RemoteResult;
Latch AttachmentsWorkLatch(1);
@@ -1555,7 +1607,9 @@ LoadOplog(CidStore& ChunkStore,
&AttachmentsWorkLatch,
&AttachmentCount,
&RemoteResult,
- IgnoreMissingAttachments](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
if (RemoteResult.IsError())
{
return;
@@ -1564,39 +1618,70 @@ LoadOplog(CidStore& ChunkStore,
{
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork(
- [&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), IgnoreMissingAttachments]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &AttachmentsWorkLatch,
+ &RemoteResult,
+ Chunks = std::move(Chunks),
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load attachments with {} chunks ({}): '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ if (IgnoreMissingAttachments)
{
- return;
+ Info.MissingAttachmentCount.fetch_add(1);
}
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
+ else
{
- ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- return;
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
- ZEN_DEBUG("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- for (const auto& It : Result.Chunks)
- {
+ return;
+ }
+ Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ ZEN_INFO("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ for (const auto& It : Result.Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ CidStore::InsertResult InsertResult =
ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
}
- });
+ }
+ });
return;
}
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, IgnoreMissingAttachments]() {
+ WorkerPool.ScheduleWork([&AttachmentsWorkLatch,
+ &ChunkStore,
+ &RemoteStore,
+ BlockHash,
+ &RemoteResult,
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1605,29 +1690,53 @@ LoadOplog(CidStore& ChunkStore,
RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
if (BlockResult.ErrorCode)
{
- ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- if (!IgnoreMissingAttachments)
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download block attachment {} ({}): '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ if (IgnoreMissingAttachments)
+ {
+ Info.MissingAttachmentCount.fetch_add(1);
+ }
+ else
{
RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
}
return;
}
- ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
+ ZEN_INFO("Loaded block attachment '{}' in {}",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
- if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
- }))
+ bool StoreChunksOK =
+ IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
+ Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize);
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ });
+
+ if (!StoreChunksOK)
{
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
fmt::format("Invalid format for block {}", BlockHash),
{});
- ZEN_WARN("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
return;
}
});
@@ -1640,44 +1749,74 @@ LoadOplog(CidStore& ChunkStore,
&RemoteResult,
&Attachments,
&AttachmentCount,
- IgnoreMissingAttachments](const IoHash& RawHash) {
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
}
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, IgnoreMissingAttachments]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}",
+ WorkerPool.ScheduleWork(
+ [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ if (IgnoreMissingAttachments)
+ {
+ Info.MissingAttachmentCount.fetch_add(1);
+ }
+ else
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
+ return;
+ }
+ ZEN_INFO("Loaded large attachment '{}' in {}",
RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode);
- if (!IgnoreMissingAttachments)
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ Info.AttachmentsDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ return;
}
- return;
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
- ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- });
+ uint64_t ChunkSize = AttachmentResult.Bytes.GetSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ });
};
RemoteProjectStore::Result Result =
SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext);
- if (!Attachments.empty())
+ if (Result.ErrorCode != 0)
{
- ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size()));
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
+ ReportMessage(OptionalContext,
+ fmt::format("Loaded oplog container in {}, found {} attachments to download",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ Attachments.size()));
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
@@ -1703,10 +1842,17 @@ LoadOplog(CidStore& ChunkStore,
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
ReportMessage(OptionalContext,
- fmt::format("Loaded oplog {} in {}",
+ fmt::format("Loaded oplog {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))));
-
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentsDownloaded.load(),
+ NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ NiceBytes(Info.AttachmentsStored.load()),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Info.MissingAttachmentCount.load()));
return Result;
}