diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/projectstore.cpp | 212 | ||||
| -rw-r--r-- | zenserver/projectstore.h | 40 |
2 files changed, 209 insertions, 43 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 3e242653a..5c7de2a43 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -274,10 +274,15 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +ProjectStore::Oplog::Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath) : m_OuterProject(Project) , m_CidStore(Store) , m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) , m_OplogId(Id) { using namespace std::literals; @@ -348,6 +353,16 @@ ProjectStore::Oplog::TotalSize() const return 0; } +bool +ProjectStore::Oplog::IsExpired() const +{ + if (m_MarkerPath.empty()) + { + return false; + } + return !std::filesystem::exists(m_MarkerPath); +} + std::filesystem::path ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) { @@ -373,19 +388,90 @@ ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) bool ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) { - return OplogStorage::Exists(BasePath); + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return std::filesystem::is_regular_file(StateFilePath); +} + +void +ProjectStore::Oplog::Read() +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + if (std::filesystem::is_regular_file(StateFilePath)) + { + ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); + return; + } + + CbObject Cfg = LoadCompactBinaryObject(Obj); + + m_MarkerPath = Cfg["gcpath"sv].AsString(); + } + else + { + ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", + m_OplogId, + m_OuterProject->Identifier, + StateFilePath); + } + ReplayLog(); +} + +void +ProjectStore::Oplog::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); } void ProjectStore::Oplog::ReplayLog() { - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); }); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); }); } IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { @@ -429,6 +515,10 @@ ProjectStore::Oplog::IterateFileMap( std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } for (const auto& Kv : m_FileMap) { @@ -440,6 +530,10 @@ void ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } std::vector<OplogEntryAddress> Entries; Entries.reserve(m_LatestOpMap.size()); @@ -463,6 +557,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { @@ -479,6 +577,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByIndex(int Index) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) { @@ -532,7 +634,10 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk } uint32_t -ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); @@ -541,8 +646,6 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - using namespace std::literals; // Update chunk id maps @@ -624,12 +727,15 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); - using namespace std::literals; const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); + if (EntryId == 0xffffffffu) + { + // The oplog has been deleted so just drop this + return EntryId; + } // Persist attachments after oplog entry so GC won't find attachments without references @@ -663,12 +769,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return 0xffffffffu; + } using namespace std::literals; const OplogEntry OpEntry = m_Storage->AppendOp(Core); - const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry); return EntryId; } @@ -764,7 +874,7 @@ ProjectStore::Project::BasePathForOplog(std::string_view OplogId) } ProjectStore::Oplog* -ProjectStore::Project::NewOplog(std::string_view OplogId) +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { RwLock::ExclusiveLockScope _(m_ProjectLock); @@ -772,10 +882,12 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) - .first->second.get(); + Oplog* Log = m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) + .first->second.get(); + Log->Write(); return Log; } catch (std::exception&) @@ -815,10 +927,11 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) + m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) .first->second.get(); - - Log->ReplayLog(); + Log->Read(); return Log; } @@ -910,7 +1023,12 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { OpenOplog(OpLogId); } - IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); + IterateOplogs([&](const Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.Scrub(Ctx); + } + }); } void @@ -929,7 +1047,12 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx) { OpenOplog(OpLogId); } - IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); + IterateOplogs([&](Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.GatherReferences(GcCtx); + } + }); } uint64_t @@ -1133,6 +1256,7 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount); }); std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); @@ -1144,19 +1268,41 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount++; continue; } + Projects.push_back(Kv.second); ProjectCount++; } } - if (ExpiredProjects.empty()) + if (!GcCtx.IsDeletionMode()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired projects found", m_ProjectBasePath.string()); + ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); return; } - if (!GcCtx.IsDeletionMode()) + for (const Ref<Project>& Project : Projects) { - ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { + if (Oplog.IsExpired()) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + for (const std::string& OplogId : ExpiredOplogs) + { + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + } + } + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); return; } @@ -2347,17 +2493,25 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) case HttpVerb::kPost: { + std::filesystem::path OplogMarkerPath; + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (Payload.GetSize() > 0) + { + CbObject Params = LoadCompactBinaryObject(Payload); + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { - if (!Project->NewOplog(OplogId)) + if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -2683,7 +2837,7 @@ TEST_CASE("project.store.lifetimes") EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); std::filesystem::path DeletePath; @@ -2736,7 +2890,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2752,7 +2906,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2821,7 +2975,7 @@ TEST_CASE("project.store.partial.read") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog != nullptr); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 9d909de8c..8cebc5f6c 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -70,11 +70,18 @@ public: struct Oplog { - Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath); + Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath); ~Oplog(); [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); + void Read(); + void Write(); + void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObject)>&& Fn); std::optional<CbObject> GetOpByKey(const Oid& Key); @@ -98,18 +105,6 @@ public: kUpdateReplay }; - /** Update tracking metadata for a new oplog entry - * - * This is used during replay (and gets called as part of new op append) - * - * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected - */ - uint32_t RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate); - - /** Scan oplog and register each entry, thus updating the in-memory tracking tables - */ - void ReplayLog(); - const std::string& OplogId() const { return m_OplogId; } const std::filesystem::path& TempPath() const { return m_TempPath; } @@ -126,6 +121,7 @@ public: return m_LatestOpMap.size(); } + bool IsExpired() const; std::filesystem::path PrepareForDelete(bool MoveFolder); private: @@ -141,6 +137,7 @@ public: Project* m_OuterProject = nullptr; CidStore& m_CidStore; std::filesystem::path m_BasePath; + std::filesystem::path m_MarkerPath; std::filesystem::path m_TempPath; mutable RwLock m_OplogLock; @@ -154,6 +151,21 @@ public: RefPtr<OplogStorage> m_Storage; std::string m_OplogId; + /** Scan oplog and register each entry, thus updating the in-memory tracking tables + */ + void ReplayLog(); + + /** Update tracking metadata for a new oplog entry + * + * This is used during replay (and gets called as part of new op append) + * + * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected + */ + uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate); + bool AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid FileId, IoHash Hash, @@ -171,7 +183,7 @@ public: std::string ProjectRootDir; std::string ProjectFilePath; - Oplog* NewOplog(std::string_view OplogId); + Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); Oplog* OpenOplog(std::string_view OplogId); void DeleteOplog(std::string_view OplogId); void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; |