// Copyright Epic Games, Inc. All Rights Reserved.

// Declarations for the disk-backed cache layer:
//  - DiskLocation / DiskIndexEntry: packed records describing where a cache value is stored
//    (a standalone file or a location inside the BlockStore) and pairing that location with its key.
//  - ZenCacheDiskLayer: a set of named CacheBucket instances (each managing one bucket directory)
//    plus a memory cache whose total footprint is trimmed via TryMemCacheTrim()/MemCacheTrim().
//
// NOTE(review): this copy of the header appears to have lost all text between '<' and '>' -- the
// targets of the bare #include lines below and every template argument (e.g. `std::vector BucketNames`,
// `std::atomic m_DiskHitCount`, `std::unordered_map> m_Buckets`). It will not compile as-is; restore
// the include paths and template arguments from version control rather than guessing them.

#pragma once

#include "cacheshared.h"

#include
#include
#include

ZEN_THIRD_PARTY_INCLUDES_START
#include
ZEN_THIRD_PARTY_INCLUDES_END

#include
#include

namespace zen {

class IoBuffer;
class JobQueue;

// The records below use 1-byte packing so their in-memory layout has no padding; the sizes are
// pinned by the static_assert after DiskIndexEntry.
#pragma pack(push)
#pragma pack(1)

/**
 * Location of one stored cache value.
 *
 * If kStandaloneFile is set in Flags, the value lives in its own file and the union holds
 * StandaloneSize. Otherwise it lives in the block store and the union holds a
 * BlockStoreDiskLocation. The other flag bits describe the payload format/state.
 */
struct DiskLocation
{
    inline DiskLocation() = default;

    // Standalone-file location: records the value size and always sets kStandaloneFile.
    inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; }

    // Block-store location: encodes Location with PayloadAlignment and always clears kStandaloneFile.
    inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile)
    {
        // The parameter `Location` shadows the member union, hence the explicit this->.
        this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment);
    }

    // Byte-wise comparison of the Location union only; Flags and Reserved are NOT compared.
    // NOTE(review): the standalone constructor (and the `StandaloneSize = 0` default member
    // initializer) only assign 8 bytes of the union, which is 10 bytes per the member comments,
    // yet memcmp covers all of sizeof(Location). The trailing bytes of a standalone location may
    // therefore be unset. Confirm callers do not rely on comparing such locations. Note also
    // that there is no matching operator==.
    inline bool operator!=(const DiskLocation& Rhs) const { return memcmp(&Location, &Rhs.Location, sizeof(Location)) != 0; }

    // Decodes the block-store location. Only valid when kStandaloneFile is clear (asserted).
    // PayloadAlignment presumably must match the value used at construction -- confirm.
    inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const
    {
        ZEN_ASSERT(!(Flags & kStandaloneFile));
        return Location.BlockLocation.Get(PayloadAlignment);
    }

    // Stored size: StandaloneSize for standalone files, otherwise the size held in the block location.
    inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); }

    // Returns the masked bits (Flags & Flag): non-zero if any requested bit is set, not a normalized bool.
    inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; }

    inline uint8_t GetFlags() const { return Flags; }

    // Maps the format flags to a content type:
    //  - kBinary by default;
    //  - kCbObject if kStructured is set;
    //  - kCompressedBinary if kCompressed is set (checked last, so it wins when both are set).
    inline ZenContentType GetContentType() const
    {
        ZenContentType ContentType = ZenContentType::kBinary;
        if (IsFlagSet(kStructured))
        {
            ContentType = ZenContentType::kCbObject;
        }
        if (IsFlagSet(kCompressed))
        {
            ContentType = ZenContentType::kCompressedBinary;
        }
        return ContentType;
    }

    // Active member is selected by kStandaloneFile in Flags (see the constructors above).
    union
    {
        BlockStoreDiskLocation BlockLocation;       // 10 bytes
        uint64_t               StandaloneSize = 0;  // 8 bytes
    } Location;

    static const uint8_t kStandaloneFile = 0x80u;  // Stored as a separate file
    static const uint8_t kStructured     = 0x40u;  // Serialized as compact binary
    static const uint8_t kTombStone      = 0x20u;  // Represents a deleted key/value
    static const uint8_t kCompressed     = 0x10u;  // Stored in compressed buffer format

    uint8_t Flags    = 0;  // Combination of the k* bits above
    uint8_t Reserved = 0;  // Not referenced in this header; brings DiskLocation to the 12 bytes noted on DiskIndexEntry
};

