diff options
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 162 |
1 files changed, 90 insertions, 72 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index e922fcf1c..0aa8df362 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1928,7 +1928,6 @@ SaveOplog(CidStore& ChunkStore, RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, RemoteStoreInfo.AllowChunking, - KnownBlocks, WorkerPool, OnBlock, @@ -2103,6 +2102,7 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result ParseOplogContainer(const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, @@ -2114,6 +2114,71 @@ ParseOplogContainer(const CbObject& ContainerObject, Stopwatch Timer; + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + { + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if (!SectionObject) + { + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.0, + "Section has unexpected data type", + "Failed to save oplog container"}; + } + OutOplogSection = SectionObject; + } + std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments; + { + CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); + for (CbFieldView OpEntry : OpsArray) + { + OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + } + } + { + std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); + OnReferencedAttachments(ReferencedAttachments); + } + ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + + CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); + for (CbFieldView ChunkedFileField : ChunkedFilesArray) + { + CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); + IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); + if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) + { + ChunkedInfo Chunked; + Chunked.RawHash = RawHash; + Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); + CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); + Chunked.ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + const IoHash ChunkHash = ChunkField.AsHash(); + Chunked.ChunkHashes.emplace_back(ChunkHash); + } + OnReferencedAttachments(Chunked.ChunkHashes); + OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); + CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); + Chunked.ChunkSequence.reserve(SequenceArray.Num()); + for (CbFieldView SequenceField : SequenceArray) + { + uint32_t SequenceIndex = SequenceField.AsUInt32(); + ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); + Chunked.ChunkSequence.push_back(SequenceIndex); + } + OnChunkedAttachment(Chunked); + ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size()); + } + } + size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2130,11 +2195,10 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView ChunkField : ChunksArray) { IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (HasAttachment(ChunkHash)) + if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) { - continue; + NeededChunks.emplace_back(ChunkHash); } - NeededChunks.emplace_back(ChunkHash); } } else @@ -2142,11 +2206,10 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView ChunkField : ChunksArray) { const IoHash ChunkHash = ChunkField.AsHash(); - if (HasAttachment(ChunkHash)) + if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) { - continue; + NeededChunks.emplace_back(ChunkHash); } - NeededChunks.emplace_back(ChunkHash); } } @@ -2166,60 +2229,14 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (HasAttachment(AttachmentHash)) + if (OpsAttachments.contains(AttachmentHash) && !HasAttachment(AttachmentHash)) { - continue; + OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; } - OnNeedAttachment(AttachmentHash); - NeedAttachmentCount++; }; ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); - for (CbFieldView ChunkedFileField : ChunkedFilesArray) - { - CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); - ChunkedInfo Chunked; - Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash(); - Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); - CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); - Chunked.ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - const IoHash ChunkHash = ChunkField.AsHash(); - Chunked.ChunkHashes.emplace_back(ChunkHash); - } - CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); - Chunked.ChunkSequence.reserve(SequenceArray.Num()); - for (CbFieldView SequenceField : SequenceArray) - { - uint32_t SequenceIndex = SequenceField.AsUInt32(); - ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); - Chunked.ChunkSequence.push_back(SequenceIndex); - } - OnChunkedAttachment(Chunked); - ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); - } - - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); - - { - CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); - if (!SectionObject) - { - ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.0, - "Section has unexpected data type", - "Failed to save oplog container"}; - } - OutOplogSection = SectionObject; - } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } @@ -2283,6 +2300,7 @@ WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, @@ -2294,6 +2312,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, Stopwatch Timer; CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, + OnReferencedAttachments, HasAttachment, OnNeedBlock, OnNeedAttachment, @@ -2374,7 +2393,7 @@ LoadOplog(CidStore& ChunkStore, Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; - auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { + auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) { if (ForceDownload) { return false; @@ -2386,7 +2405,6 @@ LoadOplog(CidStore& ChunkStore, return false; }; auto OnNeedBlock = [&RemoteStore, - &Oplog, &ChunkStore, &NetworkWorkerPool, &WorkerPool, @@ -2405,8 +2423,6 @@ LoadOplog(CidStore& ChunkStore, return; } - Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks}); - BlockCountToDownload++; if (BlockHash == IoHash::Zero) { @@ -2645,8 +2661,6 @@ LoadOplog(CidStore& ChunkStore, return; } - Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); - AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, @@ -2721,23 +2735,27 @@ LoadOplog(CidStore& ChunkStore, std::vector<ChunkedInfo> FilesToDechunk; auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash}); if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) { - Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); FilesToDechunk.push_back(Chunked); } }; + auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; + + // Make sure we retain any attachments we download before writing the oplog + Oplog.EnableUpdateCapture(); + auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); + CbObject OplogSection; - RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog, - ParseOplogContainer(LoadContainerResult.ContainerObject, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OplogSection, - OptionalContext); + RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, + OnReferencedAttachments, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); |