// Copyright Epic Games, Inc. All Rights Reserved. #include "cachememorylayer.h" #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config) : m_JobQueue(JobQueue) , m_Configuration(Config) , m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count()) { } ZenCacheMemoryLayer::~ZenCacheMemoryLayer() { } bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { if (m_Configuration.TargetFootprintBytes == 0) { return false; } ZEN_TRACE_CPU("Z$::Mem::Get"); RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It == m_Buckets.end()) { return false; } CacheBucket* Bucket = It->second.get(); _.ReleaseNow(); // There's a race here. Since the lock is released early to allow // inserts, the bucket delete path could end up deleting the // underlying data structure Trim(); return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { if (m_Configuration.TargetFootprintBytes == 0) { return; } ZEN_TRACE_CPU("Z$::Mem::Put"); const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // New bucket RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique()); Bucket = InsertResult.first->second.get(); } } // Note that since the underlying IoBuffer is retained, the content type is also int64_t Diff = Bucket->Put(HashKey, Value); if (Diff > 0) { m_TotalSize.fetch_add(static_cast(Diff)); Trim(); } else if (Diff < 0) { m_TotalSize.fetch_sub(static_cast(-Diff)); } } bool ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); Bucket.Drop(); return true; } return false; } void ZenCacheMemoryLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; m_TotalSize.fetch_sub(Bucket.TotalSize()); m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); Bucket.Drop(); } } void ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second->ScrubStorage(Ctx); } } void ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) { using namespace zen::access_tracking; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { std::vector& Bucket = AccessTimes.Buckets[Kv.first]; Kv.second->GatherAccessTimes(Bucket); } } uint64_t ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime) { uint64_t TrimmedSize = 0; RwLock::SharedLockScope __(m_Lock); for (auto& Kv : m_Buckets) { uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime); if (BucketTrimmedSize > 0) { m_TotalSize.fetch_sub(BucketTrimmedSize); TrimmedSize += BucketTrimmedSize; } } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); GcClock::Tick LastTrimTick = m_LastTickTrim; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); return TrimmedSize; } void ZenCacheMemoryLayer::Trim() { if (m_TotalSize <= m_Configuration.TargetFootprintBytes) { return; } if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0) { return; } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); GcClock::Tick LastTrimTick = m_LastTickTrim; const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); if (NowTick < NextAllowedTrimTick) { return; } bool Expected = false; if (!m_IsTrimming.compare_exchange_strong(Expected, true)) { return; } // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); m_LastTickTrim.store(NextTrimTick); m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) { ZEN_TRACE_CPU("Z$::Mem::Trim"); Stopwatch Timer; uint64_t TrimmedSize = 0; const auto Guard = MakeGuard([&] { if (TrimmedSize > 0) { ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", NiceBytes(TrimmedSize), NiceBytes(m_TotalSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } m_IsTrimming.store(false); }); const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds); std::vector UsageSlots; UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); { RwLock::SharedLockScope __(m_Lock); for (auto& Kv : m_Buckets) { Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); } } uint64_t TotalSize = 0; for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { TotalSize += UsageSlots[Index]; if (TotalSize >= m_Configuration.TargetFootprintBytes) { GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); TrimmedSize = CollectGarbage(ExpireTime); break; } } }); } uint64_t ZenCacheMemoryLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second->TotalSize(); } return TotalSize; } ZenCacheMemoryLayer::Info ZenCacheMemoryLayer::GetInfo() const { ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; RwLock::SharedLockScope _(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); } return Info; } std::optional ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; } return {}; } void ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_BucketLock); std::vector BadHashes; auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { if (ContentType == ZenContentType::kCbObject) { CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); return Error == CbValidateError::None; } if (ContentType == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { return false; } if (Hash != RawHash) { return false; } } return true; }; for (auto& Kv : m_CacheMap) { const BucketPayload& Payload = m_Payloads[Kv.second]; if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) { BadHashes.push_back(Kv.first); } } if (!BadHashes.empty()) { Ctx.ReportBadCidChunks(BadHashes); } } void ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector& AccessTimes) { RwLock::SharedLockScope _(m_BucketLock); std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; }); } bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { ZEN_TRACE_CPU("Z$::Mem::Bucket::Get"); metrics::OperationTiming::Scope $(m_GetOps); RwLock::SharedLockScope _(m_BucketLock); if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { uint32_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); const BucketPayload& Payload = m_Payloads[EntryIndex]; OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; m_AccessTimes[EntryIndex] = GcClock::TickCount(); return true; } return false; } int64_t ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Mem::Bucket::Put"); metrics::OperationTiming::Scope $(m_PutOps); size_t PayloadSize = Value.Value.GetSize(); uint64_t OldPayloadSize = 0; { GcClock::Tick AccessTime = GcClock::TickCount(); RwLock::ExclusiveLockScope _(m_BucketLock); if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { uint32_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); BucketPayload& Payload = m_Payloads[EntryIndex]; OldPayloadSize = Payload.Payload.GetSize(); Payload.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value); Payload.RawHash = Value.RawHash; Payload.RawSize = gsl::narrow(Value.RawSize); m_AccessTimes[EntryIndex] = AccessTime; } else if (m_CacheMap.size() == std::numeric_limits::max()) { // No more space in our memory cache! return 0; } else { uint32_t EntryIndex = gsl::narrow(m_Payloads.size()); m_Payloads.emplace_back(BucketPayload{.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value), .RawSize = gsl::narrow(Value.RawSize), .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(AccessTime); m_CacheMap.insert_or_assign(HashKey, EntryIndex); } ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); } if (PayloadSize > OldPayloadSize) { m_TotalSize.fetch_add(PayloadSize - OldPayloadSize); return PayloadSize - OldPayloadSize; } else if (PayloadSize < OldPayloadSize) { m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize); return -static_cast(OldPayloadSize - PayloadSize); } return 0; } void ZenCacheMemoryLayer::CacheBucket::Drop() { RwLock::ExclusiveLockScope _(m_BucketLock); m_CacheMap.clear(); m_AccessTimes.clear(); m_Payloads.clear(); m_TotalSize.store(0); } uint64_t ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime) { std::vector AccessTimes; std::vector Payloads; tsl::robin_map CacheMap; size_t TrimmedSize = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope _(m_BucketLock); { AccessTimes.reserve(m_CacheMap.size()); Payloads.reserve(m_CacheMap.size()); CacheMap.reserve(m_CacheMap.size()); for (const auto& Kv : m_CacheMap) { if (m_AccessTimes[Kv.second] < ExpireTicks) { size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize(); m_TotalSize.fetch_sub(PayloadSize); TrimmedSize += PayloadSize; continue; } size_t Index = gsl::narrow(Payloads.size()); Payloads.emplace_back(m_Payloads[Kv.second]); AccessTimes.push_back(m_AccessTimes[Kv.second]); CacheMap.insert_or_assign(Kv.first, Index); } m_AccessTimes.swap(AccessTimes); m_Payloads.swap(Payloads); m_CacheMap.swap(CacheMap); } return TrimmedSize; } uint64_t ZenCacheMemoryLayer::CacheBucket::EntryCount() const { RwLock::SharedLockScope _(m_BucketLock); return static_cast(m_CacheMap.size()); } void ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector& InOutUsageSlots) { RwLock::SharedLockScope _(m_BucketLock); for (const auto& It : m_CacheMap) { uint32_t Index = It.second; GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); uint64_t Slot = gsl::narrow(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); if (Slot >= InOutUsageSlots.capacity()) { Slot = InOutUsageSlots.capacity() - 1; } if (Slot > InOutUsageSlots.size()) { InOutUsageSlots.resize(uint64_t(Slot + 1), 0); } InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize(); } } } // namespace zen