diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-24 14:54:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-24 14:54:26 +0200 |
| commit | 1a144212278aa7158d6b32b63e398db95a7ae868 (patch) | |
| tree | 58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache/cachememorylayer.cpp | |
| parent | chunking moved to zenstore (#490) (diff) | |
| download | zen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip | |
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control at which size cache entries get cached in memory
- Changed: Merged cache memory layer with cache disk layer to reduce memory and cpu overhead
Diffstat (limited to 'src/zenserver/cache/cachememorylayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.cpp | 521 |
1 file changed, 0 insertions, 521 deletions
diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp deleted file mode 100644 index b62791974..000000000 --- a/src/zenserver/cache/cachememorylayer.cpp +++ /dev/null @@ -1,521 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cachememorylayer.h" - -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> -#include <zencore/jobqueue.h> -#include <zencore/scopeguard.h> -#include <zencore/trace.h> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config) -: m_JobQueue(JobQueue) -, m_Configuration(Config) -, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count()) -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - if (m_Configuration.TargetFootprintBytes == 0) - { - return false; - } - ZEN_TRACE_CPU("Z$::Mem::Get"); - - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It == m_Buckets.end()) - { - return false; - } - - CacheBucket* Bucket = It->second.get(); - - _.ReleaseNow(); - - // There's a race here. 
Since the lock is released early to allow - // inserts, the bucket delete path could end up deleting the - // underlying data structure - - Trim(); - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (m_Configuration.TargetFootprintBytes == 0) - { - return; - } - ZEN_TRACE_CPU("Z$::Mem::Put"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); - Bucket = InsertResult.first->second.get(); - } - } - - // Note that since the underlying IoBuffer is retained, the content type is also - int64_t Diff = Bucket->Put(HashKey, Value); - - if (Diff > 0) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(Diff)); - Trim(); - } - else if (Diff < 0) - { - m_TotalSize.fetch_sub(static_cast<uint64_t>(-Diff)); - } -} - -bool -ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_TotalSize.fetch_sub(Bucket.TotalSize()); - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - Bucket.Drop(); - return true; - } - return false; -} - -void -ZenCacheMemoryLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_TotalSize.fetch_sub(Bucket.TotalSize()); - 
m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - Bucket.Drop(); - } -} - -void -ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Kv.second->ScrubStorage(Ctx); - } -} - -void -ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) -{ - using namespace zen::access_tracking; - - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second->GatherAccessTimes(Bucket); - } -} - -uint64_t -ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime) -{ - uint64_t TrimmedSize = 0; - RwLock::SharedLockScope __(m_Lock); - for (auto& Kv : m_Buckets) - { - uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime); - if (BucketTrimmedSize > 0) - { - m_TotalSize.fetch_sub(BucketTrimmedSize); - TrimmedSize += BucketTrimmedSize; - } - } - const GcClock::TimePoint Now = GcClock::Now(); - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickTrim; - const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); - return TrimmedSize; -} - -void -ZenCacheMemoryLayer::Trim() -{ - if (m_TotalSize <= m_Configuration.TargetFootprintBytes) - { - return; - } - if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0) - { - return; - } - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + 
GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } - - bool Expected = false; - if (!m_IsTrimming.compare_exchange_strong(Expected, true)) - { - return; - } - - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickTrim.store(NextTrimTick); - - m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::Mem::Trim"); - - Stopwatch Timer; - uint64_t TrimmedSize = 0; - const auto Guard = MakeGuard([&] { - if (TrimmedSize > 0) - { - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(TrimmedSize), - NiceBytes(m_TotalSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - m_IsTrimming.store(false); - }); - - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds); - - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); - { - RwLock::SharedLockScope __(m_Lock); - for (auto& Kv : m_Buckets) - { - Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.TargetFootprintBytes) - { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - TrimmedSize = CollectGarbage(ExpireTime); - break; - } - } - }); -} - -uint64_t -ZenCacheMemoryLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheMemoryLayer::Info -ZenCacheMemoryLayer::GetInfo() const -{ - ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - 
Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheMemoryLayer::BucketInfo> -ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -void -ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_BucketLock); - - std::vector<IoHash> BadHashes; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (Hash != RawHash) - { - return false; - } - } - return true; - }; - - for (auto& Kv : m_CacheMap) - { - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) - { - BadHashes.push_back(Kv.first); - } - } - - if (!BadHashes.empty()) - { - Ctx.ReportBadCidChunks(BadHashes); - } -} - -void -ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - RwLock::SharedLockScope _(m_BucketLock); - std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { - return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; - }); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const 
IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Mem::Bucket::Get"); - - metrics::OperationTiming::Scope $(m_GetOps); - - RwLock::SharedLockScope _(m_BucketLock); - - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - return true; - } - - return false; -} - -int64_t -ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - ZEN_TRACE_CPU("Z$::Mem::Bucket::Put"); - - metrics::OperationTiming::Scope $(m_PutOps); - - size_t PayloadSize = Value.Value.GetSize(); - uint64_t OldPayloadSize = 0; - - { - GcClock::Tick AccessTime = GcClock::TickCount(); - RwLock::ExclusiveLockScope _(m_BucketLock); - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - BucketPayload& Payload = m_Payloads[EntryIndex]; - OldPayloadSize = Payload.Payload.GetSize(); - Payload.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value); - Payload.RawHash = Value.RawHash; - Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); - m_AccessTimes[EntryIndex] = AccessTime; - } - else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! 
- return 0; - } - else - { - uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value), - .RawSize = gsl::narrow<uint32_t>(Value.RawSize), - .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(AccessTime); - m_CacheMap.insert_or_assign(HashKey, EntryIndex); - } - ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - } - - if (PayloadSize > OldPayloadSize) - { - m_TotalSize.fetch_add(PayloadSize - OldPayloadSize); - return PayloadSize - OldPayloadSize; - } - else if (PayloadSize < OldPayloadSize) - { - m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize); - return -static_cast<int64_t>(OldPayloadSize - PayloadSize); - } - return 0; -} - -void -ZenCacheMemoryLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_BucketLock); - m_CacheMap.clear(); - m_AccessTimes.clear(); - m_Payloads.clear(); - m_TotalSize.store(0); -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime) -{ - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - tsl::robin_map<IoHash, uint32_t> CacheMap; - - size_t TrimmedSize = 0; - GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - RwLock::ExclusiveLockScope _(m_BucketLock); - { - AccessTimes.reserve(m_CacheMap.size()); - Payloads.reserve(m_CacheMap.size()); - CacheMap.reserve(m_CacheMap.size()); - - for (const auto& Kv : m_CacheMap) - { - if (m_AccessTimes[Kv.second] < ExpireTicks) - { - size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize(); - m_TotalSize.fetch_sub(PayloadSize); - TrimmedSize += PayloadSize; - continue; - } - size_t Index = gsl::narrow<uint32_t>(Payloads.size()); - Payloads.emplace_back(m_Payloads[Kv.second]); - AccessTimes.push_back(m_AccessTimes[Kv.second]); - CacheMap.insert_or_assign(Kv.first, Index); - } - - m_AccessTimes.swap(AccessTimes); - 
m_Payloads.swap(Payloads); - m_CacheMap.swap(CacheMap); - } - return TrimmedSize; -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_BucketLock); - return static_cast<uint64_t>(m_CacheMap.size()); -} - -void -ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector<uint64_t>& InOutUsageSlots) -{ - RwLock::SharedLockScope _(m_BucketLock); - for (const auto& It : m_CacheMap) - { - uint32_t Index = It.second; - GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) - { - Slot = InOutUsageSlots.capacity() - 1; - } - if (Slot > InOutUsageSlots.size()) - { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); - } - InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize(); - } -} - -} // namespace zen |