// Key + storage location. Presumably the record type of the bucket index snapshot / slog file
// (see ReadIndexFile/ReadLog below) -- confirm in the implementation.
struct DiskIndexEntry
{
    IoHash       Key;       // 20 bytes
    DiskLocation Location;  // 12 bytes
};

#pragma pack(pop)

// Pins the packed record layout: 20-byte key + 12-byte location.
static_assert(sizeof(DiskIndexEntry) == 32);

//////////////////////////////////////////////////////////////////////////

/**
 * Disk-backed cache storage organized into named buckets.
 *
 * Each bucket is a CacheBucket that manages its own directory (presumably under RootDir).
 * Values may additionally be held in memory. The layer-wide memory footprint is tracked in
 * m_TotalMemCachedSize and trimmed by MemCacheTrim() when TryMemCacheTrim() finds it above
 * Configuration::MemCacheTargetFootprintBytes. Instances are non-copyable.
 */
class ZenCacheDiskLayer
{
public:
    // Per-bucket tuning parameters.
    // NOTE(review): presumably values up to MemCacheSizeThreshold are eligible for the memory cache
    // and values at/above LargeObjectThreshold are written as standalone files -- confirm in the .cpp.
    struct BucketConfiguration
    {
        uint64_t MaxBlockSize           = 1ull << 30;  // 1 GiB
        uint64_t PayloadAlignment       = 1ull << 4;   // 16 bytes
        uint64_t MemCacheSizeThreshold  = 1 * 1024;    // 1 KiB
        uint64_t LargeObjectThreshold   = 128 * 1024;  // 128 KiB
        bool     EnableReferenceCaching = false;
    };

    // Layer-wide configuration. TryMemCacheTrim() skips trimming when any of
    // MemCacheTargetFootprintBytes, MemCacheTrimIntervalSeconds or MemCacheMaxAgeSeconds is zero.
    struct Configuration
    {
        BucketConfiguration BucketConfig;
        uint64_t            MemCacheTargetFootprintBytes = 512 * 1024 * 1024;  // 512 MiB
        uint64_t            MemCacheTrimIntervalSeconds  = 60;
        uint64_t            MemCacheMaxAgeSeconds        = gsl::narrow(std::chrono::seconds(std::chrono::days(1)).count());  // 1 day, in seconds
    };

    // Summary of a single bucket (see GetBucketInfo).
    struct BucketInfo
    {
        uint64_t      EntryCount = 0;
        GcStorageSize StorageSize;
    };

    // Summary of the whole layer (see GetInfo).
    struct Info
    {
        std::filesystem::path RootDir;
        Configuration         Config;
        std::vector           BucketNames;
        uint64_t              EntryCount = 0;
        GcStorageSize         StorageSize;
    };

    // Per-bucket counters and request statistics.
    // NOTE(review): the numeric fields have no default initializers, so a default-initialized
    // BucketStats holds indeterminate values until every field is assigned.
    struct BucketStats
    {
        uint64_t                       DiskSize;
        uint64_t                       MemorySize;
        uint64_t                       DiskHitCount;
        uint64_t                       DiskMissCount;
        uint64_t                       DiskWriteCount;
        uint64_t                       MemoryHitCount;
        uint64_t                       MemoryMissCount;
        uint64_t                       MemoryWriteCount;
        metrics::RequestStatsSnapshot PutOps;
        metrics::RequestStatsSnapshot GetOps;
    };

    struct NamedBucketStats
    {
        std::string BucketName;
        BucketStats Stats;
    };

    // Layer-wide statistics (see Stats()). DiskSize/MemorySize also lack default initializers.
    struct DiskStats
    {
        std::vector BucketStats;
        uint64_t    DiskSize;
        uint64_t    MemorySize;
    };

    explicit ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
    ~ZenCacheDiskLayer();

