diff options
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 212 |
1 files changed, 183 insertions, 29 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 3e242653a..5c7de2a43 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -274,10 +274,15 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +ProjectStore::Oplog::Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath) : m_OuterProject(Project) , m_CidStore(Store) , m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) , m_OplogId(Id) { using namespace std::literals; @@ -348,6 +353,16 @@ ProjectStore::Oplog::TotalSize() const return 0; } +bool +ProjectStore::Oplog::IsExpired() const +{ + if (m_MarkerPath.empty()) + { + return false; + } + return !std::filesystem::exists(m_MarkerPath); +} + std::filesystem::path ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) { @@ -373,19 +388,90 @@ ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) bool ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) { - return OplogStorage::Exists(BasePath); + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return std::filesystem::is_regular_file(StateFilePath); +} + +void +ProjectStore::Oplog::Read() +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + if (std::filesystem::is_regular_file(StateFilePath)) + { + ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); + return; + } + + CbObject Cfg = LoadCompactBinaryObject(Obj); + + m_MarkerPath = Cfg["gcpath"sv].AsString(); + } + else + { + ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", + m_OplogId, + m_OuterProject->Identifier, + StateFilePath); + } + ReplayLog(); +} + +void +ProjectStore::Oplog::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); } void ProjectStore::Oplog::ReplayLog() { - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); }); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); }); } IoBuffer ProjectStore::Oplog::FindChunk(Oid ChunkId) { RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) { @@ -429,6 +515,10 @@ ProjectStore::Oplog::IterateFileMap( std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } for (const auto& Kv : m_FileMap) { @@ -440,6 +530,10 @@ void ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } std::vector<OplogEntryAddress> Entries; Entries.reserve(m_LatestOpMap.size()); @@ -463,6 +557,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByKey(const Oid& Key) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) { @@ -479,6 +577,10 @@ std::optional<CbObject> ProjectStore::Oplog::GetOpByIndex(int Index) { RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) { @@ -532,7 +634,10 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk } uint32_t -ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate) +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + CbObject Core, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) { ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); @@ -541,8 +646,6 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - using namespace std::literals; // Update chunk id maps @@ -624,12 +727,15 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); - using namespace std::literals; const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); + if (EntryId == 0xffffffffu) + { + // The oplog has been deleted so just drop this + return EntryId; + } // Persist attachments after oplog entry so GC won't find attachments without references @@ -663,12 +769,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_ASSERT(m_Storage); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return 0xffffffffu; + } using namespace std::literals; const OplogEntry OpEntry = m_Storage->AppendOp(Core); - const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry); return EntryId; } @@ -764,7 +874,7 @@ ProjectStore::Project::BasePathForOplog(std::string_view OplogId) } ProjectStore::Oplog* -ProjectStore::Project::NewOplog(std::string_view OplogId) +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { RwLock::ExclusiveLockScope _(m_ProjectLock); @@ -772,10 +882,12 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) - .first->second.get(); + Oplog* Log = m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) + .first->second.get(); + Log->Write(); return Log; } catch (std::exception&) @@ -815,10 +927,11 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { Oplog* Log = - m_Oplogs.try_emplace(std::string{OplogId}, std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath)) + m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) .first->second.get(); - - Log->ReplayLog(); + Log->Read(); return Log; } @@ -910,7 +1023,12 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { OpenOplog(OpLogId); } - IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); }); + IterateOplogs([&](const Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.Scrub(Ctx); + } + }); } void @@ -929,7 +1047,12 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx) { OpenOplog(OpLogId); } - IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); }); + IterateOplogs([&](Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.GatherReferences(GcCtx); + } + }); } uint64_t @@ -1133,6 +1256,7 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount); }); std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; { RwLock::SharedLockScope _(m_ProjectsLock); @@ -1144,19 +1268,41 @@ ProjectStore::CollectGarbage(GcContext& GcCtx) ExpiredProjectCount++; continue; } + Projects.push_back(Kv.second); ProjectCount++; } } - if (ExpiredProjects.empty()) + if (!GcCtx.IsDeletionMode()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired projects found", m_ProjectBasePath.string()); + ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); return; } - if (!GcCtx.IsDeletionMode()) + for (const Ref<Project>& Project : Projects) { - ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { + if (Oplog.IsExpired()) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + for (const std::string& OplogId : ExpiredOplogs) + { + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + } + } + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); return; } @@ -2347,17 +2493,25 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) case HttpVerb::kPost: { + std::filesystem::path OplogMarkerPath; + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (Payload.GetSize() > 0) + { + CbObject Params = LoadCompactBinaryObject(Payload); + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { - if (!Project->NewOplog(OplogId)) + if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -2683,7 +2837,7 @@ TEST_CASE("project.store.lifetimes") EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); std::filesystem::path DeletePath; @@ -2736,7 +2890,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2752,7 +2906,7 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project2RootDir.string(), Project2FilePath.string())); - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1"); + ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); @@ -2821,7 +2975,7 @@ TEST_CASE("project.store.partial.read") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); CHECK(Oplog != nullptr); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); |