// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "cacheshared.h" #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen { class IoBuffer; #pragma pack(push) #pragma pack(1) struct DiskLocation { inline DiskLocation() = default; inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) { this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); } inline bool operator!=(const DiskLocation& Rhs) const { return memcmp(&Location, &Rhs.Location, sizeof(Location)) != 0; } inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const { ZEN_ASSERT(!(Flags & kStandaloneFile)); return Location.BlockLocation.Get(PayloadAlignment); } inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } inline uint8_t GetFlags() const { return Flags; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; if (IsFlagSet(kStructured)) { ContentType = ZenContentType::kCbObject; } if (IsFlagSet(kCompressed)) { ContentType = ZenContentType::kCompressedBinary; } return ContentType; } union { BlockStoreDiskLocation BlockLocation; // 10 bytes uint64_t StandaloneSize = 0; // 8 bytes } Location; static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file static const uint8_t kStructured = 0x40u; // Serialized as compact binary static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format uint8_t Flags = 0; uint8_t Reserved = 0; }; struct DiskIndexEntry { IoHash Key; // 20 bytes DiskLocation Location; // 12 bytes }; #pragma pack(pop) static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// class ZenCacheDiskLayer { public: struct Configuration { std::filesystem::path RootDir; bool EnableReferenceCaching; }; struct BucketInfo { uint64_t EntryCount = 0; uint64_t TotalSize = 0; }; struct Info { Configuration Config; std::vector BucketNames; uint64_t EntryCount = 0; uint64_t TotalSize = 0; }; struct BucketStats { uint64_t TotalSize; uint64_t HitCount; uint64_t MissCount; uint64_t WriteCount; metrics::RequestStatsSnapshot PutOps; metrics::RequestStatsSnapshot GetOps; }; struct NamedBucketStats { std::string BucketName; BucketStats Stats; }; struct DiskStats { std::vector BucketStats; }; explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References); bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); void DiscoverBuckets(); uint64_t TotalSize() const; DiskStats Stats() const; Info GetInfo() const; std::optional GetBucketInfo(std::string_view Bucket) const; void EnumerateBucketContents(std::string_view Bucket, std::function& Fn) const; CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ struct CacheBucket { CacheBucket(std::string BucketName, bool EnableReferenceCaching); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); bool Drop(); void Flush(); void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); void UpdateAccessTimes(const std::vector& AccessTimes); inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); } uint64_t EntryCount() const; BucketStats Stats(); CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function& Fn) const; private: const uint64_t MaxBlockSize = 1ull << 30; uint64_t m_PayloadAlignment = 1ull << 4; std::string m_BucketName; std::filesystem::path m_BucketDir; std::filesystem::path m_BlocksBasePath; BlockStore m_BlockStore; Oid m_BucketId; uint64_t m_LargeObjectThreshold = 128 * 1024; std::atomic_bool m_IsFlushing{}; const bool m_EnableReferenceCaching = false; // These files are used to manage storage of small objects for this bucket TCasLogFile m_SlogFile; uint64_t m_LogFlushPosition = 0; #pragma pack(push) #pragma pack(1) static const size_t UnknownReferencesIndex = (size_t)-1; static const size_t NoReferencesIndex = (size_t)-2; struct BucketPayload { DiskLocation Location; // 12 uint64_t RawSize; // 8 IoHash RawHash; // 20 }; #pragma pack(pop) static_assert(sizeof(BucketPayload) == 40u); static_assert(sizeof(AccessTime) == 4u); using IndexMap = tsl::robin_map; std::atomic m_HitCount; std::atomic m_MissCount; std::atomic m_WriteCount; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; mutable RwLock m_IndexLock; IndexMap m_Index; std::vector m_AccessTimes; std::vector m_Payloads; std::vector m_FirstReferenceIndex; std::vector m_ReferenceHashes; std::vector m_NextReferenceHashesIndexes; size_t m_ReferenceCount = 0; std::atomic_uint64_t m_TotalStandaloneSize{}; void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; void MakeIndexSnapshot(); uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); void OpenLog(const bool IsNew); CbObject MakeManifest(IndexMap&& Index, std::vector&& AccessTimes, const std::vector& Payloads); void SaveManifest(CbObject&& Manifest); CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const; void CompactReferences(RwLock::ExclusiveLockScope&); void SetReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex, std::span References); void RemoveReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex); inline bool GetReferences(RwLock::SharedLockScope&, std::size_t FirstReferenceIndex, std::vector& OutReferences) const { return LockedGetReferences(FirstReferenceIndex, OutReferences); } inline bool GetReferences(RwLock::ExclusiveLockScope&, std::size_t FirstReferenceIndex, std::vector& OutReferences) const { return LockedGetReferences(FirstReferenceIndex, OutReferences); } size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector& OutReferences) const; // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash // // These locks are small and should really be spaced out so they don't share cache lines, // but we don't currently access them at particularly high frequency so it should not be // an issue in practice mutable RwLock m_ShardedLocks[256]; inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } }; std::filesystem::path m_RootDir; mutable RwLock m_Lock; std::unordered_map> m_Buckets; // TODO: make this case insensitive std::vector> m_DroppedBuckets; const bool m_EnableReferenceCaching; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; }; } // namespace zen