aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/include
diff options
context:
space:
mode:
author Stefan Boberg <[email protected]> 2023-12-19 21:49:55 +0100
committer GitHub <[email protected]> 2023-12-19 21:49:55 +0100
commit 8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch)
tree ffd8524bcebf8841b69725934f31d438a03e7746 /src/zenstore/include
parent 0.2.38 (diff)
download zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz
zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenstore/include')
-rw-r--r-- src/zenstore/include/zenstore/cachedisklayer.h 482
-rw-r--r-- src/zenstore/include/zenstore/cacheshared.h 100
-rw-r--r-- src/zenstore/include/zenstore/structuredcachestore.h 271
3 files changed, 853 insertions, 0 deletions
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cachedisklayer.h
new file mode 100644
index 000000000..6997a12e4
--- /dev/null
+++ b/src/zenstore/include/zenstore/cachedisklayer.h
@@ -0,0 +1,482 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "cacheshared.h"
+
+#include <zencore/stats.h>
+#include <zenstore/blockstore.h>
+#include <zenstore/caslog.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <filesystem>
+#include <unordered_map>
+
+namespace zen {
+
+class IoBuffer;
+class JobQueue;
+
+#pragma pack(push)
+#pragma pack(1)
+
+struct DiskLocation // Packed (pack(1)) record: flags plus either a block-store location or the size of a standalone file
+{
+ inline DiskLocation() = default;
+ // Standalone-file form: stores ValueSize in the union and forces kStandaloneFile on in Flags.
+ inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; }
+ // Block-store form: stores a BlockStoreDiskLocation built from Location/PayloadAlignment and forces kStandaloneFile off.
+ inline DiskLocation(const BlockStoreLocation& Location, uint32_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile)
+ {
+ this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment);
+ }
+ // Inequality: flags are compared first, then only the union member selected by kStandaloneFile. Reserved is not compared.
+ inline bool operator!=(const DiskLocation& Rhs) const
+ {
+ if (Flags != Rhs.Flags)
+ {
+ return true;
+ }
+ if (Flags & kStandaloneFile)
+ {
+ return Location.StandaloneSize != Rhs.Location.StandaloneSize;
+ }
+ return Location.BlockLocation != Rhs.Location.BlockLocation;
+ }
+ // Returns the block-store location; asserts that this entry is not a standalone file.
+ inline BlockStoreLocation GetBlockLocation(uint32_t PayloadAlignment) const
+ {
+ ZEN_ASSERT(!(Flags & kStandaloneFile));
+ return Location.BlockLocation.Get(PayloadAlignment);
+ }
+ // Size(): standalone size or block location size, depending on kStandaloneFile.
+ inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); }
+ inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } // returns the masked bits (non-zero when set), truncated to 8 bits
+ inline uint8_t GetFlags() const { return Flags; }
+ inline ZenContentType GetContentType() const // kBinary by default; kCompressed takes precedence over kStructured when both are set
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ if (IsFlagSet(kCompressed))
+ {
+ ContentType = ZenContentType::kCompressedBinary;
+ }
+
+ return ContentType;
+ }
+ // Which union member is active is determined by the kStandaloneFile bit in Flags.
+ union
+ {
+ BlockStoreDiskLocation BlockLocation; // 10 bytes
+ uint64_t StandaloneSize = 0; // 8 bytes
+ } Location;
+ // Bit values stored in Flags.
+ static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file
+ static const uint8_t kStructured = 0x40u; // Serialized as compact binary
+ static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value
+ static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format
+
+ uint8_t Flags = 0;
+ uint8_t Reserved = 0; // not read or written by any member of this struct
+};
+
+struct DiskIndexEntry // Key/location pair; record type of TCasLogFile<DiskIndexEntry> (CacheBucket::m_SlogFile), size asserted to be 32
+{
+ IoHash Key; // 20 bytes
+ DiskLocation Location; // 12 bytes
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 32);
+
+//////////////////////////////////////////////////////////////////////////
+
+class ZenCacheDiskLayer // Cache layer holding a set of named CacheBucket instances (m_Buckets); constructed with a root directory
+{
+public:
+ struct BucketConfiguration // Per-bucket settings; each CacheBucket keeps its own instance (CacheBucket::m_Configuration)
+ {
+ uint64_t MaxBlockSize = 1ull << 30; // 2^30
+ uint32_t PayloadAlignment = 1u << 4; // 2^4 = 16
+ uint64_t MemCacheSizeThreshold = 1 * 1024;
+ uint64_t LargeObjectThreshold = 128 * 1024;
+ bool EnableReferenceCaching = false;
+ };
+ // Layer-wide settings. TryMemCacheTrim() does nothing when any of the three MemCache* values is zero.
+ struct Configuration
+ {
+ BucketConfiguration BucketConfig;
+ uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024;
+ uint64_t MemCacheTrimIntervalSeconds = 60;
+ uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count());
+ };
+
+ struct BucketInfo
+ {
+ uint64_t EntryCount = 0;
+ GcStorageSize StorageSize;
+ };
+
+ struct Info
+ {
+ std::filesystem::path RootDir;
+ Configuration Config;
+ std::vector<std::string> BucketNames;
+ uint64_t EntryCount = 0;
+ GcStorageSize StorageSize;
+ };
+ // Snapshot of one bucket's counters. All counters default to zero so a default-constructed value is well defined.
+ struct BucketStats
+ {
+ uint64_t DiskSize = 0;
+ uint64_t MemorySize = 0;
+ uint64_t DiskHitCount = 0;
+ uint64_t DiskMissCount = 0;
+ uint64_t DiskWriteCount = 0;
+ uint64_t MemoryHitCount = 0;
+ uint64_t MemoryMissCount = 0;
+ uint64_t MemoryWriteCount = 0;
+ metrics::RequestStatsSnapshot PutOps;
+ metrics::RequestStatsSnapshot GetOps;
+ };
+
+ struct NamedBucketStats
+ {
+ std::string BucketName;
+ BucketStats Stats;
+ };
+ // Per-bucket stats plus layer totals; the totals default to zero.
+ struct DiskStats
+ {
+ std::vector<NamedBucketStats> BucketStats;
+ uint64_t DiskSize = 0;
+ uint64_t MemorySize = 0;
+ };
+
+ explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
+ ~ZenCacheDiskLayer();
+
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ bool Drop();
+ bool DropBucket(std::string_view Bucket);
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
+
+ void DiscoverBuckets();
+ GcStorageSize StorageSize() const;
+ DiskStats Stats() const;
+
+ Info GetInfo() const;
+ std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
+ void EnumerateBucketContents(std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
+
+ CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+
+#if ZEN_WITH_TESTS
+ void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
+
+ /** A cache bucket manages a single directory containing
+ metadata and data for that bucket
+ */
+ struct CacheBucket : public GcReferencer
+ {
+ CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config);
+ ~CacheBucket();
+
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime);
+ bool Drop();
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
+ // Disk size = standalone file bytes + block store total; memory size = bytes tracked by the memory cache counter.
+ inline GcStorageSize StorageSize() const
+ {
+ return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(),
+ .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)};
+ }
+ uint64_t EntryCount() const;
+ BucketStats Stats();
+
+ CacheValueDetails::BucketDetails GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const;
+ void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
+
+ void GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots);
+#if ZEN_WITH_TESTS
+ void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
+
+ private:
+#pragma pack(push)
+#pragma pack(1)
+ struct MetaDataIndex // 32-bit index wrapper; uint32 max is the "invalid" sentinel (operator bool is false for it)
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ MetaDataIndex() = default;
+ explicit MetaDataIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} // InIndex is truncated to 32 bits
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const MetaDataIndex& Other) const = default;
+ };
+ // Same shape as MetaDataIndex, but the explicit constructor takes a uint32_t directly.
+ struct MemCachedIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ MemCachedIndex() = default;
+ explicit MemCachedIndex(uint32_t InIndex) : Index(InIndex) {}
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const MemCachedIndex& Other) const = default;
+ };
+ // Two sentinels: Unknown() (uint32 max, also the default) and None() (max - 1). operator bool is false only for Unknown().
+ struct ReferenceIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ static const ReferenceIndex Unknown() { return ReferenceIndex{std::numeric_limits<uint32_t>::max()}; }
+ static const ReferenceIndex None() { return ReferenceIndex{std::numeric_limits<uint32_t>::max() - 1}; }
+
+ ReferenceIndex() = default;
+ explicit ReferenceIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} // InIndex is truncated to 32 bits
+ operator size_t() const { return Index; };
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ inline auto operator<=>(const ReferenceIndex& Other) const = default;
+ };
+ // Value type of IndexMap (IoHash -> PayloadIndex); uint32 max is the "invalid" sentinel.
+ struct PayloadIndex
+ {
+ uint32_t Index = std::numeric_limits<uint32_t>::max();
+
+ operator bool() const { return Index != std::numeric_limits<uint32_t>::max(); };
+ PayloadIndex() = default;
+ explicit PayloadIndex(size_t InIndex) : Index(uint32_t(InIndex)) {} // InIndex is truncated to 32 bits
+ operator size_t() const { return Index; };
+ inline auto operator<=>(const PayloadIndex& Other) const = default;
+ };
+ // Element type of m_Payloads; size asserted to be 20 bytes below.
+ struct BucketPayload
+ {
+ DiskLocation Location; // 12
+ MetaDataIndex MetaData; // 4
+ MemCachedIndex MemCached; // 4
+ };
+ struct BucketMetaData // Element type of m_MetaDatas; converts to false when RawSize is 0 and RawHash is IoHash::Zero
+ {
+ uint64_t RawSize = 0; // 8
+ IoHash RawHash; // 20
+
+ operator bool() const { return RawSize != 0 || RawHash != IoHash::Zero; };
+ };
+ struct MemCacheData // Element type of m_MemCachedPayloads: a payload buffer plus the PayloadIndex stored as its owner
+ {
+ IoBuffer Payload;
+ PayloadIndex OwnerIndex;
+ };
+#pragma pack(pop)
+ static_assert(sizeof(BucketPayload) == 20u);
+ static_assert(sizeof(BucketMetaData) == 28u);
+ static_assert(sizeof(AccessTime) == 4u);
+
+ using IndexMap = tsl::robin_map<IoHash, PayloadIndex, IoHash::Hasher>;
+
+ GcManager& m_Gc;
+ std::atomic_uint64_t& m_OuterCacheMemoryUsage;
+ std::string m_BucketName;
+ std::filesystem::path m_BucketDir;
+ std::filesystem::path m_BlocksBasePath;
+ BucketConfiguration m_Configuration;
+ BlockStore m_BlockStore;
+ Oid m_BucketId;
+ std::atomic_bool m_IsFlushing{true}; // Don't allow flush until we are properly initialized
+
+ // These files are used to manage storage of small objects for this bucket
+
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+ uint64_t m_LogFlushPosition = 0;
+ // Counters are explicitly value-initialized (zero), matching m_StandaloneSize / m_MemCachedSize below.
+ std::atomic<uint64_t> m_DiskHitCount{};
+ std::atomic<uint64_t> m_DiskMissCount{};
+ std::atomic<uint64_t> m_DiskWriteCount{};
+ std::atomic<uint64_t> m_MemoryHitCount{};
+ std::atomic<uint64_t> m_MemoryMissCount{};
+ std::atomic<uint64_t> m_MemoryWriteCount{};
+ metrics::RequestStats m_PutOps;
+ metrics::RequestStats m_GetOps;
+
+ mutable RwLock m_IndexLock;
+ IndexMap m_Index;
+ std::vector<AccessTime> m_AccessTimes;
+ std::vector<BucketPayload> m_Payloads;
+ std::vector<BucketMetaData> m_MetaDatas;
+ std::vector<MetaDataIndex> m_FreeMetaDatas;
+ std::vector<MemCacheData> m_MemCachedPayloads;
+ std::vector<MemCachedIndex> m_FreeMemCachedPayloads;
+ std::vector<ReferenceIndex> m_FirstReferenceIndex;
+ std::vector<IoHash> m_ReferenceHashes;
+ std::vector<ReferenceIndex> m_NextReferenceHashesIndexes;
+ std::unique_ptr<HashSet> m_UpdatedKeys;
+ size_t m_ReferenceCount = 0;
+ std::atomic_uint64_t m_StandaloneSize{};
+ std::atomic_uint64_t m_MemCachedSize{};
+
+ virtual std::string GetGcName(GcCtx& Ctx) override;
+ virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const;
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+ IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
+ CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const;
+ // Unnamed lock-scope parameters below are never read; presumably they make callers prove a lock is held -- confirm.
+ void CompactReferences(RwLock::ExclusiveLockScope&);
+ void SetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex, std::span<IoHash> References);
+ void RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex);
+ inline bool GetReferences(RwLock::SharedLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
+ {
+ return LockedGetReferences(FirstReferenceIndex, OutReferences);
+ }
+ inline bool GetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
+ {
+ return LockedGetReferences(FirstReferenceIndex, OutReferences);
+ }
+ ReferenceIndex AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key);
+ bool LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const;
+ void ClearReferenceCache();
+
+ void SetMetaData(RwLock::ExclusiveLockScope&,
+ BucketPayload& Payload,
+ const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData);
+ void RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
+ BucketMetaData GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const;
+ void SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData);
+ size_t RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload);
+
+ void InitializeIndexFromDisk(RwLock::ExclusiveLockScope&, bool IsNew);
+ uint64_t ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion);
+ uint64_t ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t LogPosition);
+ // Both WriteIndexSnapshot overloads forward to WriteIndexSnapshotLocked; the default reserve callback returns 0.
+ void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
+ void WriteIndexSnapshot(
+ RwLock::ExclusiveLockScope&,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
+ {
+ WriteIndexSnapshotLocked(ClaimDiskReserveFunc);
+ }
+ void WriteIndexSnapshot(
+ RwLock::SharedLockScope&,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
+ {
+ WriteIndexSnapshotLocked(ClaimDiskReserveFunc);
+ }
+ void WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
+ // NOTE(review): CompactState takes two ExclusiveLockScope references (first unnamed, last IndexLock) -- confirm both are needed.
+ void CompactState(RwLock::ExclusiveLockScope&,
+ std::vector<BucketPayload>& Payloads,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<BucketMetaData>& MetaDatas,
+ std::vector<MemCacheData>& MemCachedPayloads,
+ std::vector<ReferenceIndex>& FirstReferenceIndex,
+ IndexMap& Index,
+ RwLock::ExclusiveLockScope& IndexLock);
+ // Add/Remove keep this bucket's counter and the shared outer counter (m_OuterCacheMemoryUsage) in step.
+ void AddMemCacheUsage(uint64_t ValueSize)
+ {
+ m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed);
+ m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed);
+ }
+ void RemoveMemCacheUsage(uint64_t ValueSize)
+ {
+ m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed);
+ m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed);
+ }
+ static inline uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) // bookkeeping structs + payload rounded up to 8
+ {
+ return sizeof(MemCacheData) + sizeof(IoBufferCore) + RoundUp(PayloadSize, 8u);
+ }
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+ mutable RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } // shard chosen by hash byte 19
+
+ friend class DiskBucketReferenceChecker;
+ friend class DiskBucketStoreCompactor;
+ friend class BucketManifestSerializer;
+ };
+
+private:
+ CacheBucket* GetOrCreateBucket(std::string_view InBucket);
+ inline void TryMemCacheTrim() // calls MemCacheTrim() only when trimming is configured, over target, not already running, and due
+ {
+ if (m_Configuration.MemCacheTargetFootprintBytes == 0)
+ {
+ return;
+ }
+ if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0)
+ {
+ return;
+ }
+ if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ return;
+ }
+ if (m_IsMemCacheTrimming)
+ {
+ return;
+ }
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ if (NowTick < m_NextAllowedTrimTick)
+ {
+ return;
+ }
+
+ MemCacheTrim();
+ }
+ void MemCacheTrim();
+ uint64_t MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime);
+
+ GcManager& m_Gc;
+ JobQueue& m_JobQueue;
+ std::filesystem::path m_RootDir;
+ Configuration m_Configuration;
+ std::atomic_uint64_t m_TotalMemCachedSize{};
+ std::atomic_bool m_IsMemCacheTrimming = false;
+ std::atomic<GcClock::Tick> m_NextAllowedTrimTick{}; // value-initialized so the first TryMemCacheTrim() comparison reads a defined value
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
+ std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
+
+ ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
+ ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
+
+ friend class DiskBucketStoreCompactor;
+ friend class DiskBucketReferenceChecker;
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cacheshared.h
new file mode 100644
index 000000000..e3e8a2f84
--- /dev/null
+++ b/src/zenstore/include/zenstore/cacheshared.h
@@ -0,0 +1,100 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zenstore/gc.h>
+
+#include <gsl/gsl-lite.hpp>
+#include <unordered_map>
+
+namespace zen {
+
+namespace access_tracking {
+ // Plain data types pairing cache keys with their last-access tick, grouped per bucket name.
+ struct KeyAccessTime
+ {
+ IoHash Key;
+ GcClock::Tick LastAccess{}; // value-initialized tick
+ };
+ // Map from a string key (presumably the bucket name, per the member name) to that bucket's key access times.
+ struct AccessTimes
+ {
+ std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets;
+ };
+}; // namespace access_tracking
+
+struct ZenCacheValue // Value passed to/returned from the cache Get/Put APIs: a payload buffer plus raw size/hash fields
+{
+ IoBuffer Value;
+ uint64_t RawSize = 0; // defaults to 0
+ IoHash RawHash = IoHash::Zero; // defaults to the all-zero hash
+};
+
+struct CacheValueDetails // Nested report: namespace name -> bucket name -> key hash -> ValueDetails
+{
+ struct ValueDetails // All scalar members carry defaults so a default-constructed value is well defined
+ {
+ uint64_t Size = 0;
+ uint64_t RawSize = 0;
+ IoHash RawHash = IoHash::Zero;
+ GcClock::Tick LastAccess{};
+ std::vector<IoHash> Attachments;
+ ZenContentType ContentType = ZenContentType::kBinary;
+ };
+
+ struct BucketDetails
+ {
+ std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values;
+ };
+
+ struct NamespaceDetails
+ {
+ std::unordered_map<std::string, BucketDetails> Buckets;
+ };
+
+ std::unordered_map<std::string, NamespaceDetails> Namespaces;
+};
+
+bool IsKnownBadBucketName(std::string_view BucketName);
+
+//////////////////////////////////////////////////////////////////////////
+
+// This stores the access time as seconds since epoch internally in a 32-bit value, giving us a range of 136 years since epoch
+struct AccessTime
+{
+ explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {}
+ AccessTime& operator=(GcClock::Tick Tick) noexcept // relaxed atomic store of the tick converted to whole seconds
+ {
+ SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed);
+ return *this;
+ }
+ operator GcClock::Tick() const noexcept // converts the stored seconds back to a GcClock::Duration count (sub-second part is lost)
+ {
+ return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed)))
+ .count();
+ }
+ // std::atomic is not copyable, so copy/move are spelled out as relaxed load + store of the 32-bit value.
+ AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
+ AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
+ AccessTime& operator=(AccessTime&& Rhs) noexcept
+ {
+ SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ return *this;
+ }
+ AccessTime& operator=(const AccessTime& Rhs) noexcept
+ {
+ SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
+ return *this;
+ }
+ // NOTE(review): ToSeconds uses gsl::narrow but is called from noexcept members; confirm the out-of-range behavior is acceptable.
+private:
+ static uint32_t ToSeconds(GcClock::Tick Tick) // tick -> GcClock::Duration -> whole seconds, narrowed to 32 bits
+ {
+ return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count());
+ }
+ std::atomic_uint32_t SecondsSinceEpoch;
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/structuredcachestore.h
new file mode 100644
index 000000000..56b2105a9
--- /dev/null
+++ b/src/zenstore/include/zenstore/structuredcachestore.h
@@ -0,0 +1,271 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/iohash.h>
+#include <zencore/stats.h>
+#include <zenstore/cachedisklayer.h>
+#include <zenstore/gc.h>
+#include <zenutil/cache/cache.h>
+#include <zenutil/statsreporter.h>
+
+#include <atomic>
+#include <compare>
+#include <filesystem>
+#include <string_view>
+#include <unordered_map>
+
+namespace zen {
+
+class StatsDaemonClient;
+
+/******************************************************************************
+
+ /$$$$$$$$ /$$$$$$ /$$
+ |_____ $$ /$$__ $$ | $$
+ /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$
+ /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$
+ /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$
+ /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/
+ /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$
+ |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/
+
+ Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash
+ is 40 (hex) chars in size. Values may be opaque blobs or structured objects
+ which can in turn contain references to other objects (or blobs). Buckets are
+ organized in namespaces to enable project isolation.
+
+******************************************************************************/
+
+class WorkerThreadPool;
+class DiskWriteBlocker;
+class JobQueue;
+
+/* Z$ namespace
+
+ A namespace scopes a set of buckets, and would typically be used to isolate
+ projects from each other.
+
+ */
+class ZenCacheNamespace final : public GcStorage, public GcContributor
+{
+public:
+ struct Configuration // Wraps the configuration of the contained ZenCacheDiskLayer
+ {
+ ZenCacheDiskLayer::Configuration DiskLayerConfig;
+ };
+ struct BucketInfo
+ {
+ ZenCacheDiskLayer::BucketInfo DiskLayerInfo;
+ };
+ struct Info
+ {
+ std::filesystem::path RootDir;
+ Configuration Config;
+ std::vector<std::string> BucketNames;
+ ZenCacheDiskLayer::Info DiskLayerInfo;
+ };
+ // Snapshot of namespace counters; the counters default to zero, as in ZenCacheStore::CacheStoreStats.
+ struct NamespaceStats
+ {
+ uint64_t HitCount = 0;
+ uint64_t MissCount = 0;
+ uint64_t WriteCount = 0;
+ metrics::RequestStatsSnapshot PutOps;
+ metrics::RequestStatsSnapshot GetOps;
+ ZenCacheDiskLayer::DiskStats DiskStats;
+ };
+
+ ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
+ ~ZenCacheNamespace();
+
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References);
+
+ bool DropBucket(std::string_view Bucket);
+ void EnumerateBucketContents(std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const;
+
+ bool Drop();
+ void Flush();
+
+ // GcContributor
+ virtual void GatherReferences(GcContext& GcCtx) override;
+
+ // GcStorage
+ virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
+
+ Configuration GetConfig() const { return m_Configuration; } // returns a copy of the stored configuration
+ Info GetInfo() const;
+ std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
+ NamespaceStats Stats();
+
+ CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+
+#if ZEN_WITH_TESTS
+ void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
+#endif // ZEN_WITH_TESTS
+
+private:
+ GcManager& m_Gc;
+ JobQueue& m_JobQueue;
+ std::filesystem::path m_RootDir;
+ Configuration m_Configuration;
+ ZenCacheDiskLayer m_DiskLayer;
+ std::atomic<uint64_t> m_HitCount{};
+ std::atomic<uint64_t> m_MissCount{};
+ std::atomic<uint64_t> m_WriteCount{};
+ metrics::RequestStats m_PutOps;
+ metrics::RequestStats m_GetOps;
+ uint64_t m_LastScrubTime = 0;
+
+ ZenCacheNamespace(const ZenCacheNamespace&) = delete;
+ ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
+};
+
+/** Cache store interface
+
+ This manages a set of namespaces used for derived data caching purposes.
+
+ */
+
+class ZenCacheStore final : public RefCounted, public StatsProvider
+{
+public:
+ static constexpr std::string_view DefaultNamespace =
+ "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given
+ static constexpr std::string_view NamespaceDiskPrefix = "ns_";
+
+ struct Configuration
+ {
+ ZenCacheNamespace::Configuration NamespaceConfig;
+ bool AllowAutomaticCreationOfNamespaces = false;
+ struct LogConfig
+ {
+ bool EnableWriteLog = true;
+ bool EnableAccessLog = true;
+ } Logging;
+ };
+
+ struct Info
+ {
+ std::filesystem::path BasePath;
+ Configuration Config;
+ std::vector<std::string> NamespaceNames;
+ uint64_t DiskEntryCount = 0;
+ GcStorageSize StorageSize;
+ };
+
+ struct NamedNamespaceStats
+ {
+ std::string NamespaceName;
+ ZenCacheNamespace::NamespaceStats Stats;
+ };
+
+ struct CacheStoreStats
+ {
+ uint64_t HitCount = 0;
+ uint64_t MissCount = 0;
+ uint64_t WriteCount = 0;
+ uint64_t RejectedWriteCount = 0;
+ uint64_t RejectedReadCount = 0;
+ metrics::RequestStatsSnapshot PutOps;
+ metrics::RequestStatsSnapshot GetOps;
+ std::vector<NamedNamespaceStats> NamespaceStats;
+ };
+
+ ZenCacheStore(GcManager& Gc,
+ JobQueue& JobQueue,
+ const std::filesystem::path& BasePath,
+ const Configuration& Configuration,
+ const DiskWriteBlocker* InDiskWriteBlocker);
+ ~ZenCacheStore();
+
+ bool Get(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ ZenCacheValue& OutValue);
+ void Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References);
+ bool DropBucket(std::string_view Namespace, std::string_view Bucket);
+ bool DropNamespace(std::string_view Namespace);
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
+
+ CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter,
+ const std::string_view BucketFilter,
+ const std::string_view ValueFilter) const;
+
+ GcStorageSize StorageSize() const;
+ CacheStoreStats Stats(bool IncludeNamespaceStats = true);
+
+ Configuration GetConfiguration() const { return m_Configuration; } // returns a copy of the stored configuration
+ void SetLoggingConfig(const Configuration::LogConfig& Loggingconfig);
+ Info GetInfo() const;
+ std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace);
+ std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket);
+ std::vector<std::string> GetNamespaces();
+
+ void EnumerateBucketContents(std::string_view Namespace,
+ std::string_view Bucket,
+ std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn);
+
+ // StatsProvider
+ virtual void ReportMetrics(StatsMetrics& Statsd) override;
+
+private:
+ const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
+ ZenCacheNamespace* GetNamespace(std::string_view Namespace);
+ void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const;
+ // Namespace name -> owned namespace instance.
+ using NamespaceMap = std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>>;
+
+ CacheStoreStats m_LastReportedMetrics;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+ mutable RwLock m_NamespacesLock;
+ NamespaceMap m_Namespaces;
+ std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
+
+ GcManager& m_Gc;
+ JobQueue& m_JobQueue;
+ std::filesystem::path m_BasePath;
+ Configuration m_Configuration;
+ std::atomic<uint64_t> m_HitCount{};
+ std::atomic<uint64_t> m_MissCount{};
+ std::atomic<uint64_t> m_WriteCount{};
+ std::atomic<uint64_t> m_RejectedWriteCount{};
+ std::atomic<uint64_t> m_RejectedReadCount{};
+ metrics::RequestStats m_PutOps;
+ metrics::RequestStats m_GetOps;
+ // One queued log record; Op defaults to nullptr so a default-constructed item never holds an indeterminate pointer.
+ struct AccessLogItem
+ {
+ const char* Op = nullptr;
+ CacheRequestContext Context;
+ std::string Namespace;
+ std::string Bucket;
+ IoHash HashKey;
+ ZenCacheValue Value;
+ };
+ // Logging state; the flags get explicit initial values (false) rather than relying on the atomic default constructor.
+ void LogWorker();
+ RwLock m_LogQueueLock;
+ std::vector<AccessLogItem> m_LogQueue;
+ std::atomic_bool m_ExitLogging{false};
+ Event m_LogEvent;
+ std::thread m_AsyncLoggingThread;
+ std::atomic_bool m_WriteLogEnabled{false};
+ std::atomic_bool m_AccessLogEnabled{false};
+};
+
+void z$_forcelink();
+
+} // namespace zen