diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 21:49:55 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 21:49:55 +0100 |
| commit | 8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch) | |
| tree | ffd8524bcebf8841b69725934f31d438a03e7746 /src/zenstore/include | |
| parent | 0.2.38 (diff) | |
| download | zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip | |
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenstore/include')
| -rw-r--r-- | src/zenstore/include/zenstore/cachedisklayer.h | 482 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cacheshared.h | 100 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/structuredcachestore.h | 271 |
3 files changed, 853 insertions, 0 deletions
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cachedisklayer.h new file mode 100644 index 000000000..6997a12e4 --- /dev/null +++ b/src/zenstore/include/zenstore/cachedisklayer.h @@ -0,0 +1,482 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "cacheshared.h" + +#include <zencore/stats.h> +#include <zenstore/blockstore.h> +#include <zenstore/caslog.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <filesystem> +#include <unordered_map> + +namespace zen { + +class IoBuffer; +class JobQueue; + +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + inline DiskLocation() = default; + + inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } + + inline DiskLocation(const BlockStoreLocation& Location, uint32_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) + { + this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); + } + + inline bool operator!=(const DiskLocation& Rhs) const + { + if (Flags != Rhs.Flags) + { + return true; + } + if (Flags & kStandaloneFile) + { + return Location.StandaloneSize != Rhs.Location.StandaloneSize; + } + return Location.BlockLocation != Rhs.Location.BlockLocation; + } + + inline BlockStoreLocation GetBlockLocation(uint32_t PayloadAlignment) const + { + ZEN_ASSERT(!(Flags & kStandaloneFile)); + return Location.BlockLocation.Get(PayloadAlignment); + } + + inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } + inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } + inline uint8_t GetFlags() const { return Flags; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + + union + { + BlockStoreDiskLocation BlockLocation; // 10 bytes + uint64_t StandaloneSize = 0; // 8 bytes + } Location; + + static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file + static const uint8_t kStructured = 0x40u; // Serialized as compact binary + static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value + static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format + + uint8_t Flags = 0; + uint8_t Reserved = 0; +}; + +struct DiskIndexEntry +{ + IoHash Key; // 20 bytes + DiskLocation Location; // 12 bytes +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 32); + +////////////////////////////////////////////////////////////////////////// + +class ZenCacheDiskLayer +{ +public: + struct BucketConfiguration + { + uint64_t MaxBlockSize = 1ull << 30; + uint32_t PayloadAlignment = 1u << 4; + uint64_t MemCacheSizeThreshold = 1 * 1024; + uint64_t LargeObjectThreshold = 128 * 1024; + bool EnableReferenceCaching = false; + }; + + struct Configuration + { + BucketConfiguration BucketConfig; + uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024; + uint64_t MemCacheTrimIntervalSeconds = 60; + uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); + }; + + struct BucketInfo + { + uint64_t EntryCount = 0; + GcStorageSize StorageSize; + }; + + struct Info + { + std::filesystem::path RootDir; + Configuration Config; + std::vector<std::string> BucketNames; + uint64_t EntryCount = 0; + GcStorageSize StorageSize; + }; + + struct BucketStats + { + uint64_t DiskSize; + uint64_t MemorySize; + uint64_t DiskHitCount; + uint64_t DiskMissCount; + uint64_t DiskWriteCount; + uint64_t MemoryHitCount; + uint64_t MemoryMissCount; + uint64_t MemoryWriteCount; + metrics::RequestStatsSnapshot PutOps; + metrics::RequestStatsSnapshot GetOps; + }; + + struct NamedBucketStats + { + std::string BucketName; + BucketStats Stats; + }; + + struct DiskStats + { + std::vector<NamedBucketStats> BucketStats; + uint64_t DiskSize; + uint64_t MemorySize; + }; + + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); + ~ZenCacheDiskLayer(); + + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + bool Drop(); + bool DropBucket(std::string_view Bucket); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + + void DiscoverBuckets(); + GcStorageSize StorageSize() const; + DiskStats Stats() const; + + Info GetInfo() const; + std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + void EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; + + CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + +#if ZEN_WITH_TESTS + void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS + + /** A cache bucket manages a single directory containing + metadata and data for that bucket + */ + struct CacheBucket : public GcReferencer + { + CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); + ~CacheBucket(); + + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + + inline GcStorageSize StorageSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + uint64_t EntryCount() const; + BucketStats Stats(); + + CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const; + void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; + + void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots); +#if ZEN_WITH_TESTS + void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS + + private: +#pragma pack(push) +#pragma pack(1) + struct MetaDataIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + MetaDataIndex() = default; + explicit MetaDataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const MetaDataIndex& Other) const = default; + }; + + struct MemCachedIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + MemCachedIndex() = default; + explicit MemCachedIndex(uint32_t InIndex) : Index(InIndex) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const MemCachedIndex& Other) const = default; + }; + + struct ReferenceIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + static const ReferenceIndex Unknown() { return ReferenceIndex{std::numeric_limits<uint32_t>::max()}; } + static const ReferenceIndex None() { return ReferenceIndex{std::numeric_limits<uint32_t>::max() - 1}; } + + ReferenceIndex() = default; + explicit ReferenceIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + inline auto operator<=>(const ReferenceIndex& Other) const = default; + }; + + struct PayloadIndex + { + uint32_t Index = std::numeric_limits<uint32_t>::max(); + + operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); }; + PayloadIndex() = default; + explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} + operator size_t() const { return Index; }; + inline auto operator<=>(const PayloadIndex& Other) const = default; + }; + + struct BucketPayload + { + DiskLocation Location; // 12 + MetaDataIndex MetaData; // 4 + MemCachedIndex MemCached; // 4 + }; + struct BucketMetaData + { + uint64_t RawSize = 0; // 8 + IoHash RawHash; // 20 + + operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; }; + }; + struct MemCacheData + { + IoBuffer Payload; + PayloadIndex OwnerIndex; + }; +#pragma pack(pop) + static_assert(sizeof(BucketPayload) == 20u); + static_assert(sizeof(BucketMetaData) == 28u); + static_assert(sizeof(AccessTime) == 4u); + + using IndexMap = tsl::robin_map<IoHash, PayloadIndex, IoHash::Hasher>; + + GcManager& m_Gc; + std::atomic_uint64_t& m_OuterCacheMemoryUsage; + std::string m_BucketName; + std::filesystem::path m_BucketDir; + std::filesystem::path m_BlocksBasePath; + BucketConfiguration m_Configuration; + BlockStore m_BlockStore; + Oid m_BucketId; + std::atomic_bool m_IsFlushing{true}; // Don't allow flush until we are properly initialized + + // These files are used to manage storage of small objects for this bucket + + TCasLogFile<DiskIndexEntry> m_SlogFile; + uint64_t m_LogFlushPosition = 0; + + std::atomic<uint64_t> m_DiskHitCount; + std::atomic<uint64_t> m_DiskMissCount; + std::atomic<uint64_t> m_DiskWriteCount; + std::atomic<uint64_t> m_MemoryHitCount; + std::atomic<uint64_t> m_MemoryMissCount; + std::atomic<uint64_t> m_MemoryWriteCount; + metrics::RequestStats m_PutOps; + metrics::RequestStats m_GetOps; + + mutable RwLock m_IndexLock; + IndexMap m_Index; + std::vector<AccessTime> m_AccessTimes; + std::vector<BucketPayload> m_Payloads; + std::vector<BucketMetaData> m_MetaDatas; + std::vector<MetaDataIndex> m_FreeMetaDatas; + std::vector<MemCacheData> m_MemCachedPayloads; + std::vector<MemCachedIndex> m_FreeMemCachedPayloads; + std::vector<ReferenceIndex> m_FirstReferenceIndex; + std::vector<IoHash> m_ReferenceHashes; + std::vector<ReferenceIndex> m_NextReferenceHashesIndexes; + std::unique_ptr<HashSet> m_UpdatedKeys; + size_t m_ReferenceCount = 0; + std::atomic_uint64_t m_StandaloneSize{}; + std::atomic_uint64_t m_MemCachedSize{}; + + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; + + void CompactReferences(RwLock::ExclusiveLockScope&); + void SetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex, std::span<IoHash> References); + void RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex); + inline bool GetReferences(RwLock::SharedLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const + { + return LockedGetReferences(FirstReferenceIndex, OutReferences); + } + inline bool GetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const + { + return LockedGetReferences(FirstReferenceIndex, OutReferences); + } + ReferenceIndex AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); + bool LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const; + void ClearReferenceCache(); + + void SetMetaData(RwLock::ExclusiveLockScope&, + BucketPayload& Payload, + const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData); + void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); + BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const; + void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData); + size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload); + + void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew); + uint64_t ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t LogPosition); + + void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); + void WriteIndexSnapshot( + RwLock::ExclusiveLockScope&, + const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) + { + WriteIndexSnapshotLocked(ClaimDiskReserveFunc); + } + void WriteIndexSnapshot( + RwLock::SharedLockScope&, + const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) + { + WriteIndexSnapshotLocked(ClaimDiskReserveFunc); + } + void WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); + + void CompactState(RwLock::ExclusiveLockScope&, + std::vector<BucketPayload>& Payloads, + std::vector<AccessTime>& AccessTimes, + std::vector<BucketMetaData>& MetaDatas, + std::vector<MemCacheData>& MemCachedPayloads, + std::vector<ReferenceIndex>& FirstReferenceIndex, + IndexMap& Index, + RwLock::ExclusiveLockScope& IndexLock); + + void AddMemCacheUsage(uint64_t ValueSize) + { + m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed); + m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed); + } + void RemoveMemCacheUsage(uint64_t ValueSize) + { + m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed); + m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed); + } + static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) + { + return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u); + } + + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + mutable RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } + + friend class DiskBucketReferenceChecker; + friend class DiskBucketStoreCompactor; + friend class BucketManifestSerializer; + }; + +private: + CacheBucket* GetOrCreateBucket(std::string_view InBucket); + inline void TryMemCacheTrim() + { + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return; + } + if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0) + { + return; + } + if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes) + { + return; + } + if (m_IsMemCacheTrimming) + { + return; + } + + const GcClock::Tick NowTick = GcClock::TickCount(); + if (NowTick < m_NextAllowedTrimTick) + { + return; + } + + MemCacheTrim(); + } + void MemCacheTrim(); + uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + + GcManager& m_Gc; + JobQueue& m_JobQueue; + std::filesystem::path m_RootDir; + Configuration m_Configuration; + std::atomic_uint64_t m_TotalMemCachedSize{}; + std::atomic_bool m_IsMemCacheTrimming = false; + std::atomic<GcClock::Tick> m_NextAllowedTrimTick; + mutable RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + + ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; + ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; + + friend class DiskBucketStoreCompactor; + friend class DiskBucketReferenceChecker; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cacheshared.h new file mode 100644 index 000000000..e3e8a2f84 --- /dev/null +++ b/src/zenstore/include/zenstore/cacheshared.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zenstore/gc.h> + +#include <gsl/gsl-lite.hpp> +#include <unordered_map> + +namespace zen { + +namespace access_tracking { + + struct KeyAccessTime + { + IoHash Key; + GcClock::Tick LastAccess{}; + }; + + struct AccessTimes + { + std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets; + }; +}; // namespace access_tracking + +struct ZenCacheValue +{ + IoBuffer Value; + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; +}; + +struct CacheValueDetails +{ + struct ValueDetails + { + uint64_t Size; + uint64_t RawSize; + IoHash RawHash; + GcClock::Tick LastAccess{}; + std::vector<IoHash> Attachments; + ZenContentType ContentType; + }; + + struct BucketDetails + { + std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values; + }; + + struct NamespaceDetails + { + std::unordered_map<std::string, BucketDetails> Buckets; + }; + + std::unordered_map<std::string, NamespaceDetails> Namespaces; +}; + +bool IsKnownBadBucketName(std::string_view BucketName); + +////////////////////////////////////////////////////////////////////////// + +// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch +struct AccessTime +{ + explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} + AccessTime& operator=(GcClock::Tick Tick) noexcept + { + SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); + return *this; + } + operator GcClock::Tick() const noexcept + { + return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) + .count(); + } + + AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime& operator=(AccessTime&& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + AccessTime& operator=(const AccessTime& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + +private: + static uint32_t ToSeconds(GcClock::Tick Tick) + { + return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); + } + std::atomic_uint32_t SecondsSinceEpoch; +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/structuredcachestore.h new file mode 100644 index 000000000..56b2105a9 --- /dev/null +++ b/src/zenstore/include/zenstore/structuredcachestore.h @@ -0,0 +1,271 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/iohash.h> +#include <zencore/stats.h> +#include <zenstore/cachedisklayer.h> +#include <zenstore/gc.h> +#include <zenutil/cache/cache.h> +#include <zenutil/statsreporter.h> + +#include <atomic> +#include <compare> +#include <filesystem> +#include <string_view> +#include <unordered_map> + +namespace zen { + +class StatsDaemonClient; + +/****************************************************************************** + + /$$$$$$$$ /$$$$$$ /$$ + |_____ $$ /$$__ $$ | $$ + /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ + /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ + /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ + /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ + /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ + |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/ + + Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash + is 40 (hex) chars in size. Values may be opaque blobs or structured objects + which can in turn contain references to other objects (or blobs). Buckets are + organized in namespaces to enable project isolation. + +******************************************************************************/ + +class WorkerThreadPool; +class DiskWriteBlocker; +class JobQueue; + +/* Z$ namespace + + A namespace scopes a set of buckets, and would typically be used to isolate + projects from each other. + + */ +class ZenCacheNamespace final : public GcStorage, public GcContributor +{ +public: + struct Configuration + { + ZenCacheDiskLayer::Configuration DiskLayerConfig; + }; + struct BucketInfo + { + ZenCacheDiskLayer::BucketInfo DiskLayerInfo; + }; + struct Info + { + std::filesystem::path RootDir; + Configuration Config; + std::vector<std::string> BucketNames; + ZenCacheDiskLayer::Info DiskLayerInfo; + }; + + struct NamespaceStats + { + uint64_t HitCount; + uint64_t MissCount; + uint64_t WriteCount; + metrics::RequestStatsSnapshot PutOps; + metrics::RequestStatsSnapshot GetOps; + ZenCacheDiskLayer::DiskStats DiskStats; + }; + + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); + ~ZenCacheNamespace(); + + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + + bool DropBucket(std::string_view Bucket); + void EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; + + bool Drop(); + void Flush(); + + // GcContributor + virtual void GatherReferences(GcContext& GcCtx) override; + + // GcStorage + virtual void ScrubStorage(ScrubContext& ScrubCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; + + Configuration GetConfig() const { return m_Configuration; } + Info GetInfo() const; + std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + NamespaceStats Stats(); + + CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + +#if ZEN_WITH_TESTS + void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS + +private: + GcManager& m_Gc; + JobQueue& m_JobQueue; + std::filesystem::path m_RootDir; + Configuration m_Configuration; + ZenCacheDiskLayer m_DiskLayer; + std::atomic<uint64_t> m_HitCount{}; + std::atomic<uint64_t> m_MissCount{}; + std::atomic<uint64_t> m_WriteCount{}; + metrics::RequestStats m_PutOps; + metrics::RequestStats m_GetOps; + uint64_t m_LastScrubTime = 0; + + ZenCacheNamespace(const ZenCacheNamespace&) = delete; + ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; +}; + +/** Cache store interface + + This manages a set of namespaces used for derived data caching purposes. + + */ + +class ZenCacheStore final : public RefCounted, public StatsProvider +{ +public: + static constexpr std::string_view DefaultNamespace = + "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given + static constexpr std::string_view NamespaceDiskPrefix = "ns_"; + + struct Configuration + { + ZenCacheNamespace::Configuration NamespaceConfig; + bool AllowAutomaticCreationOfNamespaces = false; + struct LogConfig + { + bool EnableWriteLog = true; + bool EnableAccessLog = true; + } Logging; + }; + + struct Info + { + std::filesystem::path BasePath; + Configuration Config; + std::vector<std::string> NamespaceNames; + uint64_t DiskEntryCount = 0; + GcStorageSize StorageSize; + }; + + struct NamedNamespaceStats + { + std::string NamespaceName; + ZenCacheNamespace::NamespaceStats Stats; + }; + + struct CacheStoreStats + { + uint64_t HitCount = 0; + uint64_t MissCount = 0; + uint64_t WriteCount = 0; + uint64_t RejectedWriteCount = 0; + uint64_t RejectedReadCount = 0; + metrics::RequestStatsSnapshot PutOps; + metrics::RequestStatsSnapshot GetOps; + std::vector<NamedNamespaceStats> NamespaceStats; + }; + + ZenCacheStore(GcManager& Gc, + JobQueue& JobQueue, + const std::filesystem::path& BasePath, + const Configuration& Configuration, + const DiskWriteBlocker* InDiskWriteBlocker); + ~ZenCacheStore(); + + bool Get(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + ZenCacheValue& OutValue); + void Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References); + bool DropBucket(std::string_view Namespace, std::string_view Bucket); + bool DropNamespace(std::string_view Namespace); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); + + CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, + const std::string_view BucketFilter, + const std::string_view ValueFilter) const; + + GcStorageSize StorageSize() const; + CacheStoreStats Stats(bool IncludeNamespaceStats = true); + + Configuration GetConfiguration() const { return m_Configuration; } + void SetLoggingConfig(const Configuration::LogConfig& Loggingconfig); + Info GetInfo() const; + std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace); + std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket); + std::vector<std::string> GetNamespaces(); + + void EnumerateBucketContents(std::string_view Namespace, + std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn); + + // StatsProvider + virtual void ReportMetrics(StatsMetrics& Statsd) override; + +private: + const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; + ZenCacheNamespace* GetNamespace(std::string_view Namespace); + void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const; + + typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; + + CacheStoreStats m_LastReportedMetrics; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + mutable RwLock m_NamespacesLock; + NamespaceMap m_Namespaces; + std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; + + GcManager& m_Gc; + JobQueue& m_JobQueue; + std::filesystem::path m_BasePath; + Configuration m_Configuration; + std::atomic<uint64_t> m_HitCount{}; + std::atomic<uint64_t> m_MissCount{}; + std::atomic<uint64_t> m_WriteCount{}; + std::atomic<uint64_t> m_RejectedWriteCount{}; + std::atomic<uint64_t> m_RejectedReadCount{}; + metrics::RequestStats m_PutOps; + metrics::RequestStats m_GetOps; + + struct AccessLogItem + { + const char* Op; + CacheRequestContext Context; + std::string Namespace; + std::string Bucket; + IoHash HashKey; + ZenCacheValue Value; + }; + + void LogWorker(); + RwLock m_LogQueueLock; + std::vector<AccessLogItem> m_LogQueue; + std::atomic_bool m_ExitLogging; + Event m_LogEvent; + std::thread m_AsyncLoggingThread; + std::atomic_bool m_WriteLogEnabled; + std::atomic_bool m_AccessLogEnabled; +}; + +void z$_forcelink(); + +} // namespace zen |