diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-28 13:19:35 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2025-08-28 13:19:35 +0200 |
| commit | d08f360e4e13df816fafbfb9ea236a02506de27c (patch) | |
| tree | 75054aa6bda2f6be783a903b35cf16a609a0cb76 /src/zenserver/projectstore | |
| parent | 5.7.0 (diff) | |
| download | zen-de/reduce-project-store-memory.tar.xz zen-de/reduce-project-store-memory.zip | |
rework lifetime management for oplogde/reduce-project-store-memory
unload inactive oplogs from memory during GC
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 38 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 422 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 58 |
3 files changed, 288 insertions, 230 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 317a419eb..cb12d7ce9 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -205,8 +205,8 @@ namespace { { for (const std::string& OpLogId : OpLogs) { - ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (Oplog != nullptr) + Ref<ProjectStore::Oplog> Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (Oplog) { CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); } @@ -507,7 +507,7 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -580,7 +580,7 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId, nullptr); + IoBuffer FoundChunk = FoundLog->FindChunk(Project->RootDir, RequestedChunk.ChunkId, nullptr); if (FoundChunk) { if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) @@ -1039,7 +1039,7 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -1118,7 +1118,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -1275,7 +1275,7 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, @@ -1286,8 +1286,8 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) ProjectStore::Oplog& Oplog = *FoundLog; - std::atomic_bool CancelFlag = false; - ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); + std::atomic_bool CancelFlag = false; + ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(Project->RootDir, CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup; KeyNameLookup.reserve(Result.OpKeys.size()); for (const auto& It : Result.OpKeys) @@ -1386,7 +1386,7 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -1483,7 +1483,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) { case HttpVerb::kGet: { - ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!OplogIt) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, @@ -1517,7 +1517,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) OplogMarkerPath = Params["gcpath"sv].AsString(); } - ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!OplogIt) { if (!Project->NewOplog(OplogId, OplogMarkerPath)) @@ -1554,7 +1554,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) OplogMarkerPath = Params["gcpath"sv].AsString(); } - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { if (!Project->NewOplog(OplogId, OplogMarkerPath)) @@ -1604,7 +1604,7 @@ HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) } std::optional<OplogReferencedSet> -LoadReferencedSet(ProjectStore::Oplog& Log) +LoadReferencedSet(ProjectStore::Project& Project, ProjectStore::Oplog& Log) { using namespace std::literals; @@ -1622,7 +1622,7 @@ LoadReferencedSet(ProjectStore::Oplog& Log) return std::optional<OplogReferencedSet>(); } - return OplogReferencedSet::LoadFromChunk(Log.FindChunk(ChunkId, nullptr)); + return OplogReferencedSet::LoadFromChunk(Log.FindChunk(Project.RootDir, ChunkId, nullptr)); } void @@ -1644,7 +1644,7 @@ HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req) } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -1704,7 +1704,7 @@ HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req) std::optional<OplogReferencedSet> ReferencedSet; if (auto TrimString = Params.GetValue("trim_by_referencedset"); TrimString == "true") { - ReferencedSet = LoadReferencedSet(*FoundLog); + ReferencedSet = LoadReferencedSet(*Project, *FoundLog); } Response.BeginArray("entries"sv); @@ -2135,7 +2135,7 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); @@ -2190,7 +2190,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a5ab24cfb..a43e07bb4 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -487,7 +487,7 @@ struct ProjectStore::OplogStorage : public RefCounted ~OplogStorage() { ZEN_INFO("oplog '{}/{}': closing oplog storage at {}", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); try @@ -499,7 +499,7 @@ struct ProjectStore::OplogStorage : public RefCounted catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': flushing oplog at '{}' failed. Reason: '{}'", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath, Ex.what()); @@ -553,7 +553,7 @@ struct ProjectStore::OplogStorage : public RefCounted if (IsCreate) { ZEN_INFO("oplog '{}/{}': initializing storage at '{}'", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); @@ -563,7 +563,7 @@ struct ProjectStore::OplogStorage : public RefCounted else { ZEN_INFO("oplog '{}/{}': opening storage at '{}'", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); } @@ -615,7 +615,7 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_TRACE_CPU("Store::OplogStorage::Compact"); ZEN_INFO("oplog '{}/{}': compacting at '{}'", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath); @@ -753,7 +753,7 @@ struct ProjectStore::OplogStorage : public RefCounted } ZEN_INFO("oplog '{}/{}': compact completed in {} - Max LSN# {}, New size: {}, old size {}.", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), m_MaxLsn.load(), @@ -885,7 +885,7 @@ struct ProjectStore::OplogStorage : public RefCounted if (OpCoreHash != ExpectedOpCoreHash) { ZEN_WARN("oplog '{}/{}': skipping bad checksum op - {}. Expected: {}, found: {}", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), LogEntry.OpKeyHash, ExpectedOpCoreHash, @@ -895,7 +895,7 @@ struct ProjectStore::OplogStorage : public RefCounted Err != CbValidateError::None) { ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Error: '{}'", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), LogEntry.OpKeyHash, ToString(Err)); @@ -914,7 +914,7 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset = NextOpFileOffset; ZEN_INFO("oplog '{}/{}': replay from '{}' completed in {} - Max LSN# {}, Next offset: {}, {} tombstones, {} invalid entries", - m_OwnerOplog->GetOuterProject()->Identifier, + m_OwnerOplog->GetOuterProjectIdentifier(), m_OwnerOplog->OplogId(), m_OplogStoragePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), @@ -1105,12 +1105,14 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, - Project* Project, +ProjectStore::Oplog::Oplog(const LoggerRef& InLog, + std::string_view ProjectIdentifier, + std::string_view Id, CidStore& Store, - std::filesystem::path BasePath, + const std::filesystem::path& BasePath, const std::filesystem::path& MarkerPath) -: m_OuterProject(Project) +: m_Log(InLog) +, m_OuterProjectId(ProjectIdentifier) , m_OplogId(Id) , m_CidStore(Store) , m_BasePath(BasePath) @@ -1220,7 +1222,7 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx) if (Ctx.RunRecovery()) { ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}', these will be removed from the index", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, BadEntryKeys.size(), m_BasePath); @@ -1244,7 +1246,7 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx) else { ZEN_WARN("oplog '{}/{}': scrubbing found {} bad ops in oplog @ '{}' but no cleanup will be performed", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, BadEntryKeys.size(), m_BasePath); @@ -1344,7 +1346,7 @@ ProjectStore::Oplog::Read() ZEN_TRACE_CPU("Oplog::Read"); ZEN_LOG_SCOPE("Oplog::Read '{}'", m_OplogId); - ZEN_DEBUG("oplog '{}': reading config from '{}'", m_OuterProject->Identifier, m_OplogId, m_BasePath); + ZEN_DEBUG("oplog '{}': reading config from '{}'", m_OuterProjectId, m_OplogId, m_BasePath); std::optional<CbObject> Config = ReadStateFile(m_BasePath, [this]() { return Log(); }); if (Config.has_value()) @@ -1420,7 +1422,7 @@ ProjectStore::Oplog::Write() std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; - ZEN_INFO("oplog '{}/{}': persisting config to '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath); + ZEN_INFO("oplog '{}/{}': persisting config to '{}'", m_OuterProjectId, m_OplogId, StateFilePath); TemporaryFile::SafeWriteFile(StateFilePath, Mem.GetView()); } @@ -1472,6 +1474,30 @@ ProjectStore::Oplog::Reset() return true; } +bool +ProjectStore::Oplog::CanUnload() +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::SharedLockScope _(m_OplogLock); + + uint64_t LogCount = m_Storage->LogCount(); + if (m_LogFlushPosition != LogCount) + { + return false; // The oplog is not flushed so likely this is an active oplog + } + + if (!m_PendingPrepOpAttachments.empty()) + { + return false; // We have a pending oplog prep operation in flight + } + if (m_UpdateCaptureRefCounter > 0) + { + return false; // GC capture is enable for the oplog + } + return true; +} + std::optional<CbObject> ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log) { @@ -1482,7 +1508,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; if (IsFile(StateFilePath)) { - // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath); + // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProjectId, m_OplogId, StateFilePath); BasicFile Blob; Blob.Open(StateFilePath, BasicFile::Mode::kRead); @@ -1503,7 +1529,9 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f } ProjectStore::Oplog::ValidationResult -ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool) +ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir, + std::atomic_bool& IsCancelledFlag, + WorkerThreadPool* OptionalWorkerPool) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -1566,7 +1594,7 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo { if (File.Hash == IoHash::Zero) { - std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath; + std::filesystem::path FilePath = ProjectRootDir / File.ServerPath; if (!IsFile(FilePath)) { ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); @@ -1649,12 +1677,12 @@ ProjectStore::Oplog::WriteIndexSnapshot() ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::WriteIndexSnapshot"); - ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProject->Identifier, m_OplogId, m_BasePath); + ZEN_DEBUG("oplog '{}/{}': write store snapshot at '{}'", m_OuterProjectId, m_OplogId, m_BasePath); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("oplog '{}/{}': wrote store snapshot for '{}' containing {} entries in {}", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, m_BasePath, EntryCount, @@ -1790,7 +1818,7 @@ ProjectStore::Oplog::WriteIndexSnapshot() } catch (const std::exception& Err) { - ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProject->Identifier, m_OplogId, Err.what()); + ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProjectId, m_OplogId, Err.what()); } } @@ -1807,7 +1835,7 @@ ProjectStore::Oplog::ReadIndexSnapshot() Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("oplog '{}/{}': index read from '{}' containing {} entries in {}", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, IndexPath, EntryCount, @@ -1837,7 +1865,7 @@ ProjectStore::Oplog::ReadIndexSnapshot() if (Header.Checksum != Checksum) { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Checksum mismatch. Expected: {}, Found: {}", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, IndexPath, Header.Checksum, @@ -1848,7 +1876,7 @@ ProjectStore::Oplog::ReadIndexSnapshot() if (Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount != Header.KeyCount) { ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'. Key count mismatch. Expected: {}, Found: {}", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, IndexPath, Header.LatestOpMapCount + Header.ChunkMapCount + Header.MetaMapCount + Header.FileMapCount, @@ -1952,7 +1980,7 @@ ProjectStore::Oplog::ReadIndexSnapshot() } else { - ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProject->Identifier, m_OplogId, IndexPath); + ZEN_WARN("oplog '{}/{}': skipping invalid index file '{}'", m_OuterProjectId, m_OplogId, IndexPath); } } } @@ -1965,7 +1993,7 @@ ProjectStore::Oplog::ReadIndexSnapshot() m_FileMap.clear(); m_LogFlushPosition = 0; ZEN_ERROR("oplog '{}/{}': failed reading index snapshot from '{}'. Reason: '{}'", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, IndexPath, Ex.what()); @@ -2060,7 +2088,7 @@ ProjectStore::Oplog::Compact(RwLock::ExclusiveLockScope&, bool DryRun, bool Reta ZEN_INFO("{} oplog '{}/{}': Compacted in {}. New size: {}, freeing: {}", LogPrefix, - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(PostSize), @@ -2090,7 +2118,8 @@ ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, } bool -ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, +ProjectStore::Oplog::IterateChunks(const std::filesystem::path& ProjectRootDir, + std::span<Oid> ChunkIds, bool IncludeModTag, const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, @@ -2120,7 +2149,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, else if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { FileChunkIndexes.push_back(ChunkIndex); - FileChunkPaths.emplace_back(m_OuterProject->RootDir / FileIt->second.ServerPath); + FileChunkPaths.emplace_back(ProjectRootDir / FileIt->second.ServerPath); } } } @@ -2165,7 +2194,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, catch (const std::exception& Ex) { ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, FileChunkIndex, FilePath, @@ -2235,7 +2264,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } IoBuffer -ProjectStore::Oplog::FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag) +ProjectStore::Oplog::FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) @@ -2258,7 +2287,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationT if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) { - std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; + std::filesystem::path FilePath = ProjectRootDir / FileIt->second.ServerPath; OplogLock.ReleaseNow(); @@ -2291,7 +2320,7 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationT } std::vector<ProjectStore::Oplog::ChunkInfo> -ProjectStore::Oplog::GetAllChunksInfo() +ProjectStore::Oplog::GetAllChunksInfo(const std::filesystem::path& ProjectRootDir) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -2322,7 +2351,7 @@ ProjectStore::Oplog::GetAllChunksInfo() for (ChunkInfo& Info : InfoArray) { - if (IoBuffer Chunk = FindChunk(Info.ChunkId, nullptr)) + if (IoBuffer Chunk = FindChunk(ProjectRootDir, Info.ChunkId, nullptr)) { Info.ChunkSize = Chunk.GetSize(); } @@ -2476,7 +2505,7 @@ ProjectStore::Oplog::GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, b { m_MetaValid = false; ZEN_WARN("oplog '{}/{}': unable to set meta data meta path: '{}'. Reason: '{}'", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, MetaPath, Ec.message()); @@ -2840,7 +2869,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) Oid Id = PackageObj["id"sv].AsObjectId(); IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash); + ZEN_DEBUG("oplog {}/{}: package data {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); continue; } if (FieldName == "bulkdata"sv) @@ -2852,7 +2881,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) Oid Id = BulkObj["id"sv].AsObjectId(); IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash); + ZEN_DEBUG("oplog {}/{}: bulkdata {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); } continue; } @@ -2865,7 +2894,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) Oid Id = PackageDataObj["id"sv].AsObjectId(); IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); Result.Chunks.emplace_back(ChunkMapping{Id, Hash}); - ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProject->Identifier, m_OplogId, Id, Hash); + ZEN_DEBUG("oplog {}/{}: package {} -> {}", m_OuterProjectId, m_OplogId, Id, Hash); } continue; } @@ -2884,23 +2913,20 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) if (ServerPath.empty() && Hash == IoHash::Zero) { ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing both 'serverpath' and 'data' fields", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, Id); continue; } if (ClientPath.empty()) { - ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", - m_OuterProject->Identifier, - m_OplogId, - Id); + ZEN_WARN("oplog {}/{}: invalid file for entry '{}', missing 'clientpath' field", m_OuterProjectId, m_OplogId, Id); continue; } Result.Files.emplace_back(FileMapping{Id, Hash, std::string(ServerPath), std::string(ClientPath)}); ZEN_DEBUG("oplog {}/{}: file {} -> {}, ServerPath: {}, ClientPath: {}", - m_OuterProject->Identifier, + m_OuterProjectId, m_OplogId, Id, Hash, @@ -2920,7 +2946,7 @@ ProjectStore::Oplog::GetMapping(CbObjectView Core) IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); Result.Meta.emplace_back(ChunkMapping{Id, Hash}); auto NameString = MetaObj["name"sv].AsString(); - ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProject->Identifier, m_OplogId, NameString, Id, Hash); + ZEN_DEBUG("oplog {}/{}: meta data ({}) {} -> {}", m_OuterProjectId, m_OplogId, NameString, Id, Hash); } continue; } @@ -3321,7 +3347,7 @@ ProjectStore::Project::BasePathForOplog(std::string_view OplogId) const return m_OplogStoragePath / OplogId; } -ProjectStore::Oplog* +Ref<ProjectStore::Oplog> ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -3333,10 +3359,8 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem: { ZEN_INFO("oplog '{}/{}': creating oplog at '{}'", Identifier, OplogId, OplogBasePath); - Oplog* Log = m_Oplogs - .try_emplace(std::string{OplogId}, - std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) - .first->second.get(); + Ref<Oplog> Log(new Oplog(m_ProjectStore->Log(), Identifier, OplogId, m_CidStore, OplogBasePath, MarkerPath)); + m_Oplogs.insert_or_assign(std::string{OplogId}, Log); Log->Write(); @@ -3355,11 +3379,11 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem: m_Oplogs.erase(std::string{OplogId}); - return nullptr; + return {}; } } -ProjectStore::Oplog* +Ref<ProjectStore::Oplog> ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -3394,7 +3418,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo if (!ReOpen) { - return OplogIt->second.get(); + return OplogIt->second; } } } @@ -3404,7 +3428,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo RwLock::ExclusiveLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { - return It->second.get(); + return It->second; } if (Oplog::ExistsAt(OplogBasePath)) { @@ -3412,11 +3436,9 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo { ZEN_INFO("oplog '{}/{}': opening oplog at '{}'", Identifier, OplogId, OplogBasePath); - Oplog* Log = - m_Oplogs - .try_emplace(std::string{OplogId}, - std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) - .first->second.get(); + Ref<Oplog> Log(new Oplog(m_ProjectStore->Log(), Identifier, OplogId, m_CidStore, OplogBasePath, std::filesystem::path{})); + + m_Oplogs.insert_or_assign(std::string{OplogId}, Log); Log->Read(); Lock.ReleaseNow(); @@ -3436,7 +3458,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo } } - return nullptr; + return {}; } void @@ -3460,6 +3482,27 @@ ProjectStore::Oplog::CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedT } bool +ProjectStore::Project::TryUnloadOplog(std::string_view OplogId) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + RwLock::ExclusiveLockScope _(m_ProjectLock); + if (auto OplogIt = m_Oplogs.find(std::string(OplogId)); OplogIt != m_Oplogs.end()) + { + Ref<Oplog>& Oplog = OplogIt->second; + + if (Oplog->CanUnload()) + { + m_Oplogs.erase(OplogIt); + return true; + } + return false; + } + + return false; +} + +bool ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -3480,12 +3523,11 @@ ProjectStore::Project::RemoveOplog(std::string_view OplogId, std::filesystem::pa } else { - std::unique_ptr<Oplog>& Oplog = OplogIt->second; + Ref<Oplog>& Oplog = OplogIt->second; if (!Oplog->PrepareForDelete(OutDeletePath)) { return false; } - m_DeletedOplogs.emplace_back(std::move(Oplog)); m_Oplogs.erase(OplogIt); } } @@ -3638,7 +3680,6 @@ ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) for (auto& It : m_Oplogs) { It.second->ResetState(); - m_DeletedOplogs.emplace_back(std::move(It.second)); } m_Oplogs.clear(); @@ -3790,7 +3831,7 @@ ProjectStore::Project::IsExpired(const GcClock::TimePoint ExpireTime, std::strin if (OplogIt != m_Oplogs.end()) { Lock.ReleaseNow(); - return IsExpired(ExpireTime, *OplogIt->second.get()); + return IsExpired(ExpireTime, *OplogIt->second); } } @@ -4220,7 +4261,7 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4269,6 +4310,7 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, } FoundLog->IterateChunks( + Project->RootDir, Ids, false, [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) { @@ -4384,7 +4426,7 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4549,7 +4591,7 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4564,7 +4606,7 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId, const Oid Obj = Oid::FromHexString(ChunkId); - IoBuffer Chunk = FoundLog->FindChunk(Obj, nullptr); + IoBuffer Chunk = FoundLog->FindChunk(Project->RootDir, Obj, nullptr); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; @@ -4744,7 +4786,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4752,7 +4794,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, Project->TouchOplog(OplogId); uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag; - IoBuffer Chunk = FoundLog->FindChunk(ChunkId, OptionalInOutModificationTag); + IoBuffer Chunk = FoundLog->FindChunk(Project->RootDir, ChunkId, OptionalInOutModificationTag); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; @@ -4789,7 +4831,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4838,7 +4880,7 @@ ProjectStore::PutChunk(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -4887,7 +4929,7 @@ ProjectStore::GetChunks(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -5031,6 +5073,7 @@ ProjectStore::GetChunks(const std::string_view ProjectId, if (!ChunkIdsRequestIndex.empty()) { FoundLog->IterateChunks( + Project->RootDir, ChunkIds, true, [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { @@ -5072,7 +5115,7 @@ ProjectStore::GetChunks(const std::string_view ProjectId, { const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id); uint64_t ModTag = 0; - IoBuffer Payload = FoundLog->FindChunk(ChunkId, &ModTag); + IoBuffer Payload = FoundLog->FindChunk(Project->RootDir, ChunkId, &ModTag); if (Payload) { ChunkRequest.Output.Exists = true; @@ -5231,7 +5274,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie } Project->TouchProject(); - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); if (!Oplog) { return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -5317,7 +5360,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } Project->TouchProject(); - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!Oplog) { return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; @@ -5452,7 +5495,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, bool VerifyPathOnDisk = Method != "getchunks"sv; - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); if (!Oplog) { HttpReq.WriteResponse(HttpResponseCode::NotFound, @@ -5842,7 +5885,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, IgnoreMissingAttachments, CleanOplog](JobContext& Context) { Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", - OplogPtr->GetOuterProject()->Identifier, + OplogPtr->GetOuterProjectIdentifier(), OplogPtr->OplogId(), ActualRemoteStore->GetInfo().Description)); @@ -5995,18 +6038,22 @@ public: CompactOplogCount += OplogsToCompact.size(); for (const std::string& OplogId : OplogsToCompact) { - ProjectStore::Oplog* OpLog = nullptr; + Ref<ProjectStore::Oplog> OpLog; { RwLock::SharedLockScope __(Project->m_ProjectLock); if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt != Project->m_Oplogs.end()) { - OpLog = OpIt->second.get(); + OpLog = OpIt->second; } else { std::filesystem::path OplogBasePath = Project->BasePathForOplog(OplogId); - OpLog = - new ProjectStore::Oplog(OplogId, Project.Get(), Project->m_CidStore, OplogBasePath, std::filesystem::path{}); + OpLog = new ProjectStore::Oplog(Project->Log(), + Project->Identifier, + OplogId, + Project->m_CidStore, + OplogBasePath, + std::filesystem::path{}); OpLog->Read(); } @@ -6024,11 +6071,6 @@ public: Stats.RemovedDisk += FreedSize; } - - if (auto OpIt = Project->m_Oplogs.find(OplogId); OpIt == Project->m_Oplogs.end()) - { - delete OpLog; - } } } } @@ -6117,6 +6159,16 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ExpiredOplogs.push_back(OplogId); } + else if (!Project->IsOplogTouchedSince(GcClock::Now() - std::chrono::minutes(15), OplogId)) + { + if (Project->TryUnloadOplog(OplogId)) + { + ZEN_INFO("GCV2: projectstore [REMOVE EXPIRED] '{}': Unloaded oplog {}/{} due to inactivity", + m_ProjectBasePath, + Project->Identifier, + OplogId); + } + } } std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier); @@ -6208,7 +6260,7 @@ public: Stopwatch Timer; - std::vector<ProjectStore::Oplog*> AddedOplogs; + std::vector<Ref<ProjectStore::Oplog>> AddedOplogs; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -6230,7 +6282,7 @@ public: ProjectStore::Project& Project = *It->second; for (auto& OplogPair : Project.m_Oplogs) { - ProjectStore::Oplog* Oplog = OplogPair.second.get(); + Ref<ProjectStore::Oplog> Oplog = OplogPair.second; AddedOplogs.push_back(Oplog); } } @@ -6244,13 +6296,13 @@ public: { if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) { - ProjectStore::Oplog* Oplog = It->second.get(); + Ref<ProjectStore::Oplog> Oplog = It->second; AddedOplogs.push_back(Oplog); } } } - for (ProjectStore::Oplog* Oplog : AddedOplogs) + for (const Ref<ProjectStore::Oplog>& Oplog : AddedOplogs) { size_t BaseReferenceCount = m_References.size(); @@ -6326,13 +6378,14 @@ public: { m_Project->DisableUpdateCapture(); - RwLock::SharedLockScope _(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) + if (m_OplogHasUpdateCapture) { - ProjectStore::Oplog* Oplog = It->second.get(); - if (Oplog == m_OplogWithUpdateCapture) + RwLock::SharedLockScope _(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { + Ref<ProjectStore::Oplog> Oplog = It->second; Oplog->DisableUpdateCapture(); + m_OplogHasUpdateCapture = false; } } } @@ -6364,50 +6417,51 @@ public: m_OplogId); }); - ProjectStore::Oplog* Oplog = nullptr; - auto __ = MakeGuard([this, &Oplog]() { - if (Oplog != nullptr && m_OplogWithUpdateCapture == nullptr) - { - delete Oplog; - } - }); - m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId); - - RwLock::SharedLockScope ___(m_Project->m_ProjectLock); - if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) - { - It->second->EnableUpdateCapture(); - Oplog = It->second.get(); - m_OplogWithUpdateCapture = Oplog; - } - else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) - { - Oplog = new ProjectStore::Oplog(m_OplogId, m_Project.Get(), m_Project->m_CidStore, m_OplogBasePath, std::filesystem::path{}); - Oplog->Read(); - } - else + m_OplogBasePath = m_Project->BasePathForOplog(m_OplogId); { - return; - } + Ref<ProjectStore::Oplog> Oplog; - RwLock::SharedLockScope ____(Oplog->m_OplogLock); - if (Ctx.IsCancelledFlag) - { - return; - } + RwLock::SharedLockScope __(m_Project->m_ProjectLock); + if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) + { + Oplog = It->second; + Oplog->EnableUpdateCapture(); + m_OplogHasUpdateCapture = true; + } + else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) + { + Oplog = new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{}); + Oplog->Read(); + } + else + { + return; + } - GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30); - if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId)) - { - const uint32_t CompactUnusedThreshold = 25; - if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold) + RwLock::SharedLockScope ___(Oplog->m_OplogLock); + if (Ctx.IsCancelledFlag) { - m_Project->AddOplogToCompact(m_OplogId); + return; + } + + GcClock::TimePoint CompactExpireTime = GcClock::Now() - std::chrono::minutes(30); + if (!m_Project->IsOplogTouchedSince(CompactExpireTime, m_OplogId)) + { + const uint32_t CompactUnusedThreshold = 25; + if (Oplog->GetUnusedSpacePercent() >= CompactUnusedThreshold) + { + m_Project->AddOplogToCompact(m_OplogId); + } } - } - Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); - m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); + Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData); + m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId); + } FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References); } @@ -6433,7 +6487,7 @@ public: if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end()) { - ProjectStore::Oplog* Oplog = It->second.get(); + Ref<ProjectStore::Oplog> Oplog = It->second; Oplog->IterateCapturedLSNsLocked([&](const CbObjectView& UpdateOp) -> bool { UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); }); return true; @@ -6447,14 +6501,12 @@ public: } else if (m_Project->LastOplogAccessTime(m_OplogId) > m_OplogAccessTime && ProjectStore::Oplog::ExistsAt(m_OplogBasePath)) { - ProjectStore::Oplog* Oplog = - new ProjectStore::Oplog(m_OplogId, m_Project.Get(), m_Project->m_CidStore, m_OplogBasePath, std::filesystem::path{}); - auto __ = MakeGuard([Oplog]() { - if (Oplog != nullptr) - { - delete Oplog; - } - }); + Ref<ProjectStore::Oplog> Oplog(new ProjectStore::Oplog(m_Project->Log(), + m_Project->Identifier, + m_OplogId, + m_Project->m_CidStore, + m_OplogBasePath, + std::filesystem::path{})); Oplog->Read(); Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData); } @@ -6495,7 +6547,7 @@ public: Ref<ProjectStore::Project> m_Project; std::string m_OplogId; std::filesystem::path m_OplogBasePath; - ProjectStore::Oplog* m_OplogWithUpdateCapture = nullptr; + bool m_OplogHasUpdateCapture = false; std::vector<IoHash> m_References; std::vector<IoHash> m_AddedReferences; GcClock::TimePoint m_OplogAccessTime; @@ -6608,7 +6660,7 @@ public: ProjectStore::Oplog::ValidationResult Result; Stopwatch Timer; - const auto _ = MakeGuard([&] { + const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; @@ -6623,27 +6675,24 @@ public: Result.LSNHigh, Status); }); - ProjectStore::Oplog* TempOplog = nullptr; - auto __ = MakeGuard([this, &TempOplog]() { - if (TempOplog != nullptr) - { - delete TempOplog; - } - }); - ProjectStore::Oplog* Oplog = nullptr; - Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(m_ProjectId); + Ref<ProjectStore::Oplog> Oplog; + Ref<ProjectStore::Project> Project = m_ProjectStore.OpenProject(m_ProjectId); if (Project) { RwLock::SharedLockScope ___(Project->m_ProjectLock); if (auto It = Project->m_Oplogs.find(m_OplogId); It != Project->m_Oplogs.end()) { - Oplog = It->second.get(); + Oplog = It->second; } else { std::filesystem::path OplogBasePath = Project->BasePathForOplog(m_OplogId); - TempOplog = new ProjectStore::Oplog(m_OplogId, Project.Get(), Project->m_CidStore, OplogBasePath, std::filesystem::path{}); - Oplog = TempOplog; + Oplog = Ref<ProjectStore::Oplog>(new ProjectStore::Oplog(Project->Log(), + Project->Identifier, + m_OplogId, + Project->m_CidStore, + OplogBasePath, + std::filesystem::path{})); Oplog->Read(); if (Ctx.IsCancelledFlag) @@ -6652,9 +6701,9 @@ public: } } - if (Oplog != nullptr) + if (Oplog) { - Result = Oplog->Validate(Ctx.IsCancelledFlag, nullptr); + Result = Oplog->Validate(Project->RootDir, Ctx.IsCancelledFlag, nullptr); if (Ctx.IsCancelledFlag) { return; @@ -6691,6 +6740,9 @@ ProjectStore::CreateReferenceValidators(GcCtx& Ctx) { return {}; } + + auto Log = [&Ctx]() { return Ctx.Logger; }; + DiscoverProjects(); std::vector<std::pair<std::string, std::string>> Oplogs; @@ -6996,13 +7048,13 @@ TEST_CASE("project.store.lifetimes") EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog); std::filesystem::path DeletePath; CHECK(Project->PrepareForDelete(DeletePath)); CHECK(!DeletePath.empty()); - CHECK(Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true) == nullptr); + CHECK(!Project->OpenOplog("oplog1", /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true)); // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers CHECK(Oplog->OplogCount() == 0); // Project is still valid since we have a Ref to it @@ -7059,8 +7111,8 @@ TEST_CASE_TEMPLATE("project.store.export", EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); @@ -7095,8 +7147,8 @@ TEST_CASE_TEMPLATE("project.store.export", CHECK(ExportResult.ErrorCode == 0); - ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {}); - CHECK(OplogImport != nullptr); + Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); + CHECK(OplogImport); RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, @@ -7195,8 +7247,8 @@ TEST_CASE("project.store.gc") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", Project1OplogPath); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1", Project1OplogPath); + CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); @@ -7213,8 +7265,8 @@ TEST_CASE("project.store.gc") Project2RootDir.string(), Project2FilePath.string())); { - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); + CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); @@ -7224,8 +7276,8 @@ TEST_CASE("project.store.gc") CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); } { - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); + CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137}))); @@ -7383,7 +7435,7 @@ TEST_CASE("project.store.gc.prep") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); } { @@ -7414,13 +7466,13 @@ TEST_CASE("project.store.gc.prep") { // Make sure the chunks are stored but not the referencing op Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } { Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); // Equivalent of a `prep` call with tracking of ops CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::hours(1)).empty()); @@ -7461,7 +7513,7 @@ TEST_CASE("project.store.gc.prep") { Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - ProjectStore::Oplog* Oplog = Project1->OpenOplog("oplog1"sv, true, true); + Ref<ProjectStore::Oplog> Oplog = Project1->OpenOplog("oplog1"sv, true, true); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Oplog->RemovePendingChunkReferences(OpChunkHashes); CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0); @@ -7495,7 +7547,7 @@ TEST_CASE("project.store.gc.prep") { // Make sure the chunks are stored but not the referencing op Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } @@ -7503,7 +7555,7 @@ TEST_CASE("project.store.gc.prep") // Caution - putting breakpoints and stepping through this part of the test likely makes it fails due to expiry time of pending chunks { Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); CHECK(Oplog->CheckPendingChunkReferences(OpChunkHashes, std::chrono::milliseconds(100)).empty()); } @@ -7590,8 +7642,8 @@ TEST_CASE("project.store.rpc.getchunks") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); Attachments[OpIds[2]] = @@ -8510,8 +8562,8 @@ TEST_CASE("project.store.partial.read") EngineRootDir.string(), Project1RootDir.string(), Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog); Attachments[OpIds[0]] = {}; Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); @@ -8690,8 +8742,8 @@ TEST_CASE("project.store.iterateoplog") EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath); - CHECK(Oplog != nullptr); + Ref<ProjectStore::Oplog> Oplog = TestProject->NewOplog("oplog"sv, ProjectOplogPath); + CHECK(Oplog); struct TestOidData { diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 2595d7198..b41862f40 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -78,12 +78,13 @@ public: struct Project; - struct Oplog + struct Oplog : public RefCounted { - Oplog(std::string_view Id, - Project* Project, + Oplog(const LoggerRef& Log, + std::string_view ProjectIdentifier, + std::string_view Id, CidStore& Store, - std::filesystem::path BasePath, + const std::filesystem::path& BasePath, const std::filesystem::path& MarkerPath); ~Oplog(); @@ -94,6 +95,7 @@ public: void Write(); void Update(const std::filesystem::path& MarkerPath); bool Reset(); + bool CanUnload(); struct ChunkInfo { @@ -107,7 +109,7 @@ public: int32_t Count = -1; }; - std::vector<ChunkInfo> GetAllChunksInfo(); + std::vector<ChunkInfo> GetAllChunksInfo(const std::filesystem::path& ProjectRootDir); void IterateChunkMap(std::function<void(const Oid&, const IoHash& Hash)>&& Fn); void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging); @@ -120,18 +122,19 @@ public: std::optional<CbObject> GetOpByIndex(uint32_t Index); std::optional<uint32_t> GetOpIndexByKey(const Oid& Key); - IoBuffer FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag); - IoBuffer GetChunkByRawHash(const IoHash& RawHash); - bool IterateChunks(std::span<IoHash> RawHashes, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit); - bool IterateChunks(std::span<Oid> ChunkIds, - bool IncludeModTag, - const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit); + IoBuffer FindChunk(const std::filesystem::path& ProjectRootDir, const Oid& ChunkId, uint64_t* OptOutModificationTag); + IoBuffer GetChunkByRawHash(const IoHash& RawHash); + bool IterateChunks(std::span<IoHash> RawHashes, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); + bool IterateChunks(const std::filesystem::path& ProjectRootDir, + std::span<Oid> ChunkIds, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry @@ -154,7 +157,7 @@ public: const std::filesystem::path& TempPath() const { return m_TempPath; } const std::filesystem::path& MarkerPath() const { return m_MarkerPath; } - LoggerRef Log() { return m_OuterProject->Log(); } + LoggerRef Log() const { return m_Log; } void Flush(); void Scrub(ScrubContext& Ctx); static uint64_t TotalSize(const std::filesystem::path& BasePath); @@ -186,8 +189,8 @@ public: void GetAttachmentsLocked(std::vector<IoHash>& OutAttachments, bool StoreMetaDataOnDisk); - Project* GetOuterProject() const { return m_OuterProject; } - void CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix); + std::string_view GetOuterProjectIdentifier() const { return m_OuterProjectId; } + void CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix); static std::optional<CbObject> ReadStateFile(const std::filesystem::path& BasePath, std::function<LoggerRef()>&& Log); @@ -222,7 +225,9 @@ public: } }; - ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool); + ValidationResult Validate(const std::filesystem::path& ProjectRootDir, + std::atomic_bool& IsCancelledFlag, + WorkerThreadPool* OptionalWorkerPool); private: struct FileMapEntry @@ -234,7 +239,8 @@ public: template<class V> using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; - Project* m_OuterProject = nullptr; + LoggerRef m_Log; + const std::string m_OuterProjectId; const std::string m_OplogId; CidStore& m_CidStore; const std::filesystem::path m_BasePath; @@ -308,8 +314,9 @@ public: std::filesystem::path ProjectRootDir; std::filesystem::path ProjectFilePath; - Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); - Oplog* OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk); + Ref<Oplog> NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); + Ref<Oplog> OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk); + bool TryUnloadOplog(std::string_view OplogId); bool DeleteOplog(std::string_view OplogId); bool RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath); void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, const Oplog&)>&& Fn) const; @@ -361,8 +368,7 @@ public: ProjectStore* m_ProjectStore; CidStore& m_CidStore; mutable RwLock m_ProjectLock; - std::map<std::string, std::unique_ptr<Oplog>> m_Oplogs; - std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs; + std::map<std::string, Ref<Oplog>> m_Oplogs; std::filesystem::path m_OplogStoragePath; mutable RwLock m_LastAccessTimesLock; mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes; |