// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { class CbPackage; class CidStore; class AuthMgr; class ScrubContext; class JobQueue; class OpenProcessCache; enum class HttpResponseCode; struct OplogEntry { uint32_t OpLsn; uint32_t OpCoreOffset; // note: Multiple of alignment! uint32_t OpCoreSize; uint32_t OpCoreHash; // Used as checksum Oid OpKeyHash; uint32_t Reserved; inline bool IsTombstone() const { return OpCoreOffset == 0 && OpCoreSize == 0 && OpLsn == 0; } inline void MakeTombstone() { OpLsn = OpCoreOffset = OpCoreSize = OpCoreHash = Reserved = 0; } }; struct OplogEntryAddress { uint64_t Offset; uint64_t Size; }; static_assert(IsPow2(sizeof(OplogEntry))); /** Project Store A project store consists of a number of Projects. Each project contains a number of oplogs (short for "operation log"). UE uses one oplog per target platform to store the output of the cook process. An oplog consists of a sequence of "op" entries. Each entry is a structured object containing references to attachments. Attachments are typically the serialized package data split into separate chunks for bulk data, exports and header information. 
*/
class ProjectStore : public RefCounted, public GcStorage, public GcReferencer, public GcReferenceLocker
{
    // NOTE(review): in this copy of the header the template argument lists have been lost
    // (e.g. `std::function`, `std::vector`, `std::pair`, `std::optional`, `tsl::robin_map`,
    // `Ref`, `RefPtr` appear without arguments and some declarations carry a stray `>`).
    // The declarations below are reproduced as found; restore the arguments from version
    // control before compiling.

    // Forward declaration only; the definition is not part of this header.
    struct OplogStorage;

public:
    // Store-wide configuration. Currently has no members.
    struct Configuration
    {
    };

    // Callable type stored in m_GetCidStore. NOTE(review): signature not visible in this copy.
    typedef std::function GetCidStoreFunc;

    ProjectStore(GetCidStoreFunc&&        GetCidStore,
                 std::filesystem::path    BasePath,
                 GcManager&               Gc,
                 JobQueue&                JobQueue,
                 OpenProcessCache&        InOpenProcessCache,
                 const Configuration&     Config);
    ~ProjectStore();

    struct Project;

    /** A single oplog. Each Oplog keeps a pointer to the Project that owns it (m_OuterProject)
     *  and guards its in-memory tracking tables with m_OplogLock.
     */
    struct Oplog
    {
        Oplog(std::string_view             Id,
              Project*                     Project,
              CidStore&                    Store,
              std::filesystem::path        BasePath,
              const std::filesystem::path& MarkerPath);
        ~Oplog();

        [[nodiscard]] static bool ExistsAt(const std::filesystem::path& BasePath);
        bool                      Exists() const;

        void Read();
        void Write();
        void Update(const std::filesystem::path& MarkerPath);
        bool Reset();

        // Chunk id together with its size.
        struct ChunkInfo
        {
            Oid      ChunkId;
            uint64_t ChunkSize;
        };

        // Paging window for the Iterate* functions.
        // NOTE(review): both fields default to -1, presumably meaning "no paging" - confirm in the implementation.
        struct Paging
        {
            int32_t Start = -1;
            int32_t Count = -1;
        };

        std::vector GetAllChunksInfo();
        void        IterateChunkMap(std::function&& Fn);
        void        IterateFileMap(std::function&& Fn);
        void        IterateOplog(std::function&& Fn, const Paging& EntryPaging);
        void        IterateOplogWithKey(std::function&& Fn);
        void        IterateOplogWithKey(std::function&& Fn, const Paging& EntryPaging);
        // NOTE(review): the `Locked` suffix presumably means the caller must already hold m_OplogLock - confirm.
        void          IterateOplogLocked(std::function&& Fn, const Paging& EntryPaging);
        size_t        GetOplogEntryCount() const;
        std::optional GetOpByKey(const Oid& Key);
        std::optional GetOpByIndex(uint32_t Index);
        std::optional GetOpIndexByKey(const Oid& Key);

        IoBuffer FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag);
        IoBuffer GetChunkByRawHash(const IoHash& RawHash);

