diff options
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 172 |
1 files changed, 120 insertions, 52 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index cc9385f5e..65ef099e4 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -2094,13 +2094,13 @@ SaveOplog(CidStore& ChunkStore, }; RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - JobContext* OptionalContext) +ParseOplogContainer(const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + CbObject& OutOplogSection, + JobContext* OptionalContext) { using namespace std::literals; @@ -2211,64 +2211,104 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, "Section has unexpected data type", "Failed to save oplog container"}; } + OutOplogSection = SectionObject; + } + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; +} - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - const uint64_t OpCount = OpsArray.Num(); +static RemoteProjectStore::Result +WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) +{ + using namespace std::literals; - ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); + Stopwatch Timer; - const size_t OpsBatchSize = 8192; - std::vector<uint8_t> OpsData; - std::vector<size_t> OpDataOffsets; - size_t OpsCompleteCount = 0; + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = OpsArray.Num(); - OpsData.reserve(OpsBatchSize); + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); - auto AppendBatch = [&]() { - std::vector<CbObjectView> Ops; - Ops.reserve(OpDataOffsets.size()); - for (size_t OpDataOffset : OpDataOffsets) - { - Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); - } - std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); - OpsCompleteCount += OpLsns.size(); - OpsData.clear(); - OpDataOffsets.clear(); - ReportProgress(OptionalContext, - fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), - OpCount, - OpCount - OpsCompleteCount); - }; + const size_t OpsBatchSize = 8192; + std::vector<uint8_t> OpsData; + std::vector<size_t> OpDataOffsets; + size_t OpsCompleteCount = 0; - BinaryWriter Writer; - for (CbFieldView OpEntry : OpsArray) - { - CbObjectView Op = OpEntry.AsObjectView(); - Op.CopyTo(Writer); - OpDataOffsets.push_back(OpsData.size()); - OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); - Writer.Reset(); + OpsData.reserve(OpsBatchSize); - if (OpDataOffsets.size() == OpsBatchSize) - { - AppendBatch(); - } + auto AppendBatch = [&]() { + std::vector<CbObjectView> Ops; + Ops.reserve(OpDataOffsets.size()); + for (size_t OpDataOffset : OpDataOffsets) + { + Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); } - if (!OpDataOffsets.empty()) + std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); + OpsCompleteCount += OpLsns.size(); + OpsData.clear(); + OpDataOffsets.clear(); + ReportProgress(OptionalContext, + fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + }; + + BinaryWriter Writer; + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + OpDataOffsets.push_back(OpsData.size()); + OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); + Writer.Reset(); + + if (OpDataOffsets.size() == OpsBatchSize) { AppendBatch(); } } + if (!OpDataOffsets.empty()) + { + AppendBatch(); + } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } RemoteProjectStore::Result +SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + JobContext* OptionalContext) +{ + using namespace std::literals; + + Stopwatch Timer; + CbObject OplogSection; + RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); + if (Result.ErrorCode != 0) + { + return Result; + } + Result = WriteOplogSection(Oplog, OplogSection, OptionalContext); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; +} + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, bool IgnoreMissingAttachments, + bool CleanOplog, JobContext* OptionalContext) { using namespace std::literals; @@ -2338,6 +2378,7 @@ LoadOplog(CidStore& ChunkStore, return false; }; auto OnNeedBlock = [&RemoteStore, + &Oplog, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, @@ -2353,6 +2394,9 @@ LoadOplog(CidStore& ChunkStore, { return; } + + Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks}); + BlockCountToDownload++; if (BlockHash == IoHash::Zero) { @@ -2495,6 +2539,7 @@ LoadOplog(CidStore& ChunkStore, }; auto OnNeedAttachment = [&RemoteStore, + &Oplog, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, @@ -2515,6 +2560,8 @@ LoadOplog(CidStore& ChunkStore, return; } + Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); + AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, @@ -2571,6 +2618,7 @@ LoadOplog(CidStore& ChunkStore, std::vector<ChunkedInfo> FilesToDechunk; auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { + Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash}); if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) { Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); @@ -2578,19 +2626,21 @@ LoadOplog(CidStore& ChunkStore, } }; - RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, - LoadContainerResult.ContainerObject, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OptionalContext); + CbObject OplogSection; + RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog, + ParseOplogContainer(LoadContainerResult.ContainerObject, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } ReportMessage(OptionalContext, - fmt::format("Wrote oplog in {}, found {} attachments, {} blocks and {} chunked files to download", + fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), Attachments.size(), BlockCountToDownload, @@ -2741,6 +2791,24 @@ LoadOplog(CidStore& ChunkStore, Result = RemoteResult.ConvertResult(); } + if (Result.ErrorCode == 0) + { + if (CleanOplog) + { + if (!Oplog.Reset()) + { + Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())}; + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); + } + } + if (Result.ErrorCode == 0) + { + WriteOplogSection(Oplog, OplogSection, OptionalContext); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); |