    // Looks up HashKey in the named bucket. The result presumably signals a hit (OutValue filled) -- confirm.
    bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
    // Stores Value under HashKey in the named bucket. References presumably lists the keys the value
    // refers to (element type lost in this copy).
    void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References);
    bool Drop();
    bool DropBucket(std::string_view Bucket);
    void Flush();
    void ScrubStorage(ScrubContext& Ctx);
    void GatherReferences(GcContext& GcCtx);
    void CollectGarbage(GcContext& GcCtx);

    void          DiscoverBuckets();
    GcStorageSize StorageSize() const;
    DiskStats     Stats() const;

    Info          GetInfo() const;
    std::optional GetBucketInfo(std::string_view Bucket) const;

    void EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const;

    CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;

private:
    /** A cache bucket manages a single directory containing metadata and data for that bucket.
     *
     * Most mutating operations take CacheMemoryUsage, the layer-wide memory-cache counter that the
     * bucket adjusts (presumably ZenCacheDiskLayer::m_TotalMemCachedSize -- confirm at call sites).
     */
    struct CacheBucket
    {
        CacheBucket(std::string BucketName, const BucketConfiguration& Config);
        ~CacheBucket();

        // Opens the bucket in BucketDir, creating it only when AllowCreate is true.
        bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
        bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage);
        void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, std::atomic_uint64_t& CacheMemoryUsage);
        // Presumably evicts memory-cached payloads last accessed before ExpireTime -- confirm.
        void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage);
        bool Drop(std::atomic_uint64_t& CacheMemoryUsage);
        void Flush();
        void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage);
        void GatherReferences(GcContext& GcCtx);
        void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage);

        // Disk size = standalone-file bytes + block store total; memory size = memory-cached bytes.
        // Both counters are read with relaxed ordering, so the pair is not a consistent snapshot.
        inline GcStorageSize StorageSize() const
        {
            return {.DiskSize   = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
                    .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
        }
        uint64_t                         EntryCount() const;
        BucketStats                      Stats();
        CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const;
        void                             EnumerateBucketContents(std::function& Fn) const;

        void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector& InOutUsageSlots);

    private:
        std::string           m_BucketName;
        std::filesystem::path m_BucketDir;
        std::filesystem::path m_BlocksBasePath;
        BucketConfiguration   m_Configuration;
        BlockStore            m_BlockStore;
        Oid                   m_BucketId;
        std::atomic_bool      m_IsFlushing{};

        // These files are used to manage storage of small objects for this bucket
        TCasLogFile m_SlogFile;
        uint64_t    m_LogFlushPosition = 0;

#pragma pack(push)
#pragma pack(1)

        // Sentinel reference indices. They are presumably stored in m_FirstReferenceIndex when an
        // entry's references are unknown / known to be empty -- confirm in the .cpp.
        static const size_t UnknownReferencesIndex = (size_t)-1;
        static const size_t NoReferencesIndex      = (size_t)-2;

        // Per-entry payload record, packed to 40 bytes (asserted below).
        struct BucketPayload
        {
            DiskLocation Location;  // 12
            uint64_t     RawSize;   // 8
            IoHash       RawHash;   // 20
        };

