diff options
| author | Dan Engelbrecht <[email protected]> | 2022-12-14 13:31:17 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-12-14 04:31:17 -0800 |
| commit | 110c1561ca9372641760f8a1a4ad8dc318fef895 (patch) | |
| tree | 2aaf2b47a4460f823a6c8a88c6d6e2f99869cfdf | |
| parent | Changed so CompressedBuffer::DecodeRawHash returns IoHash just like on the UE... (diff) | |
| download | zen-110c1561ca9372641760f8a1a4ad8dc318fef895.tar.xz zen-110c1561ca9372641760f8a1a4ad8dc318fef895.zip | |
oplog level GC (#209)
Adds check for marker file supplied by UE to see if an oplog is expired (user has deleted the corresponding cooked folder).
Fixed concurrency vulnerabilities in the project store related to oplogs.
* fix concurrency vulnerabilities
* propagate lifetime file path
* oplog level gc
* changelog
| -rw-r--r-- | CHANGELOG.md | 3 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 212 | ||||
| -rw-r--r-- | zenserver/projectstore.h | 40 |
3 files changed, 212 insertions, 43 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f874dc9..37c73fc7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## +- Feature: Oplog level GC in project store. If gc marker file path is given by UE, oplogs will be GC:d when marker file is deleted (and GC is triggered) + +## 0.2.0 - Feature: Recording and playback of cache request with full data - both get and put operations can be replayed. Invoke via web request - `<host>/z$/exec$/start-recording?<disk-storage-path>` - `<host>/z$/exec$/stop-recording` diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 3e242653a..5c7de2a43 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -274,10 +274,15 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +ProjectStore::Oplog::Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath) : m_OuterProject(Project) , m_CidStore(Store) , m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) , m_OplogId(Id) { using namespace std::literals; @@ -348,6 +353,16 @@ ProjectStore::Oplog::TotalSize() const return 0; } +bool +ProjectStore::Oplog::IsExpired() const +{ + if (m_MarkerPath.empty()) + { + return false; + } + return !std::filesystem::exists(m_MarkerPath); +} + std::filesystem::path ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) { @@ -373,19 +388,90 @@ ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) bool ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) { - return OplogStorage::Exists(BasePath); + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return std::filesystem::is_regular_file(StateFilePath); +} + +void +ProjectStore::Oplog::Read() +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = 
m_BasePath / "oplog.zcb"sv; + if (std::filesystem::is_regular_file(StateFilePath)) + { + ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); + return; + } + + CbObject Cfg = LoadCompactBinaryObject(Obj); + + m_MarkerPath = Cfg["gcpath"sv].AsString(); + } + else + { + ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", + m_OplogId, + m_OuterProject->Identifier, + StateFilePath); + } + ReplayLog(); +} + +void +ProjectStore::Oplog::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); } void ProjectStore::Oplog::ReplayLog() { - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); }); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); }); } IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != 
m_ChunkMap.end()) { @@ -429,6 +515,10 @@ ProjectStore::Oplog::IterateFileMap( std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } for (const auto& Kv : m_FileMap) { @@ -440,6 +530,10 @@ void ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } std::vector<OplogEntryAddress> Entries; Entries.reserve(m_LatestOpMap.size()); @@ -463,6 +557,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { @@ -479,6 +577,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByIndex(int Index) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) { @@ -532,7 +634,10 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk } uint32_t -ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); @@ -541,8 +646,6 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. 
Longer term we'll probably want to ensure we can do concurrent updates however - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - using namespace std::literals; // Update chunk id maps @@ -624,12 +727,15 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); - using namespace std::literals; const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); + if (EntryId == 0xffffffffu) + { + // The oplog has been deleted so just drop this + return EntryId; + } // Persist attachments after oplog entry so GC won't find attachments without references @@ -663,12 +769,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return 0xffffffffu; + } using namespace std::literals; const OplogEntry OpEntry = m_Storage->AppendOp(Core); - const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry); return EntryId; } @@ -764,7 +874,7 @@ ProjectStore::Project::BasePathForOplog(std::string_view OplogId) } ProjectStore::Oplog* -ProjectStore::Project::NewOplog(std::string_view OplogId) +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { RwLock::ExclusiveLockScope _(m_ProjectLock); @@ -772,10 +882,12 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) - .first->second.get(); + Oplog* Log = m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) + .first->second.get(); + Log->Write(); return Log; } 
catch (std::exception&) @@ -815,10 +927,11 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) + m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) .first->second.get(); - - Log->ReplayLog(); + Log->Read(); return Log; } @@ -910,7 +1023,12 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { OpenOplog(OpLogId); } - IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); + IterateOplogs([&](const Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.Scrub(Ctx); + } + }); } void @@ -929,7 +1047,12 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx) { OpenOplog(OpLogId); } - IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); + IterateOplogs([&](Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.GatherReferences(GcCtx); + } + }); } uint64_t @@ -1133,6 +1256,7 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount); }); std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); @@ -1144,19 +1268,41 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount++; continue; } + Projects.push_back(Kv.second); ProjectCount++; } } - if (ExpiredProjects.empty()) + if (!GcCtx.IsDeletionMode()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired projects found", m_ProjectBasePath.string()); + ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); return; } - if (!GcCtx.IsDeletionMode()) + for (const Ref<Project>& Project : Projects) { - ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { + if 
(Oplog.IsExpired()) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + for (const std::string& OplogId : ExpiredOplogs) + { + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + } + } + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); return; } @@ -2347,17 +2493,25 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) case HttpVerb::kPost: { + std::filesystem::path OplogMarkerPath; + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (Payload.GetSize() > 0) + { + CbObject Params = LoadCompactBinaryObject(Payload); + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { - if (!Project->NewOplog(OplogId)) + if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! 
return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -2683,7 +2837,7 @@ TEST_CASE("project.store.lifetimes") EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); std::filesystem::path DeletePath; @@ -2736,7 +2890,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2752,7 +2906,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2821,7 +2975,7 @@ TEST_CASE("project.store.partial.read") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog != nullptr); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 9d909de8c..8cebc5f6c 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -70,11 +70,18 @@ public: struct Oplog { - Oplog(std::string_view Id, Project* Project, CidStore& 
Store, std::filesystem::path BasePath); + Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath); ~Oplog(); [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); + void Read(); + void Write(); + void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObject)>&& Fn); std::optional<CbObject> GetOpByKey(const Oid& Key); @@ -98,18 +105,6 @@ public: kUpdateReplay }; - /** Update tracking metadata for a new oplog entry - * - * This is used during replay (and gets called as part of new op append) - * - * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected - */ - uint32_t RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate); - - /** Scan oplog and register each entry, thus updating the in-memory tracking tables - */ - void ReplayLog(); - const std::string& OplogId() const { return m_OplogId; } const std::filesystem::path& TempPath() const { return m_TempPath; } @@ -126,6 +121,7 @@ public: return m_LatestOpMap.size(); } + bool IsExpired() const; std::filesystem::path PrepareForDelete(bool MoveFolder); private: @@ -141,6 +137,7 @@ public: Project* m_OuterProject = nullptr; CidStore& m_CidStore; std::filesystem::path m_BasePath; + std::filesystem::path m_MarkerPath; std::filesystem::path m_TempPath; mutable RwLock m_OplogLock; @@ -154,6 +151,21 @@ public: RefPtr<OplogStorage> m_Storage; std::string m_OplogId; + /** Scan oplog and register each entry, thus updating the in-memory tracking tables + */ + void ReplayLog(); + + /** Update tracking metadata for a new oplog entry + * + * This is used during replay (and gets called as part of new op append) + * + * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected + */ + uint32_t 
RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate); + bool AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid FileId, IoHash Hash, @@ -171,7 +183,7 @@ public: std::string ProjectRootDir; std::string ProjectFilePath; - Oplog* NewOplog(std::string_view OplogId); + Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); Oplog* OpenOplog(std::string_view OplogId); void DeleteOplog(std::string_view OplogId); void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; |