aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachememorylayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-24 14:54:26 +0200
committerGitHub <[email protected]>2023-10-24 14:54:26 +0200
commit1a144212278aa7158d6b32b63e398db95a7ae868 (patch)
tree58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache/cachememorylayer.cpp
parentchunking moved to zenstore (#490) (diff)
downloadzen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz
zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control at which size cache entries get cached in memory - Changed: Merged cache memory layer with cache disk layer to reduce memory and cpu overhead
Diffstat (limited to 'src/zenserver/cache/cachememorylayer.cpp')
-rw-r--r--src/zenserver/cache/cachememorylayer.cpp521
1 files changed, 0 insertions, 521 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp
deleted file mode 100644
index b62791974..000000000
--- a/src/zenserver/cache/cachememorylayer.cpp
+++ /dev/null
@@ -1,521 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cachememorylayer.h"
-
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/fmtutils.h>
-#include <zencore/jobqueue.h>
-#include <zencore/scopeguard.h>
-#include <zencore/trace.h>
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config)
-: m_JobQueue(JobQueue)
-, m_Configuration(Config)
-, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count())
-{
-}
-
-ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
-{
-}
-
-bool
-ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- if (m_Configuration.TargetFootprintBytes == 0)
- {
- return false;
- }
- ZEN_TRACE_CPU("Z$::Mem::Get");
-
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It == m_Buckets.end())
- {
- return false;
- }
-
- CacheBucket* Bucket = It->second.get();
-
- _.ReleaseNow();
-
- // There's a race here. Since the lock is released early to allow
- // inserts, the bucket delete path could end up deleting the
- // underlying data structure
-
- Trim();
-
- return Bucket->Get(HashKey, OutValue);
-}
-
-void
-ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- if (m_Configuration.TargetFootprintBytes == 0)
- {
- return;
- }
- ZEN_TRACE_CPU("Z$::Mem::Put");
-
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- }
-
- if (Bucket == nullptr)
- {
- // New bucket
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
- Bucket = InsertResult.first->second.get();
- }
- }
-
- // Note that since the underlying IoBuffer is retained, the content type is also
- int64_t Diff = Bucket->Put(HashKey, Value);
-
- if (Diff > 0)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(Diff));
- Trim();
- }
- else if (Diff < 0)
- {
- m_TotalSize.fetch_sub(static_cast<uint64_t>(-Diff));
- }
-}
-
-bool
-ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- m_TotalSize.fetch_sub(Bucket.TotalSize());
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It);
- Bucket.Drop();
- return true;
- }
- return false;
-}
-
-void
-ZenCacheMemoryLayer::Drop()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- while (!m_Buckets.empty())
- {
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_TotalSize.fetch_sub(Bucket.TotalSize());
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- Bucket.Drop();
- }
-}
-
-void
-ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- Kv.second->ScrubStorage(Ctx);
- }
-}
-
-void
-ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
-{
- using namespace zen::access_tracking;
-
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
- Kv.second->GatherAccessTimes(Bucket);
- }
-}
-
-uint64_t
-ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime)
-{
- uint64_t TrimmedSize = 0;
- RwLock::SharedLockScope __(m_Lock);
- for (auto& Kv : m_Buckets)
- {
- uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime);
- if (BucketTrimmedSize > 0)
- {
- m_TotalSize.fetch_sub(BucketTrimmedSize);
- TrimmedSize += BucketTrimmedSize;
- }
- }
- const GcClock::TimePoint Now = GcClock::Now();
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickTrim;
- const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
- return TrimmedSize;
-}
-
-void
-ZenCacheMemoryLayer::Trim()
-{
- if (m_TotalSize <= m_Configuration.TargetFootprintBytes)
- {
- return;
- }
- if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0)
- {
- return;
- }
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
-
- bool Expected = false;
- if (!m_IsTrimming.compare_exchange_strong(Expected, true))
- {
- return;
- }
-
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickTrim.store(NextTrimTick);
-
- m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::Mem::Trim");
-
- Stopwatch Timer;
- uint64_t TrimmedSize = 0;
- const auto Guard = MakeGuard([&] {
- if (TrimmedSize > 0)
- {
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(TrimmedSize),
- NiceBytes(m_TotalSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- m_IsTrimming.store(false);
- });
-
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds);
-
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
- {
- RwLock::SharedLockScope __(m_Lock);
- for (auto& Kv : m_Buckets)
- {
- Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.TargetFootprintBytes)
- {
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- TrimmedSize = CollectGarbage(ExpireTime);
- break;
- }
- }
- });
-}
-
-uint64_t
-ZenCacheMemoryLayer::TotalSize() const
-{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- TotalSize += Kv.second->TotalSize();
- }
-
- return TotalSize;
-}
-
-ZenCacheMemoryLayer::Info
-ZenCacheMemoryLayer::GetInfo() const
-{
- ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()};
-
- RwLock::SharedLockScope _(m_Lock);
- Info.BucketNames.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Info.BucketNames.push_back(Kv.first);
- Info.EntryCount += Kv.second->EntryCount();
- }
- return Info;
-}
-
-std::optional<ZenCacheMemoryLayer::BucketInfo>
-ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
- {
- return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
- }
- return {};
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_BucketLock);
-
- std::vector<IoHash> BadHashes;
-
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
- }
- if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (Hash != RawHash)
- {
- return false;
- }
- }
- return true;
- };
-
- for (auto& Kv : m_CacheMap)
- {
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload))
- {
- BadHashes.push_back(Kv.first);
- }
- }
-
- if (!BadHashes.empty())
- {
- Ctx.ReportBadCidChunks(BadHashes);
- }
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- RwLock::SharedLockScope _(m_BucketLock);
- std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) {
- return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]};
- });
-}
-
-bool
-ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- ZEN_TRACE_CPU("Z$::Mem::Bucket::Get");
-
- metrics::OperationTiming::Scope $(m_GetOps);
-
- RwLock::SharedLockScope _(m_BucketLock);
-
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
-
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash};
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
-
- return true;
- }
-
- return false;
-}
-
-int64_t
-ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- ZEN_TRACE_CPU("Z$::Mem::Bucket::Put");
-
- metrics::OperationTiming::Scope $(m_PutOps);
-
- size_t PayloadSize = Value.Value.GetSize();
- uint64_t OldPayloadSize = 0;
-
- {
- GcClock::Tick AccessTime = GcClock::TickCount();
- RwLock::ExclusiveLockScope _(m_BucketLock);
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- BucketPayload& Payload = m_Payloads[EntryIndex];
- OldPayloadSize = Payload.Payload.GetSize();
- Payload.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value);
- Payload.RawHash = Value.RawHash;
- Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
- m_AccessTimes[EntryIndex] = AccessTime;
- }
- else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
- {
- // No more space in our memory cache!
- return 0;
- }
- else
- {
- uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value),
- .RawSize = gsl::narrow<uint32_t>(Value.RawSize),
- .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(AccessTime);
- m_CacheMap.insert_or_assign(HashKey, EntryIndex);
- }
- ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
- }
-
- if (PayloadSize > OldPayloadSize)
- {
- m_TotalSize.fetch_add(PayloadSize - OldPayloadSize);
- return PayloadSize - OldPayloadSize;
- }
- else if (PayloadSize < OldPayloadSize)
- {
- m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize);
- return -static_cast<int64_t>(OldPayloadSize - PayloadSize);
- }
- return 0;
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::Drop()
-{
- RwLock::ExclusiveLockScope _(m_BucketLock);
- m_CacheMap.clear();
- m_AccessTimes.clear();
- m_Payloads.clear();
- m_TotalSize.store(0);
-}
-
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime)
-{
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- tsl::robin_map<IoHash, uint32_t> CacheMap;
-
- size_t TrimmedSize = 0;
- GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
-
- RwLock::ExclusiveLockScope _(m_BucketLock);
- {
- AccessTimes.reserve(m_CacheMap.size());
- Payloads.reserve(m_CacheMap.size());
- CacheMap.reserve(m_CacheMap.size());
-
- for (const auto& Kv : m_CacheMap)
- {
- if (m_AccessTimes[Kv.second] < ExpireTicks)
- {
- size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize();
- m_TotalSize.fetch_sub(PayloadSize);
- TrimmedSize += PayloadSize;
- continue;
- }
- size_t Index = gsl::narrow<uint32_t>(Payloads.size());
- Payloads.emplace_back(m_Payloads[Kv.second]);
- AccessTimes.push_back(m_AccessTimes[Kv.second]);
- CacheMap.insert_or_assign(Kv.first, Index);
- }
-
- m_AccessTimes.swap(AccessTimes);
- m_Payloads.swap(Payloads);
- m_CacheMap.swap(CacheMap);
- }
- return TrimmedSize;
-}
-
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::EntryCount() const
-{
- RwLock::SharedLockScope _(m_BucketLock);
- return static_cast<uint64_t>(m_CacheMap.size());
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
-{
- RwLock::SharedLockScope _(m_BucketLock);
- for (const auto& It : m_CacheMap)
- {
- uint32_t Index = It.second;
- GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
- {
- Slot = InOutUsageSlots.capacity() - 1;
- }
- if (Slot > InOutUsageSlots.size())
- {
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
- }
- InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize();
- }
-}
-
-} // namespace zen