        // Two overloads: one keyed by raw hashes, one keyed by chunk ids.
        bool IterateChunks(std::span            RawHashes,
                           bool                 IncludeModTag,
                           const std::function& AsyncCallback,
                           WorkerThreadPool*    OptionalWorkerPool,
                           uint64_t             LargeSizeLimit);
        bool IterateChunks(std::span            ChunkIds,
                           bool                 IncludeModTag,
                           const std::function& AsyncCallback,
                           WorkerThreadPool*    OptionalWorkerPool,
                           uint64_t             LargeSizeLimit);

        // Sentinel LSN (all bits set) used to signal a rejected entry.
        inline static const uint32_t kInvalidOp = ~0u;

        /** Persist a new oplog entry
         *
         * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected
         */
        uint32_t    AppendNewOplogEntry(CbPackage Op);
        uint32_t    AppendNewOplogEntry(CbObjectView Core);
        std::vector AppendNewOplogEntries(std::span Cores);

        enum UpdateType
        {
            kUpdateNewEntry,
            kUpdateReplay
        };

        // Simple accessors returning references to the corresponding members.
        const std::string&           OplogId() const { return m_OplogId; }
        const std::filesystem::path& TempPath() const { return m_TempPath; }
        const std::filesystem::path& MarkerPath() const { return m_MarkerPath; }

        // Forwards to the logger of the owning project.
        LoggerRef Log() { return m_OuterProject->Log(); }
        void      Flush();
        void      Scrub(ScrubContext& Ctx);

        static uint64_t TotalSize(const std::filesystem::path& BasePath);
        uint64_t        TotalSize() const;

        // Takes a shared lock on m_OplogLock and returns the number of entries in m_LatestOpMap
        // (op key -> latest op LSN), i.e. the number of distinct op keys currently tracked.
        std::size_t OplogCount() const
        {
            RwLock::SharedLockScope _(m_OplogLock);
            return m_LatestOpMap.size();
        }

        void ResetState();
        bool PrepareForDelete(std::filesystem::path& OutRemoveDirectory);

        void AddChunkMappings(const std::unordered_map& ChunkMappings);

        void        EnableUpdateCapture();
        void        DisableUpdateCapture();
        void        CaptureAddedAttachments(std::span AttachmentHashes);
        std::vector GetCapturedAttachmentsLocked();
        std::vector CheckPendingChunkReferences(std::span ChunkHashes, const GcClock::Duration& RetainTime);
        void        RemovePendingChunkReferences(std::span ChunkHashes);
        std::vector GetPendingChunkReferencesLocked();

