diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-24 14:54:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-24 14:54:26 +0200 |
| commit | 1a144212278aa7158d6b32b63e398db95a7ae868 (patch) | |
| tree | 58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache/cachedisklayer.h | |
| parent | chunking moved to zenstore (#490) (diff) | |
| download | zen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip | |
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control the size threshold at which cache entries are cached in memory
- Changed: Merged the cache memory layer with the cache disk layer to reduce memory and CPU overhead
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.h')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 136 |
1 file changed, 99 insertions, 37 deletions
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 7e05430a2..ea390f807 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { class IoBuffer; +class JobQueue; #pragma pack(push) #pragma pack(1) @@ -90,32 +91,48 @@ static_assert(sizeof(DiskIndexEntry) == 32); class ZenCacheDiskLayer { public: + struct BucketConfiguration + { + uint64_t MaxBlockSize = 1ull << 30; + uint64_t PayloadAlignment = 1ull << 4; + uint64_t MemCacheSizeThreshold = 1 * 1024; + uint64_t LargeObjectThreshold = 128 * 1024; + bool EnableReferenceCaching = false; + }; + struct Configuration { - std::filesystem::path RootDir; - bool EnableReferenceCaching; + BucketConfiguration BucketConfig; + uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024; + uint64_t MemCacheTrimIntervalSeconds = 60; + uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); }; struct BucketInfo { - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; + uint64_t EntryCount = 0; + GcStorageSize StorageSize; }; struct Info { + std::filesystem::path RootDir; Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - uint64_t TotalSize = 0; + GcStorageSize StorageSize; }; struct BucketStats { - uint64_t TotalSize; - uint64_t HitCount; - uint64_t MissCount; - uint64_t WriteCount; + uint64_t DiskSize; + uint64_t MemorySize; + uint64_t DiskHitCount; + uint64_t DiskMissCount; + uint64_t DiskWriteCount; + uint64_t MemoryHitCount; + uint64_t MemoryMissCount; + uint64_t MemoryWriteCount; metrics::RequestStatsSnapshot PutOps; metrics::RequestStatsSnapshot GetOps; }; @@ -129,9 +146,11 @@ public: struct DiskStats { std::vector<NamedBucketStats> BucketStats; + uint64_t DiskSize; + uint64_t MemorySize; }; - explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching); + explicit 
ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -142,11 +161,10 @@ public: void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); - void DiscoverBuckets(); - uint64_t TotalSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + GcStorageSize StorageSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -161,38 +179,40 @@ private: */ struct CacheBucket { - CacheBucket(std::string BucketName, bool EnableReferenceCaching); + CacheBucket(std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - bool Drop(); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, std::atomic_uint64_t& CacheMemoryUsage); + void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage); + bool Drop(std::atomic_uint64_t& CacheMemoryUsage); void Flush(); - void ScrubStorage(ScrubContext& Ctx); + void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage); void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage); - inline uint64_t TotalSize() const { return 
m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); } - uint64_t EntryCount() const; - BucketStats Stats(); + inline GcStorageSize StorageSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + uint64_t EntryCount() const; + BucketStats Stats(); CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; - private: - const uint64_t MaxBlockSize = 1ull << 30; - uint64_t m_PayloadAlignment = 1ull << 4; + void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); + private: std::string m_BucketName; std::filesystem::path m_BucketDir; std::filesystem::path m_BlocksBasePath; + BucketConfiguration m_Configuration; BlockStore m_BlockStore; Oid m_BucketId; - uint64_t m_LargeObjectThreshold = 128 * 1024; std::atomic_bool m_IsFlushing{}; - const bool m_EnableReferenceCaching = false; // These files are used to manage storage of small objects for this bucket @@ -216,9 +236,12 @@ private: using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>; - std::atomic<uint64_t> m_HitCount; - std::atomic<uint64_t> m_MissCount; - std::atomic<uint64_t> m_WriteCount; + std::atomic<uint64_t> m_DiskHitCount; + std::atomic<uint64_t> m_DiskMissCount; + std::atomic<uint64_t> m_DiskWriteCount; + std::atomic<uint64_t> m_MemoryHitCount; + std::atomic<uint64_t> m_MemoryMissCount; + std::atomic<uint64_t> m_MemoryWriteCount; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; @@ -226,16 +249,24 @@ private: IndexMap m_Index; std::vector<AccessTime> m_AccessTimes; std::vector<BucketPayload> m_Payloads; + std::vector<IoBuffer> m_CachedPayloads; std::vector<size_t> m_FirstReferenceIndex; 
std::vector<IoHash> m_ReferenceHashes; std::vector<size_t> m_NextReferenceHashesIndexes; size_t m_ReferenceCount = 0; - std::atomic_uint64_t m_TotalStandaloneSize{}; + std::atomic_uint64_t m_StandaloneSize{}; + std::atomic_uint64_t m_MemCachedSize{}; void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void PutStandaloneCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage); IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; void MakeIndexSnapshot(); uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); @@ -257,6 +288,14 @@ private: } size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const; + + void CompactState(std::vector<BucketPayload>& TmpPayloads, + std::vector<AccessTime>& TmpAccessTimes, + std::vector<IoBuffer>& TmpCachedPayloads, + std::vector<size_t>& TmpFirstReferenceIndex, + IndexMap& TmpIndex, + RwLock::ExclusiveLockScope& IndexLock); + // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash // @@ -267,11 +306,34 @@ private: inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } }; + inline void TryMemCacheTrim() + { + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return; + } + if 
(m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0) + { + return; + } + if (m_TotalMemCachedSize <= m_Configuration.MemCacheTargetFootprintBytes) + { + return; + } + MemCacheTrim(); + } + void MemCacheTrim(); + void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + + JobQueue& m_JobQueue; std::filesystem::path m_RootDir; + Configuration m_Configuration; + std::atomic_uint64_t m_TotalMemCachedSize{}; + std::atomic_bool m_IsMemCacheTrimming = false; + std::atomic<GcClock::Tick> m_LastTickMemCacheTrim; mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - const bool m_EnableReferenceCaching; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; |