diff options
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 83 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 162 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 7 |
3 files changed, 138 insertions, 114 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index d177b0b2b..b36c8caa0 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -854,6 +854,7 @@ ProjectStore::Oplog::Reset() m_Storage = new OplogStorage(this, m_BasePath); m_Storage->Open(true); CleanDirectory(m_TempPath); + Write(); } // Erase content on disk if (!MovedDir.empty()) @@ -1244,15 +1245,15 @@ ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function< { if (m_UpdatedLSNs) { + if (!m_Storage) + { + return; + } for (int UpdatedLSN : *m_UpdatedLSNs) { - std::optional<CbObject> UpdatedOp = GetOpByIndex(UpdatedLSN); - if (UpdatedOp) + if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) { - if (!Callback(UpdatedOp.value())) - { - break; - } + Callback(m_Storage->GetOp(AddressEntryIt->second)); } } } @@ -3134,8 +3135,19 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie auto OnChunkedAttachment = [](const ChunkedInfo&) {}; - RemoteProjectStore::Result RemoteResult = - SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr); + auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); }; + // Make sure we retain any attachments we download before writing the oplog + Oplog->EnableUpdateCapture(); + auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); }); + + RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, + ContainerObject, + OnReferencedAttachments, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + nullptr); if (RemoteResult.ErrorCode) { @@ -3899,7 +3911,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) class ProjectStoreReferenceChecker : public GcReferenceChecker { public: - ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner, bool PreCache) : m_Oplog(Owner), m_PreCache(PreCache) {} + ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner) : m_Oplog(Owner) {} virtual ~ProjectStoreReferenceChecker() { @@ -3921,36 +3933,33 @@ public: virtual void PreCache(GcCtx& Ctx) override { - if (m_PreCache) - { - ZEN_TRACE_CPU("Store::PreCache"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", - m_Oplog.m_BasePath, - m_References.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Oplog.m_OuterProject->Identifier, - m_Oplog.OplogId()); - }); + ZEN_TRACE_CPU("Store::PreCache"); - m_Oplog.EnableUpdateCapture(); - m_OplogCaptureEnabled = true; - - RwLock::SharedLockScope __(m_Oplog.m_OplogLock); - if (Ctx.IsCancelledFlag) + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) { return; } - m_Oplog.IterateOplog([&](CbObjectView Op) { - Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); - }); + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", + m_Oplog.m_BasePath, + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Oplog.m_OuterProject->Identifier, + m_Oplog.OplogId()); + }); + + m_Oplog.EnableUpdateCapture(); + m_OplogCaptureEnabled = true; + + RwLock::SharedLockScope __(m_Oplog.m_OplogLock); + if (Ctx.IsCancelledFlag) + { + return; } + m_Oplog.IterateOplog([&](CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + }); } virtual void LockState(GcCtx& Ctx) override @@ -3972,7 +3981,6 @@ public: }); m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); - m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool { UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); return true; @@ -4021,7 +4029,6 @@ public: }); } ProjectStore::Oplog& m_Oplog; - bool m_PreCache; std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; std::vector<IoHash> m_References; bool m_OplogCaptureEnabled = false; @@ -4075,9 +4082,7 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) { continue; } - GcClock::TimePoint Now = GcClock::Now(); - bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); - Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache)); + Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog)); OplogCount++; } } diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index e922fcf1c..0aa8df362 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1928,7 +1928,6 @@ SaveOplog(CidStore& ChunkStore, RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, RemoteStoreInfo.AllowChunking, - KnownBlocks, WorkerPool, OnBlock, @@ -2103,6 +2102,7 @@ SaveOplog(CidStore& ChunkStore, RemoteProjectStore::Result ParseOplogContainer(const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, @@ -2114,6 +2114,71 @@ ParseOplogContainer(const CbObject& ContainerObject, Stopwatch Timer; + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + { + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if (!SectionObject) + { + ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.0, + "Section has unexpected data type", + "Failed to save oplog container"}; + } + OutOplogSection = SectionObject; + } + std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments; + { + CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); + for (CbFieldView OpEntry : OpsArray) + { + OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + } + } + { + std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); + OnReferencedAttachments(ReferencedAttachments); + } + ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); + + CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); + for (CbFieldView ChunkedFileField : ChunkedFilesArray) + { + CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); + IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); + if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) + { + ChunkedInfo Chunked; + Chunked.RawHash = RawHash; + Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); + CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); + Chunked.ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + const IoHash ChunkHash = ChunkField.AsHash(); + Chunked.ChunkHashes.emplace_back(ChunkHash); + } + OnReferencedAttachments(Chunked.ChunkHashes); + OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); + CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); + Chunked.ChunkSequence.reserve(SequenceArray.Num()); + for (CbFieldView SequenceField : SequenceArray) + { + uint32_t SequenceIndex = SequenceField.AsUInt32(); + ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); + Chunked.ChunkSequence.push_back(SequenceIndex); + } + OnChunkedAttachment(Chunked); + ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + Chunked.ChunkHashes.size()); + } + } + size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) @@ -2130,11 +2195,10 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView ChunkField : ChunksArray) { IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (HasAttachment(ChunkHash)) + if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) { - continue; + NeededChunks.emplace_back(ChunkHash); } - NeededChunks.emplace_back(ChunkHash); } } else @@ -2142,11 +2206,10 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView ChunkField : ChunksArray) { const IoHash ChunkHash = ChunkField.AsHash(); - if (HasAttachment(ChunkHash)) + if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) { - continue; + NeededChunks.emplace_back(ChunkHash); } - NeededChunks.emplace_back(ChunkHash); } } @@ -2166,60 +2229,14 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (HasAttachment(AttachmentHash)) + if (OpsAttachments.contains(AttachmentHash) && !HasAttachment(AttachmentHash)) { - continue; + OnNeedAttachment(AttachmentHash); + NeedAttachmentCount++; } - OnNeedAttachment(AttachmentHash); - NeedAttachmentCount++; }; ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); - for (CbFieldView ChunkedFileField : ChunkedFilesArray) - { - CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); - ChunkedInfo Chunked; - Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash(); - Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); - CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); - Chunked.ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - const IoHash ChunkHash = ChunkField.AsHash(); - Chunked.ChunkHashes.emplace_back(ChunkHash); - } - CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); - Chunked.ChunkSequence.reserve(SequenceArray.Num()); - for (CbFieldView SequenceField : SequenceArray) - { - uint32_t SequenceIndex = SequenceField.AsUInt32(); - ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); - Chunked.ChunkSequence.push_back(SequenceIndex); - } - OnChunkedAttachment(Chunked); - ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); - } - - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); - - { - CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); - if (!SectionObject) - { - ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type")); - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.0, - "Section has unexpected data type", - "Failed to save oplog container"}; - } - OutOplogSection = SectionObject; - } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } @@ -2283,6 +2300,7 @@ WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, @@ -2294,6 +2312,7 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, Stopwatch Timer; CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, + OnReferencedAttachments, HasAttachment, OnNeedBlock, OnNeedAttachment, @@ -2374,7 +2393,7 @@ LoadOplog(CidStore& ChunkStore, Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; - auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { + auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) { if (ForceDownload) { return false; @@ -2386,7 +2405,6 @@ LoadOplog(CidStore& ChunkStore, return false; }; auto OnNeedBlock = [&RemoteStore, - &Oplog, &ChunkStore, &NetworkWorkerPool, &WorkerPool, @@ -2405,8 +2423,6 @@ LoadOplog(CidStore& ChunkStore, return; } - Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks}); - BlockCountToDownload++; if (BlockHash == IoHash::Zero) { @@ -2645,8 +2661,6 @@ LoadOplog(CidStore& ChunkStore, return; } - Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); - AttachmentsDownloadLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, @@ -2721,23 +2735,27 @@ LoadOplog(CidStore& ChunkStore, std::vector<ChunkedInfo> FilesToDechunk; auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash}); if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) { - Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); FilesToDechunk.push_back(Chunked); } }; + auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; + + // Make sure we retain any attachments we download before writing the oplog + Oplog.EnableUpdateCapture(); + auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); + CbObject OplogSection; - RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog, - ParseOplogContainer(LoadContainerResult.ContainerObject, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OplogSection, - OptionalContext); + RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, + OnReferencedAttachments, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index d6e064bdf..f0655fb59 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -133,9 +133,10 @@ RemoteProjectStore::LoadContainerResult BuildContainer( class JobContext; -RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, +RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment, |