diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-18 17:31:15 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-18 17:31:15 +0200 |
| commit | 93e252c50db8947ea065fb5ea8ad17892ddc37d0 (patch) | |
| tree | 6e0e95662d7319d439b7629b47686d48679a57f9 | |
| parent | improved lock file handling (#50) (diff) | |
| download | zen-de/safer-oplog-import.tar.xz zen-de/safer-oplog-import.zip | |
safer oplog import (#52) de/safer-oplog-import
* reference cache gc update capture
* When importing oplogs we now import all attachments first and (optionally clean) write the oplog on success
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | CHANGELOG.md | 1 |
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 23 |
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 79 |
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 2 |
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 172 |
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 1 |
6 files changed, 202 insertions, 76 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 51b4bf6ae..456150f6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - Improvement: Detect zombie processes on Mac/Linux when checking for running processes - Imrpovement: Make sure zenserver detaches itself as a child process at startup to avoid zombie process if parent process does not wait for zenserver child process - Improvement: Trying to load a compact binary object from an empty file no longer causes access violation +- Improvement: When importing oplogs we now import all attachments first and (optionally clean) write the oplog on success to avoid invalid import results ## 5.4.4 - Bugfix: Get raw size for compressed chunks correctly for `/prj/{project}/oplog/{log}/chunkinfos` diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index f877a3c51..6591c05cd 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -1158,25 +1158,12 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg std::string Url = fmt::format("/prj/{}/oplog/{}", m_ProjectName, m_OplogName); bool CreateOplog = false; - if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) - { - if (m_Clean) - { - ZEN_WARN("Deleting oplog '{}/{}'", m_ProjectName, m_OplogName) - Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); - if (!Result) - { - Result.ThrowError("failed deleting existing oplog"sv); - return 1; - } - CreateOplog = true; - } - } - else if (Result.StatusCode == HttpResponseCode::NotFound) + if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON)); + Result.StatusCode == HttpResponseCode::NotFound) { CreateOplog = true; } - else + else if (!IsHttpSuccessCode(Result.StatusCode)) { Result.ThrowError("failed checking oplog"sv); return 1; @@ -1211,6 +1198,10 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { 
Writer.AddBool("ignoremissingattachments"sv, true); } + if (m_Clean) + { + Writer.AddBool("clean"sv, true); + } if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 65d2730d8..e452c658e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -831,6 +831,38 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath) Write(); } +bool +ProjectStore::Oplog::Reset() +{ + std::filesystem::path MovedDir; + + { + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + m_Storage = {}; + if (!PrepareDirectoryDelete(m_BasePath, MovedDir)) + { + m_Storage = new OplogStorage(this, m_BasePath); + const bool StoreExists = m_Storage->Exists(); + m_Storage->Open(/* IsCreate */ !StoreExists); + return false; + } + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpAddressMap.clear(); + m_LatestOpMap.clear(); + m_Storage = new OplogStorage(this, m_BasePath); + m_Storage->Open(true); + CleanDirectory(m_TempPath); + } + // Erase content on disk + if (!MovedDir.empty()) + { + OplogStorage::Delete(MovedDir); + } + return true; +} + void ProjectStore::Oplog::ReplayLog() { @@ -1167,8 +1199,19 @@ void ProjectStore::Oplog::EnableUpdateCapture() { m_OplogLock.WithExclusiveLock([&]() { - m_UpdatedLSNs = std::make_unique<std::vector<int>>(); - m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_UpdatedLSNs); + ZEN_ASSERT(!m_NonGCAttachments); + m_UpdatedLSNs = std::make_unique<std::vector<int>>(); + m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + } + else + { + ZEN_ASSERT(m_UpdatedLSNs); + ZEN_ASSERT(m_NonGCAttachments); + } + m_UpdateCaptureRefCounter++; }); } @@ -1176,8 +1219,15 @@ void ProjectStore::Oplog::DisableUpdateCapture() { m_OplogLock.WithExclusiveLock([&]() { - m_UpdatedLSNs.reset(); - 
m_NonGCAttachments.reset(); + ZEN_ASSERT(m_UpdatedLSNs); + ZEN_ASSERT(m_NonGCAttachments); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_UpdatedLSNs.reset(); + m_NonGCAttachments.reset(); + } }); } @@ -3581,6 +3631,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool CleanOplog = Params["clean"].AsBool(false); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); @@ -3594,9 +3645,10 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); JobId JobId = m_JobQueue.QueueJob( fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description), - [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments](JobContext& Context) { + [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments, CleanOplog]( + JobContext& Context) { RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context); + LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context); auto Response = ConvertResult(Result); ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) @@ -4258,11 +4310,22 @@ TEST_CASE_TEMPLATE("project.store.export", ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {}); CHECK(OplogImport != nullptr); - 
RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr); + + RemoteProjectStore::Result ImportResult = + LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, nullptr); CHECK(ImportResult.ErrorCode == 0); - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr); + RemoteProjectStore::Result ImportForceResult = + LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, nullptr); CHECK(ImportForceResult.ErrorCode == 0); + + RemoteProjectStore::Result ImportCleanResult = + LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, nullptr); + CHECK(ImportCleanResult.ErrorCode == 0); + + RemoteProjectStore::Result ImportForceCleanResult = + LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, nullptr); + CHECK(ImportForceCleanResult.ErrorCode == 0); } TEST_CASE("project.store.gc") diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index e27bf6e49..75844f84e 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -82,6 +82,7 @@ public: void Read(); void Write(); void Update(const std::filesystem::path& MarkerPath); + bool Reset(); struct ChunkInfo { @@ -175,6 +176,7 @@ public: tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key + uint32_t m_UpdateCaptureRefCounter = 0; std::unique_ptr<std::vector<int>> m_UpdatedLSNs; std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments; diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 
cc9385f5e..65ef099e4 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -2094,13 +2094,13 @@ SaveOplog(CidStore& ChunkStore, }; RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - JobContext* OptionalContext) +ParseOplogContainer(const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + CbObject& OutOplogSection, + JobContext* OptionalContext) { using namespace std::literals; @@ -2211,64 +2211,104 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog, "Section has unexpected data type", "Failed to save oplog container"}; } + OutOplogSection = SectionObject; + } + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; +} - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - const uint64_t OpCount = OpsArray.Num(); +static RemoteProjectStore::Result +WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) +{ + using namespace std::literals; - ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); + Stopwatch Timer; - const size_t OpsBatchSize = 8192; - std::vector<uint8_t> OpsData; - std::vector<size_t> OpDataOffsets; - size_t OpsCompleteCount = 0; + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + const uint64_t OpCount = 
OpsArray.Num(); - OpsData.reserve(OpsBatchSize); + ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); - auto AppendBatch = [&]() { - std::vector<CbObjectView> Ops; - Ops.reserve(OpDataOffsets.size()); - for (size_t OpDataOffset : OpDataOffsets) - { - Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); - } - std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); - OpsCompleteCount += OpLsns.size(); - OpsData.clear(); - OpDataOffsets.clear(); - ReportProgress(OptionalContext, - fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), - OpCount, - OpCount - OpsCompleteCount); - }; + const size_t OpsBatchSize = 8192; + std::vector<uint8_t> OpsData; + std::vector<size_t> OpDataOffsets; + size_t OpsCompleteCount = 0; - BinaryWriter Writer; - for (CbFieldView OpEntry : OpsArray) - { - CbObjectView Op = OpEntry.AsObjectView(); - Op.CopyTo(Writer); - OpDataOffsets.push_back(OpsData.size()); - OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); - Writer.Reset(); + OpsData.reserve(OpsBatchSize); - if (OpDataOffsets.size() == OpsBatchSize) - { - AppendBatch(); - } + auto AppendBatch = [&]() { + std::vector<CbObjectView> Ops; + Ops.reserve(OpDataOffsets.size()); + for (size_t OpDataOffset : OpDataOffsets) + { + Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); } - if (!OpDataOffsets.empty()) + std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops); + OpsCompleteCount += OpLsns.size(); + OpsData.clear(); + OpDataOffsets.clear(); + ReportProgress(OptionalContext, + fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), + OpCount, + OpCount - OpsCompleteCount); + }; + + BinaryWriter Writer; + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Op = OpEntry.AsObjectView(); + Op.CopyTo(Writer); + OpDataOffsets.push_back(OpsData.size()); + OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const 
uint8_t*)Writer.GetData()) + Writer.GetSize()); + Writer.Reset(); + + if (OpDataOffsets.size() == OpsBatchSize) { AppendBatch(); } } + if (!OpDataOffsets.empty()) + { + AppendBatch(); + } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } RemoteProjectStore::Result +SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, + const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, + JobContext* OptionalContext) +{ + using namespace std::literals; + + Stopwatch Timer; + CbObject OplogSection; + RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); + if (Result.ErrorCode != 0) + { + return Result; + } + Result = WriteOplogSection(Oplog, OplogSection, OptionalContext); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; +} + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, bool IgnoreMissingAttachments, + bool CleanOplog, JobContext* OptionalContext) { using namespace std::literals; @@ -2338,6 +2378,7 @@ LoadOplog(CidStore& ChunkStore, return false; }; auto OnNeedBlock = [&RemoteStore, + &Oplog, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, @@ -2353,6 +2394,9 @@ LoadOplog(CidStore& ChunkStore, { return; } + + Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks}); + BlockCountToDownload++; if (BlockHash == IoHash::Zero) { @@ -2495,6 +2539,7 @@ LoadOplog(CidStore& ChunkStore, }; auto OnNeedAttachment = [&RemoteStore, + &Oplog, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, @@ -2515,6 +2560,8 @@ 
LoadOplog(CidStore& ChunkStore, return; } + Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash}); + AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); NetworkWorkerPool.ScheduleWork([&RemoteStore, @@ -2571,6 +2618,7 @@ LoadOplog(CidStore& ChunkStore, std::vector<ChunkedInfo> FilesToDechunk; auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { + Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash}); if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) { Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); @@ -2578,19 +2626,21 @@ LoadOplog(CidStore& ChunkStore, } }; - RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, - LoadContainerResult.ContainerObject, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OptionalContext); + CbObject OplogSection; + RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog, + ParseOplogContainer(LoadContainerResult.ContainerObject, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + OplogSection, + OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } ReportMessage(OptionalContext, - fmt::format("Wrote oplog in {}, found {} attachments, {} blocks and {} chunked files to download", + fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), Attachments.size(), BlockCountToDownload, @@ -2741,6 +2791,24 @@ LoadOplog(CidStore& ChunkStore, Result = RemoteResult.ConvertResult(); } + if (Result.ErrorCode == 0) + { + if (CleanOplog) + { + if (!Oplog.Reset()) + { + Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = fmt::format("Failed to clean existing oplog '{}'", 
Oplog.OplogId())}; + ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); + } + } + if (Result.ErrorCode == 0) + { + WriteOplogSection(Oplog, OplogSection, OptionalContext); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS); diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 6b83e526c..b00aa231f 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -158,6 +158,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, ProjectStore::Oplog& Oplog, bool ForceDownload, bool IgnoreMissingAttachments, + bool CleanOplog, JobContext* OptionalContext); CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks); |