aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachememorylayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-02 11:29:00 +0200
committerGitHub <[email protected]>2023-10-02 11:29:00 +0200
commit14deb110acac35e96afa72316f6cd871dfe04168 (patch)
tree9206028a2423c01f562d6e3b45b2e2c34947b611 /src/zenserver/cache/cachememorylayer.cpp
parentlightweight gc (#431) (diff)
downloadzen-14deb110acac35e96afa72316f6cd871dfe04168.tar.xz
zen-14deb110acac35e96afa72316f6cd871dfe04168.zip
Limit size of memory cache layer (#423)
- Feature: Limit the size ZenCacheMemoryLayer may use - `--cache-memlayer-targetfootprint` option to set the size (in bytes) it should be limited to; zero leaves it unbounded - `--cache-memlayer-maxage` option to set how long (in seconds) cache items should be kept in the memory cache. Do more "standard" GC rather than clearing everything. Tries to purge memory on Get/Put on the fly when exceeding the limit - not sure if we should have a polling thread instead of adding overhead to Get/Put (however light it may be).
Diffstat (limited to 'src/zenserver/cache/cachememorylayer.cpp')
-rw-r--r--src/zenserver/cache/cachememorylayer.cpp221
1 file changed, 206 insertions, 15 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp
index be21e60f1..d48ee5aa8 100644
--- a/src/zenserver/cache/cachememorylayer.cpp
+++ b/src/zenserver/cache/cachememorylayer.cpp
@@ -4,13 +4,19 @@
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/jobqueue.h>
+#include <zencore/scopeguard.h>
#include <zencore/trace.h>
//////////////////////////////////////////////////////////////////////////
namespace zen {
-ZenCacheMemoryLayer::ZenCacheMemoryLayer()
+ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config)
+: m_JobQueue(JobQueue)
+, m_Configuration(Config)
+, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count())
{
}
@@ -21,6 +27,10 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
bool
ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
+ if (m_Configuration.TargetFootprintBytes == 0)
+ {
+ return false;
+ }
ZEN_TRACE_CPU("Z$::Mem::Get");
RwLock::SharedLockScope _(m_Lock);
@@ -40,12 +50,18 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
// inserts, the bucket delete path could end up deleting the
// underlying data structure
+ Trim();
+
return Bucket->Get(HashKey, OutValue);
}
void
ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
+ if (m_Configuration.TargetFootprintBytes == 0)
+ {
+ return;
+ }
ZEN_TRACE_CPU("Z$::Mem::Put");
const auto BucketName = std::string(InBucket);
@@ -77,9 +93,18 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const
}
}
- // Note that since the underlying IoBuffer is retained, the content type is also
+ Trim();
- Bucket->Put(HashKey, Value);
+ // Note that since the underlying IoBuffer is retained, the content type is also
+ int64_t SizeDiff = Bucket->Put(HashKey, Value);
+ if (SizeDiff > 0)
+ {
+ m_TotalSize.fetch_add(gsl::narrow<uint64_t>(SizeDiff));
+ }
+ else if (SizeDiff < 0)
+ {
+ m_TotalSize.fetch_sub(gsl::narrow<uint64_t>(-SizeDiff));
+ }
}
bool
@@ -92,6 +117,7 @@ ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
if (It != m_Buckets.end())
{
CacheBucket& Bucket = *It->second;
+ m_TotalSize.fetch_sub(Bucket.TotalSize());
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It);
Bucket.Drop();
@@ -110,6 +136,7 @@ ZenCacheMemoryLayer::Drop()
{
const auto& It = m_Buckets.begin();
CacheBucket& Bucket = *It->second;
+ m_TotalSize.fetch_sub(Bucket.TotalSize());
m_DroppedBuckets.push_back(std::move(It->second));
m_Buckets.erase(It->first);
Bucket.Drop();
@@ -141,11 +168,101 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access
}
}
+uint64_t
+ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime)
+{
+ uint64_t TrimmedSize = 0;
+ RwLock::SharedLockScope __(m_Lock);
+ for (auto& Kv : m_Buckets)
+ {
+ uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime);
+ if (BucketTrimmedSize > 0)
+ {
+ m_TotalSize.fetch_sub(BucketTrimmedSize);
+ TrimmedSize += BucketTrimmedSize;
+ }
+ }
+ const GcClock::TimePoint Now = GcClock::Now();
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickTrim;
+ const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
+}
+
void
-ZenCacheMemoryLayer::Reset()
+ZenCacheMemoryLayer::Trim()
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Buckets.clear();
+ if (m_TotalSize <= m_Configuration.TargetFootprintBytes)
+ {
+ return;
+ }
+ if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0)
+ {
+ return;
+ }
+
+ const GcClock::TimePoint Now = GcClock::Now();
+
+ const GcClock::Tick NowTick = Now.time_since_epoch().count();
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
+ GcClock::Tick LastTrimTick = m_LastTickTrim;
+ const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
+ if (NowTick < NextAllowedTrimTick)
+ {
+ return;
+ }
+
+ bool Expected = false;
+ if (!m_IsTrimming.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+
+ // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_LastTickTrim.store(NextTrimTick);
+
+ m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) {
+ ZEN_TRACE_CPU("Z$::Mem::Trim");
+
+ Stopwatch Timer;
+ uint64_t TrimmedSize = 0;
+ const auto Guard = MakeGuard([&] {
+ if (TrimmedSize > 0)
+ {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ m_IsTrimming.store(false);
+ });
+
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds);
+
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
+ }
+ }
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.TargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
+ TrimmedSize = CollectGarbage(ExpireTime);
+ break;
+ }
+ }
+ });
}
uint64_t
@@ -267,34 +384,36 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
return false;
}
-void
+int64_t
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
ZEN_TRACE_CPU("Z$::Mem::Bucket::Put");
metrics::OperationTiming::Scope $(m_PutOps);
- size_t PayloadSize = Value.Value.GetSize();
+ size_t PayloadSize = Value.Value.GetSize();
+ uint64_t OldPayloadSize = 0;
+
{
GcClock::Tick AccessTime = GcClock::TickCount();
RwLock::ExclusiveLockScope _(m_BucketLock);
- if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
- {
- // No more space in our memory cache!
- return;
- }
if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
{
uint32_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed);
BucketPayload& Payload = m_Payloads[EntryIndex];
+ OldPayloadSize = Payload.Payload.GetSize();
Payload.Payload = Value.Value;
Payload.RawHash = Value.RawHash;
Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
m_AccessTimes[EntryIndex] = AccessTime;
}
+ else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
+ {
+ // No more space in our memory cache!
+ return 0;
+ }
else
{
uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
@@ -307,7 +426,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
}
- m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed);
+ if (PayloadSize > OldPayloadSize)
+ {
+ m_TotalSize.fetch_add(PayloadSize - OldPayloadSize);
+ return gsl::narrow<int64_t>(PayloadSize - OldPayloadSize);
+ }
+ else if (PayloadSize < OldPayloadSize)
+ {
+ m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize);
+ return -gsl::narrow<int64_t>(OldPayloadSize - PayloadSize);
+ }
+ return 0;
}
void
@@ -321,10 +450,72 @@ ZenCacheMemoryLayer::CacheBucket::Drop()
}
uint64_t
+ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime)
+{
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ tsl::robin_map<IoHash, uint32_t> CacheMap;
+
+ size_t TrimmedSize = 0;
+ GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
+
+ RwLock::ExclusiveLockScope _(m_BucketLock);
+ {
+ AccessTimes.reserve(m_CacheMap.size());
+ Payloads.reserve(m_CacheMap.size());
+ CacheMap.reserve(m_CacheMap.size());
+
+ for (const auto& Kv : m_CacheMap)
+ {
+ if (m_AccessTimes[Kv.second] < ExpireTicks)
+ {
+ size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize();
+ m_TotalSize.fetch_sub(PayloadSize);
+ TrimmedSize += PayloadSize;
+ continue;
+ }
+ size_t Index = gsl::narrow<uint32_t>(Payloads.size());
+ Payloads.emplace_back(m_Payloads[Kv.second]);
+ AccessTimes.push_back(m_AccessTimes[Kv.second]);
+ CacheMap.insert_or_assign(Kv.first, Index);
+ }
+
+ m_AccessTimes.swap(AccessTimes);
+ m_Payloads.swap(Payloads);
+ m_CacheMap.swap(CacheMap);
+ }
+ return TrimmedSize;
+}
+
+uint64_t
ZenCacheMemoryLayer::CacheBucket::EntryCount() const
{
RwLock::SharedLockScope _(m_BucketLock);
return static_cast<uint64_t>(m_CacheMap.size());
}
+void
+ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
+ GcClock::Duration SectionLength,
+ std::vector<uint64_t>& InOutUsageSlots)
+{
+ RwLock::SharedLockScope _(m_BucketLock);
+ for (const auto& It : m_CacheMap)
+ {
+ uint32_t Index = It.second;
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
+ uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
+ if (Slot >= InOutUsageSlots.capacity())
+ {
+ Slot = InOutUsageSlots.capacity() - 1;
+ }
+ if (Slot > InOutUsageSlots.size())
+ {
+ InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
+ }
+ InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize();
+ }
+}
+
} // namespace zen