aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp172
1 files changed, 120 insertions, 52 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index cc9385f5e..65ef099e4 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -2094,13 +2094,13 @@ SaveOplog(CidStore& ChunkStore,
};
RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
+ParseOplogContainer(const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ CbObject& OutOplogSection,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2211,64 +2211,104 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
"Section has unexpected data type",
"Failed to save oplog container"};
}
+ OutOplogSection = SectionObject;
+ }
+ return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
+}
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
+static RemoteProjectStore::Result
+WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
+{
+ using namespace std::literals;
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
+ Stopwatch Timer;
- const size_t OpsBatchSize = 8192;
- std::vector<uint8_t> OpsData;
- std::vector<size_t> OpDataOffsets;
- size_t OpsCompleteCount = 0;
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
- OpsData.reserve(OpsBatchSize);
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
- auto AppendBatch = [&]() {
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpDataOffsets.size());
- for (size_t OpDataOffset : OpDataOffsets)
- {
- Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
- }
- std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
- OpsCompleteCount += OpLsns.size();
- OpsData.clear();
- OpDataOffsets.clear();
- ReportProgress(OptionalContext,
- fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
- OpCount,
- OpCount - OpsCompleteCount);
- };
+ const size_t OpsBatchSize = 8192;
+ std::vector<uint8_t> OpsData;
+ std::vector<size_t> OpDataOffsets;
+ size_t OpsCompleteCount = 0;
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
- {
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- OpDataOffsets.push_back(OpsData.size());
- OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
- Writer.Reset();
+ OpsData.reserve(OpsBatchSize);
- if (OpDataOffsets.size() == OpsBatchSize)
- {
- AppendBatch();
- }
+ auto AppendBatch = [&]() {
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpDataOffsets.size());
+ for (size_t OpDataOffset : OpDataOffsets)
+ {
+ Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
}
- if (!OpDataOffsets.empty())
+ std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
+ OpsCompleteCount += OpLsns.size();
+ OpsData.clear();
+ OpDataOffsets.clear();
+ ReportProgress(OptionalContext,
+ fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ };
+
+ BinaryWriter Writer;
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ OpDataOffsets.push_back(OpsData.size());
+ OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
+ Writer.Reset();
+
+ if (OpDataOffsets.size() == OpsBatchSize)
{
AppendBatch();
}
}
+ if (!OpDataOffsets.empty())
+ {
+ AppendBatch();
+ }
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}
RemoteProjectStore::Result
+SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ JobContext* OptionalContext)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+ CbObject OplogSection;
+ RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OplogSection,
+ OptionalContext);
+ if (Result.ErrorCode != 0)
+ {
+ return Result;
+ }
+ Result = WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
+}
+
+RemoteProjectStore::Result
LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
bool ForceDownload,
bool IgnoreMissingAttachments,
+ bool CleanOplog,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2338,6 +2378,7 @@ LoadOplog(CidStore& ChunkStore,
return false;
};
auto OnNeedBlock = [&RemoteStore,
+ &Oplog,
&ChunkStore,
&NetworkWorkerPool,
&AttachmentsWorkLatch,
@@ -2353,6 +2394,9 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
+
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks});
+
BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
@@ -2495,6 +2539,7 @@ LoadOplog(CidStore& ChunkStore,
};
auto OnNeedAttachment = [&RemoteStore,
+ &Oplog,
&ChunkStore,
&NetworkWorkerPool,
&AttachmentsWorkLatch,
@@ -2515,6 +2560,8 @@ LoadOplog(CidStore& ChunkStore,
return;
}
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash});
+
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
@@ -2571,6 +2618,7 @@ LoadOplog(CidStore& ChunkStore,
std::vector<ChunkedInfo> FilesToDechunk;
auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash});
if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
{
Oplog.CaptureAddedAttachments(Chunked.ChunkHashes);
@@ -2578,19 +2626,21 @@ LoadOplog(CidStore& ChunkStore,
}
};
- RemoteProjectStore::Result Result = SaveOplogContainer(Oplog,
- LoadContainerResult.ContainerObject,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- OptionalContext);
+ CbObject OplogSection;
+ RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog,
+ ParseOplogContainer(LoadContainerResult.ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OplogSection,
+ OptionalContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
ReportMessage(OptionalContext,
- fmt::format("Wrote oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
+ fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
Attachments.size(),
BlockCountToDownload,
@@ -2741,6 +2791,24 @@ LoadOplog(CidStore& ChunkStore,
Result = RemoteResult.ConvertResult();
}
+ if (Result.ErrorCode == 0)
+ {
+ if (CleanOplog)
+ {
+ if (!Oplog.Reset())
+ {
+ Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
+ }
+ }
+ if (Result.ErrorCode == 0)
+ {
+ WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ }
+ }
+
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);