        // Returns a shared lock scope constructed on m_OplogLock.
        RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); }

        uint32_t GetUnusedSpacePercent() const;
        void     Compact(bool DryRun, bool RetainLSNs, std::string_view LogPrefix);

        void     GetAttachmentsLocked(std::vector& OutAttachments, bool StoreMetaDataOnDisk);
        Project* GetOuterProject() const { return m_OuterProject; }
        void     CompactIfUnusedExceeds(bool DryRun, uint32_t CompactUnusedThreshold, std::string_view LogPrefix);

        static std::optional ReadStateFile(const std::filesystem::path& BasePath, std::function&& Log);

        // Chunk id paired with a hash.
        struct ChunkMapping
        {
            Oid    Id;
            IoHash Hash;
        };

        // File id paired with a hash and the server/client paths of the file.
        struct FileMapping
        {
            Oid         Id;
            IoHash      Hash;        // This is either zero or a cid
            std::string ServerPath;  // If Hash is valid then this should be empty
            std::string ClientPath;
        };

        // Result of Validate().
        struct ValidationResult
        {
            uint32_t     OpCount = 0;
            uint32_t     LSNLow  = 0;
            uint32_t     LSNHigh = 0;
            std::vector> MissingFiles;
            std::vector> MissingChunks;
            std::vector> MissingMetas;
            std::vector> MissingAttachments;
            std::vector> OpKeys;

            // True when none of the four Missing* lists has an entry.
            // OpCount, the LSN range and OpKeys are not taken into account.
            bool IsEmpty() const
            {
                return MissingFiles.empty() && MissingChunks.empty() && MissingMetas.empty() && MissingAttachments.empty();
            }
        };

        ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool);

    private:
        struct FileMapEntry
        {
            std::string ServerPath;
            std::string ClientPath;
        };

        template
        using OidMap = tsl::robin_map;

        Project*                    m_OuterProject = nullptr;
        const std::string           m_OplogId;
        CidStore&                   m_CidStore;
        const std::filesystem::path m_BasePath;
        std::filesystem::path       m_MarkerPath;
        std::filesystem::path       m_TempPath;
        std::filesystem::path       m_MetaPath;

        mutable RwLock  m_OplogLock;
        OidMap          m_ChunkMap;         // output data chunk id -> CAS address
        OidMap          m_MetaMap;          // meta chunk id -> CAS address
        OidMap          m_FileMap;          // file id -> file map entry
        // NOTE(review): no default member initializer here; presumably assigned by the constructor or Read() - verify.
        int32_t         m_ManifestVersion;  // File system manifest version
        tsl::robin_map  m_OpAddressMap;     // Index LSN -> op data in ops blob file
        OidMap          m_LatestOpMap;      // op key -> latest op LSN for key
        std::atomic     m_MetaValid = false;

        // NOTE(review): presumably the state behind EnableUpdateCapture()/DisableUpdateCapture() - confirm.
        uint32_t            m_UpdateCaptureRefCounter = 0;
        std::unique_ptr>    m_CapturedLSNs;
        std::unique_ptr>    m_CapturedAttachments;
        std::unordered_set  m_PendingPrepOpAttachments;
        GcClock::TimePoint  m_PendingPrepOpAttachmentsRetainEnd;

        RefPtr   m_Storage;
        uint64_t m_LogFlushPosition = 0;

        RefPtr GetStorage();

        // NOTE(review): the comment below ("Scan oplog and register each entry ...") does not describe
        // the const getter that follows it; it presumably belonged to a replay function that is no
        // longer declared here. Confirm and remove or relocate.
        /** Scan oplog and register each entry, thus updating the in-memory tracking tables
         */
        uint32_t GetUnusedSpacePercentLocked() const;
        void     WriteIndexSnapshot();
        void     ReadIndexSnapshot();

        // Chunk, meta and file mappings produced by GetMapping() for one op core object.
        struct OplogEntryMapping
        {
            std::vector Chunks;
            std::vector Meta;
            std::vector Files;
        };

        OplogEntryMapping GetMapping(CbObjectView Core);

        /** Update tracking metadata for a new oplog entry
         *
         * This is used during replay (and gets called as part of new op append)
         *
         * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected
         */
        uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry);

        // The helpers below take an exclusive lock scope as their first parameter.
        // NOTE(review): presumably as proof that the caller holds m_OplogLock exclusively - confirm.
        void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock,
                            const Oid&                        FileId,
                            const IoHash&                     Hash,
                            std::string_view                  ServerPath,
                            std::string_view                  ClientPath);
        void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
        void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
        void Compact(RwLock::ExclusiveLockScope& Lock, bool DryRun, bool RetainLSNs, std::string_view LogPrefix);
        void IterateCapturedLSNsLocked(std::function&& Callback);

        friend class ProjectStoreOplogReferenceChecker;
        friend class ProjectStoreReferenceChecker;
        friend class ProjectStoreOplogReferenceValidator;
    };

    /** A project: identifying paths plus the set of oplogs it contains (m_Oplogs).
     */
    struct Project : public RefCounted
    {
        std::string           Identifier;
        std::filesystem::path RootDir;
        std::filesystem::path EngineRootDir;
        std::filesystem::path ProjectRootDir;
        std::filesystem::path ProjectFilePath;

        Oplog*      NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath);
        Oplog*      OpenOplog(std::string_view OplogId, bool AllowCompact, bool VerifyPathOnDisk);
        bool        DeleteOplog(std::string_view OplogId);
        bool        RemoveOplog(std::string_view OplogId, std::filesystem::path& OutDeletePath);
        void        IterateOplogs(std::function&& Fn) const;
        void        IterateOplogs(std::function&& Fn);
        std::vector ScanForOplogs() const;

        // Expiry checks: for the project itself, for a given Oplog object, or for an oplog named by id.
        bool IsExpired(const GcClock::TimePoint ExpireTime) const;
        bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog) const;
        bool IsExpired(const GcClock::TimePoint ExpireTime, std::string_view OplogId) const;
        bool IsOplogTouchedSince(const GcClock::TimePoint TouchTime, std::string_view Oplog) const;

        void               TouchProject();
        void               TouchOplog(std::string_view Oplog);
        GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const;

        Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
        virtual ~Project();

        CidStore& GetCidStore() { return m_CidStore; };

        void                      Read();
        void                      Write();
        [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath);
        void                      Flush();
        void                      Scrub(ScrubContext& Ctx);
        LoggerRef                 Log() const;

        static uint64_t TotalSize(const std::filesystem::path& BasePath);
        uint64_t        TotalSize() const;
        bool            PrepareForDelete(std::filesystem::path& OutDeletePath);

        void        EnableUpdateCapture();
        void        DisableUpdateCapture();
        std::vector GetCapturedOplogsLocked();

        std::vector GetGcReferencerLocks();

        // Records OplogId in m_OplogsToCompact while holding m_OplogsToCompactLock exclusively.
        void AddOplogToCompact(std::string_view OplogId)
        {
            m_OplogsToCompactLock.WithExclusiveLock([&]() { m_OplogsToCompact.insert(std::string(OplogId)); });
        }

        // Returns the ids recorded so far and clears the set, all under the exclusive lock,
        // so each recorded id is handed out by exactly one call.
        std::vector GetOplogsToCompact()
        {
            std::vector Result;
            m_OplogsToCompactLock.WithExclusiveLock([&]() {
                Result.reserve(m_OplogsToCompact.size());
                Result.insert(Result.end(), m_OplogsToCompact.begin(), m_OplogsToCompact.end());
                m_OplogsToCompact.clear();
            });
            return Result;
        }

    private:
        ProjectStore*           m_ProjectStore;
        CidStore&               m_CidStore;
        mutable RwLock          m_ProjectLock;
        std::map>               m_Oplogs;
        std::vector>            m_DeletedOplogs;
        std::filesystem::path   m_OplogStoragePath;
        mutable RwLock          m_LastAccessTimesLock;
        mutable tsl::robin_map  m_LastAccessTimes;

        // NOTE(review): presumably the state behind EnableUpdateCapture()/DisableUpdateCapture() - confirm.
        uint32_t         m_UpdateCaptureRefCounter = 0;
        std::unique_ptr> m_CapturedOplogs;

        // Set of oplog ids filled by AddOplogToCompact() and drained by GetOplogsToCompact().
        RwLock             m_OplogsToCompactLock;
        std::unordered_set m_OplogsToCompact;

        std::filesystem::path BasePathForOplog(std::string_view OplogId) const;
        bool                  IsExpired(const std::string& EntryName, const std::filesystem::path& MarkerPath, const GcClock::TimePoint ExpireTime) const;
        void                  WriteAccessTimes();
        void                  ReadAccessTimes();

        friend class ProjectStoreOplogReferenceChecker;
        friend class ProjectStoreReferenceChecker;
        friend class ProjectStoreOplogReferenceValidator;
        friend class ProjectStoreGcStoreCompactor;
    };

    // Project management.

    Ref  OpenProject(std::string_view ProjectId);
    Ref  NewProject(const std::filesystem::path& BasePath,
                    std::string_view             ProjectId,
                    const std::filesystem::path& RootDir,
                    const std::filesystem::path& EngineRootDir,
                    const std::filesystem::path& ProjectRootDir,
                    const std::filesystem::path& ProjectFilePath);
    bool UpdateProject(std::string_view             ProjectId,
                       const std::filesystem::path& RootDir,
                       const std::filesystem::path& EngineRootDir,
                       const std::filesystem::path& ProjectRootDir,
                       const std::filesystem::path& ProjectFilePath);
    bool RemoveProject(std::string_view ProjectId, std::filesystem::path& OutDeletePath);
    bool DeleteProject(std::string_view ProjectId);
    bool Exists(std::string_view ProjectId);
    void Flush();
    void DiscoverProjects();
    void IterateProjects(std::function&& Fn);

    LoggerRef                    Log() { return m_Log; }
    const std::filesystem::path& BasePath() const { return m_ProjectBasePath; }

    // GcStorage
    // NOTE(review): the overrides below are not all necessarily from GcStorage; the class also derives
    // from GcReferencer and GcReferenceLocker, and this header does not show which base declares which.
    virtual void              ScrubStorage(ScrubContext& Ctx) override;
    virtual GcStorageSize     StorageSize() const override;

    virtual std::string       GetGcName(GcCtx& Ctx) override;
    virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
    virtual std::vector       CreateReferenceCheckers(GcCtx& Ctx) override;
    virtual std::vector       CreateReferenceValidators(GcCtx& Ctx) override;

    virtual std::vector LockState(GcCtx& Ctx) override;

    // Request-level operations addressed by project id and oplog id. Most return a std::pair and
    // deliver their payload through an Out* parameter.
    // NOTE(review): the pair's element types are not visible in this copy.

    CbArray   GetProjectsList();
    std::pair GetProjectFiles(const std::string_view    ProjectId,
                              const std::string_view    OplogId,
                              const std::unordered_set& WantedFieldNames,
                              CbObject&                 OutPayload);
    std::pair GetProjectChunkInfos(const std::string_view    ProjectId,
                                   const std::string_view    OplogId,
                                   const std::unordered_set& WantedFieldNames,
                                   CbObject&                 OutPayload);
    std::pair GetChunkInfo(const std::string_view ProjectId,
                           const std::string_view OplogId,
                           const std::string_view ChunkId,
                           CbObject&              OutPayload);
    // Overload taking the chunk id as an Oid.
    std::pair GetChunkRange(const std::string_view ProjectId,
                            const std::string_view OplogId,
                            const Oid              ChunkId,
                            uint64_t               Offset,
                            uint64_t               Size,
                            ZenContentType         AcceptType,
                            CompositeBuffer&       OutChunk,
                            ZenContentType&        OutContentType,
                            uint64_t*              OptionalInOutModificationTag);
    // Overload taking the chunk id as a string.
    std::pair GetChunkRange(const std::string_view ProjectId,
                            const std::string_view OplogId,
                            const std::string_view ChunkId,
                            uint64_t               Offset,
                            uint64_t               Size,
                            ZenContentType         AcceptType,
                            CompositeBuffer&       OutChunk,
                            ZenContentType&        OutContentType,
                            uint64_t*              OptionalInOutModificationTag);
    std::pair GetChunk(const std::string_view ProjectId,
                       const std::string_view OplogId,
                       const std::string_view Cid,
                       IoBuffer&              OutChunk,
                       uint64_t*              OptionalInOutModificationTag);

    std::pair PutChunk(const std::string_view ProjectId,
                       const std::string_view OplogId,
                       const std::string_view Cid,
                       ZenContentType         ContentType,
                       IoBuffer&&             Chunk);

    std::pair WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse);

    std::pair ReadOplog(const std::string_view                ProjectId,
                        const std::string_view                OplogId,
                        const HttpServerRequest::QueryParams& Params,
                        CbObject&                             OutResponse);

    std::pair GetChunks(const std::string_view ProjectId,
                        const std::string_view OplogId,
                        const CbObject&        RequestObject,
                        CbPackage&             OutResponsePackage);

    bool Rpc(HttpServerRequest&     HttpReq,
             const std::string_view ProjectId,
             const std::string_view OplogId,
             IoBuffer&&             Payload,
             AuthMgr&               AuthManager);

    std::pair Export(Ref Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager);

    std::pair Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager);

    bool AreDiskWritesAllowed() const;

    void        EnableUpdateCapture();
    void        DisableUpdateCapture();
    std::vector GetCapturedProjectsLocked();

private:
    LoggerRef               m_Log;
    GcManager&              m_Gc;
    GetCidStoreFunc         m_GetCidStore;
    JobQueue&               m_JobQueue;
    OpenProcessCache&       m_OpenProcessCache;
    std::filesystem::path   m_ProjectBasePath;
    const Configuration     m_Config;
    mutable RwLock          m_ProjectsLock;
    std::map>               m_Projects;
    const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;

    // NOTE(review): presumably the state behind EnableUpdateCapture()/DisableUpdateCapture() - confirm.
    uint32_t         m_UpdateCaptureRefCounter = 0;
    std::unique_ptr> m_CapturedProjects;

    std::filesystem::path BasePathForProject(std::string_view ProjectId);

    friend class ProjectStoreGcStoreCompactor;
    friend class ProjectStoreOplogReferenceChecker;
    friend class ProjectStoreReferenceChecker;
};

// Converts an op key given as a string into an Oid.
// NOTE(review): the conversion itself is not visible here; see the implementation for details.
Oid OpKeyStringAsOid(std::string_view OpKey);

void prj_forcelink();

}  // namespace zen