diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-02 11:29:00 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-02 11:29:00 +0200 |
| commit | 14deb110acac35e96afa72316f6cd871dfe04168 (patch) | |
| tree | 9206028a2423c01f562d6e3b45b2e2c34947b611 /src/zenserver/cache/cachememorylayer.cpp | |
| parent | lightweight gc (#431) (diff) | |
| download | zen-14deb110acac35e96afa72316f6cd871dfe04168.tar.xz zen-14deb110acac35e96afa72316f6cd871dfe04168.zip | |
Limit size of memory cache layer (#423)
- Feature: Limit the size ZenCacheMemoryLayer may use
- `--cache-memlayer-targetfootprint` option to set the size (in bytes) it should be limited to; zero leaves it unbounded
- `--cache-memlayer-maxage` option to set how long (in seconds) cache items should be kept in the memory cache
Do more "standard" GC rather than clearing everything.
Tries to purge memory on Get/Put on the fly if exceeding limit - not sure if we should have a polling thread instead of adding overhead to Get/Put (however light it may be).
Diffstat (limited to 'src/zenserver/cache/cachememorylayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.cpp | 221 |
1 file changed, 206 insertions, 15 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp index be21e60f1..d48ee5aa8 100644 --- a/src/zenserver/cache/cachememorylayer.cpp +++ b/src/zenserver/cache/cachememorylayer.cpp @@ -4,13 +4,19 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> +#include <zencore/scopeguard.h> #include <zencore/trace.h> ////////////////////////////////////////////////////////////////////////// namespace zen { -ZenCacheMemoryLayer::ZenCacheMemoryLayer() +ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config) +: m_JobQueue(JobQueue) +, m_Configuration(Config) +, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count()) { } @@ -21,6 +27,10 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { + if (m_Configuration.TargetFootprintBytes == 0) + { + return false; + } ZEN_TRACE_CPU("Z$::Mem::Get"); RwLock::SharedLockScope _(m_Lock); @@ -40,12 +50,18 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa // inserts, the bucket delete path could end up deleting the // underlying data structure + Trim(); + return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { + if (m_Configuration.TargetFootprintBytes == 0) + { + return; + } ZEN_TRACE_CPU("Z$::Mem::Put"); const auto BucketName = std::string(InBucket); @@ -77,9 +93,18 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const } } - // Note that since the underlying IoBuffer is retained, the content type is also + Trim(); - Bucket->Put(HashKey, Value); + // Note that since the underlying IoBuffer is retained, the content type is also + int64_t SizeDiff = Bucket->Put(HashKey, Value); + if (SizeDiff > 0) + { + 
m_TotalSize.fetch_add(gsl::narrow<uint64_t>(SizeDiff)); + } + else if (SizeDiff < 0) + { + m_TotalSize.fetch_sub(gsl::narrow<uint64_t>(-SizeDiff)); + } } bool @@ -92,6 +117,7 @@ ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; + m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); Bucket.Drop(); @@ -110,6 +136,7 @@ ZenCacheMemoryLayer::Drop() { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; + m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); Bucket.Drop(); @@ -141,11 +168,101 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access } } +uint64_t +ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime) +{ + uint64_t TrimmedSize = 0; + RwLock::SharedLockScope __(m_Lock); + for (auto& Kv : m_Buckets) + { + uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime); + if (BucketTrimmedSize > 0) + { + m_TotalSize.fetch_sub(BucketTrimmedSize); + TrimmedSize += BucketTrimmedSize; + } + } + const GcClock::TimePoint Now = GcClock::Now(); + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickTrim; + const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; +} + void -ZenCacheMemoryLayer::Reset() +ZenCacheMemoryLayer::Trim() { - RwLock::ExclusiveLockScope _(m_Lock); - m_Buckets.clear(); + if (m_TotalSize <= m_Configuration.TargetFootprintBytes) + { + return; + } + if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0) + { + return; + } + + const GcClock::TimePoint Now = GcClock::Now(); + + const 
GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickTrim; + const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); + if (NowTick < NextAllowedTrimTick) + { + return; + } + + bool Expected = false; + if (!m_IsTrimming.compare_exchange_strong(Expected, true)) + { + return; + } + + // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickTrim.store(NextTrimTick); + + m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) { + ZEN_TRACE_CPU("Z$::Mem::Trim"); + + Stopwatch Timer; + uint64_t TrimmedSize = 0; + const auto Guard = MakeGuard([&] { + if (TrimmedSize > 0) + { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + m_IsTrimming.store(false); + }); + + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds); + + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + { + RwLock::SharedLockScope __(m_Lock); + for (auto& Kv : m_Buckets) + { + Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); + } + } + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.TargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); + TrimmedSize = CollectGarbage(ExpireTime); + break; + } + } + }); } uint64_t @@ -267,34 +384,36 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV return false; } -void +int64_t 
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Mem::Bucket::Put"); metrics::OperationTiming::Scope $(m_PutOps); - size_t PayloadSize = Value.Value.GetSize(); + size_t PayloadSize = Value.Value.GetSize(); + uint64_t OldPayloadSize = 0; + { GcClock::Tick AccessTime = GcClock::TickCount(); RwLock::ExclusiveLockScope _(m_BucketLock); - if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! - return; - } if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { uint32_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); BucketPayload& Payload = m_Payloads[EntryIndex]; + OldPayloadSize = Payload.Payload.GetSize(); Payload.Payload = Value.Value; Payload.RawHash = Value.RawHash; Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); m_AccessTimes[EntryIndex] = AccessTime; } + else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) + { + // No more space in our memory cache! 
+ return 0; + } else { uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); @@ -307,7 +426,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); } - m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); + if (PayloadSize > OldPayloadSize) + { + m_TotalSize.fetch_add(PayloadSize - OldPayloadSize); + return gsl::narrow<int64_t>(PayloadSize - OldPayloadSize); + } + else if (PayloadSize < OldPayloadSize) + { + m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize); + return -gsl::narrow<int64_t>(OldPayloadSize - PayloadSize); + } + return 0; } void @@ -321,10 +450,72 @@ ZenCacheMemoryLayer::CacheBucket::Drop() } uint64_t +ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime) +{ + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + tsl::robin_map<IoHash, uint32_t> CacheMap; + + size_t TrimmedSize = 0; + GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_BucketLock); + { + AccessTimes.reserve(m_CacheMap.size()); + Payloads.reserve(m_CacheMap.size()); + CacheMap.reserve(m_CacheMap.size()); + + for (const auto& Kv : m_CacheMap) + { + if (m_AccessTimes[Kv.second] < ExpireTicks) + { + size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize(); + m_TotalSize.fetch_sub(PayloadSize); + TrimmedSize += PayloadSize; + continue; + } + size_t Index = gsl::narrow<uint32_t>(Payloads.size()); + Payloads.emplace_back(m_Payloads[Kv.second]); + AccessTimes.push_back(m_AccessTimes[Kv.second]); + CacheMap.insert_or_assign(Kv.first, Index); + } + + m_AccessTimes.swap(AccessTimes); + m_Payloads.swap(Payloads); + m_CacheMap.swap(CacheMap); + } + return TrimmedSize; +} + +uint64_t ZenCacheMemoryLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_BucketLock); return static_cast<uint64_t>(m_CacheMap.size()); } +void 
+ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, + GcClock::Duration SectionLength, + std::vector<uint64_t>& InOutUsageSlots) +{ + RwLock::SharedLockScope _(m_BucketLock); + for (const auto& It : m_CacheMap) + { + uint32_t Index = It.second; + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); + uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); + if (Slot >= InOutUsageSlots.capacity()) + { + Slot = InOutUsageSlots.capacity() - 1; + } + if (Slot > InOutUsageSlots.size()) + { + InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + } + InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize(); + } +} + } // namespace zen |