// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "cacheshared.h" #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { class IoBuffer; class JobQueue; #pragma pack(push) #pragma pack(1) struct DiskLocation { inline DiskLocation() = default; inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } inline DiskLocation(const BlockStoreLocation& Location, uint32_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) { this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); } inline bool operator!=(const DiskLocation& Rhs) const { if (Flags != Rhs.Flags) { return true; } if (Flags & kStandaloneFile) { return Location.StandaloneSize != Rhs.Location.StandaloneSize; } return Location.BlockLocation != Rhs.Location.BlockLocation; } inline BlockStoreLocation GetBlockLocation(uint32_t PayloadAlignment) const { ZEN_ASSERT(!(Flags & kStandaloneFile)); return Location.BlockLocation.Get(PayloadAlignment); } inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } inline uint8_t GetFlags() const { return Flags; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; if (IsFlagSet(kStructured)) { ContentType = ZenContentType::kCbObject; } if (IsFlagSet(kCompressed)) { ContentType = ZenContentType::kCompressedBinary; } return ContentType; } union { BlockStoreDiskLocation BlockLocation; // 10 bytes uint64_t StandaloneSize = 0; // 8 bytes } Location; static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file static const uint8_t kStructured = 0x40u; // Serialized as compact binary static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format uint8_t Flags = 0; uint8_t Reserved = 0; }; struct DiskIndexEntry { IoHash Key; // 20 bytes DiskLocation Location; // 12 bytes }; #pragma pack(pop) static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// class ZenCacheDiskLayer { public: struct BucketConfiguration { uint64_t MaxBlockSize = 1ull << 30; uint32_t PayloadAlignment = 1u << 4; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t LargeObjectThreshold = 128 * 1024; bool EnableReferenceCaching = false; }; struct Configuration { BucketConfiguration BucketConfig; uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024; uint64_t MemCacheTrimIntervalSeconds = 60; uint64_t MemCacheMaxAgeSeconds = gsl::narrow(std::chrono::seconds(std::chrono::days(1)).count()); }; struct BucketInfo { uint64_t EntryCount = 0; GcStorageSize StorageSize; }; struct Info { std::filesystem::path RootDir; Configuration Config; std::vector BucketNames; uint64_t EntryCount = 0; GcStorageSize StorageSize; }; struct BucketStats { uint64_t DiskSize; uint64_t MemorySize; uint64_t DiskHitCount; uint64_t DiskMissCount; uint64_t DiskWriteCount; uint64_t MemoryHitCount; uint64_t MemoryMissCount; uint64_t MemoryWriteCount; metrics::RequestStatsSnapshot PutOps; metrics::RequestStatsSnapshot GetOps; }; struct NamedBucketStats { std::string BucketName; BucketStats Stats; }; struct DiskStats { std::vector BucketStats; uint64_t DiskSize; uint64_t MemorySize; }; explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References); bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); void DiscoverBuckets(); GcStorageSize StorageSize() const; DiskStats Stats() const; Info GetInfo() const; std::optional GetBucketInfo(std::string_view Bucket) const; void EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const; CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS /** A cache bucket manages a single directory containing metadata and data for that bucket */ struct CacheBucket : public GcReferencer { CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); bool Drop(); void Flush(); void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); inline GcStorageSize StorageSize() const { return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; } uint64_t EntryCount() const; BucketStats Stats(); CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function& Fn) const; void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector& InOutUsageSlots); #if ZEN_WITH_TESTS void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS private: #pragma pack(push) #pragma pack(1) struct MetaDataIndex { uint32_t Index = std::numeric_limits::max(); operator bool() const { return Index != std::numeric_limits::max(); }; MetaDataIndex() = default; explicit MetaDataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} operator size_t() const { return Index; }; inline auto operator<=>(const MetaDataIndex& Other) const = default; }; struct MemCachedIndex { uint32_t Index = std::numeric_limits::max(); operator bool() const { return Index != std::numeric_limits::max(); }; MemCachedIndex() = default; explicit MemCachedIndex(uint32_t InIndex) : Index(InIndex) {} operator size_t() const { return Index; }; inline auto operator<=>(const MemCachedIndex& Other) const = default; }; struct ReferenceIndex { uint32_t Index = std::numeric_limits::max(); static const ReferenceIndex Unknown() { return ReferenceIndex{std::numeric_limits::max()}; } static const ReferenceIndex None() { return ReferenceIndex{std::numeric_limits::max() - 1}; } ReferenceIndex() = default; explicit ReferenceIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} operator size_t() const { return Index; }; operator bool() const { return Index != std::numeric_limits::max(); }; inline auto operator<=>(const ReferenceIndex& Other) const = default; }; struct PayloadIndex { uint32_t Index = std::numeric_limits::max(); operator bool() const { return Index != std::numeric_limits::max(); }; PayloadIndex() = default; explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} operator size_t() const { return Index; }; inline auto operator<=>(const PayloadIndex& Other) const = default; }; struct BucketPayload { DiskLocation Location; // 12 MetaDataIndex MetaData; // 4 MemCachedIndex MemCached; // 4 }; struct BucketMetaData { uint64_t RawSize = 0; // 8 IoHash RawHash; // 20 operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; }; }; struct MemCacheData { IoBuffer Payload; PayloadIndex OwnerIndex; }; #pragma pack(pop) static_assert(sizeof(BucketPayload) == 20u); static_assert(sizeof(BucketMetaData) == 28u); static_assert(sizeof(AccessTime) == 4u); using IndexMap = tsl::robin_map; GcManager& m_Gc; std::atomic_uint64_t& m_OuterCacheMemoryUsage; std::string m_BucketName; std::filesystem::path m_BucketDir; std::filesystem::path m_BlocksBasePath; BucketConfiguration m_Configuration; BlockStore m_BlockStore; Oid m_BucketId; std::atomic_bool m_IsFlushing{true}; // Don't allow flush until we are properly initialized // These files are used to manage storage of small objects for this bucket TCasLogFile m_SlogFile; uint64_t m_LogFlushPosition = 0; std::atomic m_DiskHitCount; std::atomic m_DiskMissCount; std::atomic m_DiskWriteCount; std::atomic m_MemoryHitCount; std::atomic m_MemoryMissCount; std::atomic m_MemoryWriteCount; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; mutable RwLock m_IndexLock; IndexMap m_Index; std::vector m_AccessTimes; std::vector m_Payloads; std::vector m_MetaDatas; std::vector m_FreeMetaDatas; std::vector m_MemCachedPayloads; std::vector m_FreeMemCachedPayloads; std::vector m_FirstReferenceIndex; std::vector m_ReferenceHashes; std::vector m_NextReferenceHashesIndexes; std::unique_ptr m_UpdatedKeys; size_t m_ReferenceCount = 0; std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; virtual std::vector CreateReferenceCheckers(GcCtx& Ctx) override; void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; void CompactReferences(RwLock::ExclusiveLockScope&); void SetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex, std::span References); void RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex); inline bool GetReferences(RwLock::SharedLockScope&, ReferenceIndex FirstReferenceIndex, std::vector& OutReferences) const { return LockedGetReferences(FirstReferenceIndex, OutReferences); } inline bool GetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex FirstReferenceIndex, std::vector& OutReferences) const { return LockedGetReferences(FirstReferenceIndex, OutReferences); } ReferenceIndex AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector& OutReferences) const; void ClearReferenceCache(); void SetMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData); void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const; void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData); size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew); uint64_t ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion); uint64_t ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t LogPosition); void SaveSnapshot(const std::function& ClaimDiskReserveFunc = []() { return 0; }); void WriteIndexSnapshot( RwLock::ExclusiveLockScope&, const std::function& ClaimDiskReserveFunc = []() { return 0; }) { WriteIndexSnapshotLocked(ClaimDiskReserveFunc); } void WriteIndexSnapshot( RwLock::SharedLockScope&, const std::function& ClaimDiskReserveFunc = []() { return 0; }) { WriteIndexSnapshotLocked(ClaimDiskReserveFunc); } void WriteIndexSnapshotLocked(const std::function& ClaimDiskReserveFunc = []() { return 0; }); void CompactState(RwLock::ExclusiveLockScope&, std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, std::vector& MemCachedPayloads, std::vector& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock); void AddMemCacheUsage(uint64_t ValueSize) { m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed); m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed); } void RemoveMemCacheUsage(uint64_t ValueSize) { m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed); m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed); } static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u); } // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash // // These locks are small and should really be spaced out so they don't share cache lines, // but we don't currently access them at particularly high frequency so it should not be // an issue in practice mutable RwLock m_ShardedLocks[256]; inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } friend class DiskBucketReferenceChecker; friend class DiskBucketStoreCompactor; friend class BucketManifestSerializer; }; private: CacheBucket* GetOrCreateBucket(std::string_view InBucket); inline void TryMemCacheTrim() { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { return; } if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0) { return; } if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes) { return; } if (m_IsMemCacheTrimming) { return; } const GcClock::Tick NowTick = GcClock::TickCount(); if (NowTick < m_NextAllowedTrimTick) { return; } MemCacheTrim(); } void MemCacheTrim(); uint64_t MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime); GcManager& m_Gc; JobQueue& m_JobQueue; std::filesystem::path m_RootDir; Configuration m_Configuration; std::atomic_uint64_t m_TotalMemCachedSize{}; std::atomic_bool m_IsMemCacheTrimming = false; std::atomic m_NextAllowedTrimTick; mutable RwLock m_Lock; std::unordered_map> m_Buckets; std::vector> m_DroppedBuckets; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; friend class DiskBucketStoreCompactor; friend class DiskBucketReferenceChecker; }; } // namespace zen