#pragma pack(pop)

        static_assert(sizeof(BucketPayload) == 40u);
        static_assert(sizeof(AccessTime) == 4u);

        using IndexMap = tsl::robin_map;

        std::atomic           m_DiskHitCount;
        std::atomic           m_DiskMissCount;
        std::atomic           m_DiskWriteCount;
        std::atomic           m_MemoryHitCount;
        std::atomic           m_MemoryMissCount;
        std::atomic           m_MemoryWriteCount;
        metrics::RequestStats m_PutOps;
        metrics::RequestStats m_GetOps;

        // NOTE(review): m_IndexLock presumably guards m_Index and the per-entry vectors below.
        // Those vectors appear to be parallel arrays indexed by the value stored in m_Index; they are
        // compacted together by CompactState. Confirm in the .cpp.
        mutable RwLock m_IndexLock;
        IndexMap       m_Index;
        std::vector    m_AccessTimes;
        std::vector    m_Payloads;
        std::vector    m_CachedPayloads;
        // Reference storage. It appears to chain each entry's references through m_ReferenceHashes
        // via m_NextReferenceHashesIndexes, starting at m_FirstReferenceIndex -- confirm.
        std::vector          m_FirstReferenceIndex;
        std::vector          m_ReferenceHashes;
        std::vector          m_NextReferenceHashesIndexes;
        size_t               m_ReferenceCount = 0;
        std::atomic_uint64_t m_StandaloneSize{};
        std::atomic_uint64_t m_MemCachedSize{};

        void    BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
        void    PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, std::atomic_uint64_t& CacheMemoryUsage);
        IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const;
        void    PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, std::atomic_uint64_t& CacheMemoryUsage);
        IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
        void    MakeIndexSnapshot();
        uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
        uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
        void    OpenLog(const bool IsNew);
        CbObject MakeManifest(IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads);
        void    SaveManifest(CbObject&& Manifest);
        CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const;

        // The unnamed RwLock::*LockScope& parameters below serve as a compile-time reminder that the
        // caller holds a lock (presumably m_IndexLock). They are not checked at runtime.
        void CompactReferences(RwLock::ExclusiveLockScope&);
        void SetReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex, std::span References);
        void RemoveReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex);
        // Shared- and exclusive-lock overloads both forward to LockedGetReferences.
        inline bool GetReferences(RwLock::SharedLockScope&, std::size_t FirstReferenceIndex, std::vector& OutReferences) const
        {
            return LockedGetReferences(FirstReferenceIndex, OutReferences);
        }
        inline bool GetReferences(RwLock::ExclusiveLockScope&, std::size_t FirstReferenceIndex, std::vector& OutReferences) const
        {
            return LockedGetReferences(FirstReferenceIndex, OutReferences);
        }
        size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key);
        bool   LockedGetReferences(std::size_t FirstReferenceIndex, std::vector& OutReferences) const;
        // Takes the Tmp* containers and the index lock scope, presumably to swap compacted state in -- confirm.
        void CompactState(std::vector&                 TmpPayloads,
                          std::vector&                 TmpAccessTimes,
                          std::vector&                 TmpCachedPayloads,
                          std::vector&                 TmpFirstReferenceIndex,
                          IndexMap&                    TmpIndex,
                          RwLock::ExclusiveLockScope&  IndexLock);

        // These locks are here to avoid contention on file creation, therefore it's sufficient
        // that we take the same lock for the same hash
        //
        // These locks are small and should really be spaced out so they don't share cache lines,
        // but we don't currently access them at particularly high frequency so it should not be
        // an issue in practice

        mutable RwLock m_ShardedLocks[256];
        // Selects one of the 256 sharded locks by the last byte of the hash (Hash.Hash[19]);
        // assumes Hash.Hash elements are byte-sized so the index stays within 0..255.
        inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; }
    };

    // Runs MemCacheTrim() only when trimming is enabled and the memory cache is over budget.
    // Trimming is enabled when MemCacheTargetFootprintBytes, MemCacheMaxAgeSeconds and
    // MemCacheTrimIntervalSeconds are all non-zero. The cache is over budget when
    // m_TotalMemCachedSize exceeds MemCacheTargetFootprintBytes.
    inline void TryMemCacheTrim()
    {
        if (m_Configuration.MemCacheTargetFootprintBytes == 0)
        {
            return;
        }
        if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0)
        {
            return;
        }
        if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes)
        {
            return;
        }
        MemCacheTrim();
    }
    void MemCacheTrim();
    void MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime);

    JobQueue&             m_JobQueue;
    std::filesystem::path m_RootDir;
    Configuration         m_Configuration;
    std::atomic_uint64_t  m_TotalMemCachedSize{};
    // Presumably prevents overlapping trims and throttles them by MemCacheTrimIntervalSeconds -- confirm.
    std::atomic_bool      m_IsMemCacheTrimming = false;
    std::atomic           m_LastTickMemCacheTrim;
    // Presumably guards m_Buckets and m_DroppedBuckets -- confirm.
    mutable RwLock        m_Lock;
    std::unordered_map> m_Buckets;
    // Presumably buckets removed by DropBucket that are kept alive afterwards -- confirm.
    std::vector> m_DroppedBuckets;

    ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
    ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
};

}  // namespace zen