diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-24 14:54:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-24 14:54:26 +0200 |
| commit | 1a144212278aa7158d6b32b63e398db95a7ae868 (patch) | |
| tree | 58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache | |
| parent | chunking moved to zenstore (#490) (diff) | |
| download | zen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip | |
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control the size threshold at or below which cache entries are cached in memory
- Changed: Merged the cache memory layer into the cache disk layer to reduce memory and CPU overhead
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 826 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 136 | ||||
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.cpp | 521 | ||||
| -rw-r--r-- | src/zenserver/cache/cachememorylayer.h | 124 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 82 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 165 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 52 |
7 files changed, 785 insertions, 1121 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 73c02bcc2..f755436e0 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -7,6 +7,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/trace.h> @@ -167,16 +168,16 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex; const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex; -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching) +ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config) : m_BucketName(std::move(BucketName)) +, m_Configuration(Config) , m_BucketId(Oid::Zero) -, m_EnableReferenceCaching(EnableReferenceCaching) { if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values - m_LargeObjectThreshold = 16 * 1024 * 1024; + m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024; } } @@ -397,7 +398,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); @@ -470,7 +471,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_PayloadAlignment = 
Header.PayloadAlignment; + m_Configuration.PayloadAlignment = Header.PayloadAlignment; std::vector<DiskIndexEntry> Entries; Entries.resize(Header.EntryCount); @@ -479,11 +480,6 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index sizeof(CacheBucketIndexHeader)); m_Payloads.reserve(Header.EntryCount); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.reserve(Header.EntryCount); - } - m_AccessTimes.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); std::string InvalidEntryReason; @@ -496,14 +492,18 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } m_Index.insert_or_assign(Entry.Key, EntryIndex); EntryCount++; } + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.resize(m_Payloads.size()); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } @@ -540,8 +540,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); + LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { @@ -559,14 +558,18 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui } size_t EntryIndex = m_Payloads.size(); 
m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } m_Index.insert_or_assign(Record.Key, EntryIndex); }, SkipEntryCount); + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.resize(m_Payloads.size()); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); @@ -582,11 +585,12 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); - m_TotalStandaloneSize = 0; + m_StandaloneSize = 0; m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); + m_CachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); @@ -604,7 +608,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) CreateDirectories(m_BucketDir); - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); if (std::filesystem::is_regular_file(IndexPath)) { @@ -643,10 +647,10 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); + m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); 
KnownLocations.push_back(BlockLocation); } @@ -681,7 +685,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); - BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); IoBuffer Value = m_BlockStore.TryGetChunk(Location); if (Value) @@ -713,88 +717,190 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy } bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) { - m_MissCount++; + m_DiskMissCount++; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } return false; } - size_t EntryIndex = It.value(); - const BucketPayload& Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; - OutValue.RawSize = Payload.RawSize; - OutValue.RawHash = Payload.RawHash; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + + size_t EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + if (FillRawHashAndRawSize) { - // We don't need to hold the index lock when we read a standalone file - _.ReleaseNow(); - OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + if (Payload.RawHash != IoHash::Zero || Payload.RawSize != 0) + { + 
OutValue.RawHash = Payload.RawHash; + OutValue.RawSize = Payload.RawSize; + FillRawHashAndRawSize = false; + } } - else + + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex]) { - OutValue.Value = GetInlineCacheValue(Location); + OutValue.Value = m_CachedPayloads[EntryIndex]; + IndexLock.ReleaseNow(); + m_MemoryHitCount++; } - _.ReleaseNow(); - - if (!Location.IsFlagSet(DiskLocation::kStructured)) + else { - if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) + IndexLock.ReleaseNow(); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + } + else { - if (Location.IsFlagSet(DiskLocation::kCompressed)) + OutValue.Value = GetInlineCacheValue(Location); + if (m_Configuration.MemCacheSizeThreshold > 0) { - (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) + { + // Only update if it has not already been updated by other thread + if (!m_CachedPayloads[UpdateIt->second]) + { + m_CachedPayloads[UpdateIt->second] = OutValue.Value; + m_MemCachedSize.fetch_add(ValueSize); + CacheMemoryUsage.fetch_add(ValueSize); + m_MemoryWriteCount++; + } + } + } } - else + } + } + + if (FillRawHashAndRawSize) + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) { - OutValue.RawHash 
= IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); + OutValue = ZenCacheValue{}; + m_DiskMissCount++; + return false; } - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0) { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - WritePayload.RawHash = OutValue.RawHash; - WritePayload.RawSize = OutValue.RawSize; - - m_LogFlushPosition = 0; // Force resave of index on exit + WritePayload.RawHash = OutValue.RawHash; + WritePayload.RawSize = OutValue.RawSize; } } } if (OutValue.Value) { - m_HitCount++; + m_DiskHitCount++; StatsScope.SetBytes(OutValue.Value.GetSize()); return true; } else { - m_MissCount++; + m_DiskMissCount++; return false; } } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (Value.Value.Size() >= m_LargeObjectThreshold) + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - return PutStandaloneCacheValue(HashKey, Value, References); + PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage); } - PutInlineCacheValue(HashKey, Value, References); + else + { + PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage); + } + + m_DiskWriteCount++; +} - m_WriteCount++; +void +ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, 
std::atomic_uint64_t& CacheMemoryUsage) +{ + GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_IndexLock); + for (const auto& Kv : m_Index) + { + if (m_AccessTimes[Kv.second] < ExpireTicks) + { + size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize(); + m_MemCachedSize.fetch_sub(PayloadSize); + CacheMemoryUsage.fetch_sub(PayloadSize); + m_CachedPayloads[Kv.second] = {}; + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, + GcClock::Duration SectionLength, + std::vector<uint64_t>& InOutUsageSlots) +{ + RwLock::SharedLockScope _(m_IndexLock); + for (const auto& It : m_Index) + { + size_t Index = It.second; + if (!m_CachedPayloads[Index]) + { + continue; + } + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); + uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? 
Age.count() / SectionLength.count() : 0); + if (Slot >= InOutUsageSlots.capacity()) + { + Slot = InOutUsageSlots.capacity() - 1; + } + if (Slot > InOutUsageSlots.size()) + { + InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + } + InOutUsageSlots[Slot] += m_CachedPayloads[Index].GetSize(); + } } bool -ZenCacheDiskLayer::CacheBucket::Drop() +ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); @@ -814,10 +920,14 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); + m_CachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; + m_StandaloneSize.store(0); + CacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); + m_MemCachedSize.store(0); return Deleted; } @@ -992,7 +1102,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) }; void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); @@ -1083,7 +1193,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } else { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } @@ -1169,11 +1279,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); BucketPayload& Payload = m_Payloads[It->second]; - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); } DiskLocation Location = Payload.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + 
m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); + } + + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t CachedSize = m_CachedPayloads[It->second].GetSize(); + m_MemCachedSize.fetch_sub(CachedSize); + CacheMemoryUsage.fetch_sub(CachedSize); + m_CachedPayloads[It->second] = IoBuffer{}; + } + Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); @@ -1193,7 +1316,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::error_code Ec; fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } } m_SlogFile.Append(LogEntries); @@ -1202,38 +1324,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; std::vector<size_t> FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.reserve(EntryCount); - } - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[It.second]); - AccessTimes.push_back(m_AccessTimes[It.second]); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); - } - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.swap(FirstReferenceIndex); - CompactReferences(IndexLock); - } + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); } } } @@ -1334,7 +1431,7 @@ 
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { continue; } - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex)) { @@ -1397,8 +1494,17 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Entry.first); It != m_Index.end()) { - BucketPayload& UpdatePayload = m_Payloads[It->second]; - Buffer = GetInlineCacheValue(UpdatePayload.Location); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + Buffer = m_CachedPayloads[It->second]; + } + if (!Buffer) + { + DiskLocation Location = m_Payloads[It->second].Location; + IndexLock.ReleaseNow(); + Buffer = GetInlineCacheValue(Location); + // Don't memcache items when doing GC + } } if (!Buffer) { @@ -1411,7 +1517,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) CbObject Obj(SharedBuffer{Buffer}); size_t CurrentCidCount = Cids.size(); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME @@ -1450,20 +1556,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = TotalSize(); + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t 
WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + GcStorageSize OldTotalSize = StorageSize(); std::unordered_set<IoHash> DeletedChunks; uint64_t MovedCount = 0; @@ -1473,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " - "entires ({}).", + "entries ({}/{}).", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), @@ -1484,7 +1590,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) DeletedChunks.size(), MovedCount, TotalChunkCount, - NiceBytes(OldTotalSize)); + NiceBytes(OldTotalSize.DiskSize), + NiceBytes(OldTotalSize.MemorySize)); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) @@ -1514,45 +1621,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) // Clean up m_AccessTimes and m_Payloads vectors std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; std::vector<size_t> FirstReferenceIndex; IndexMap Index; - { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.reserve(EntryCount); - } - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t OldEntryIndex = It.second; - size_t NewEntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[OldEntryIndex]); - AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); - if 
(m_EnableReferenceCaching) - { - FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); - } - Index.insert({It.first, NewEntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.swap(FirstReferenceIndex); - CompactReferences(IndexLock); - } + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); } GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); } @@ -1610,7 +1690,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) for (const auto& Entry : ExpiredStandaloneEntries) { m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } m_SlogFile.Append(ExpiredStandaloneEntries); @@ -1623,20 +1703,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); - std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { - const IoHash& Key = Entry.Key; - const DiskLocation& Loc = Entry.Location; + const IoHash& Key = Entry.Key; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { - RwLock::SharedLockScope __(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); @@ -1649,47 +1727,21 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } - 
__.ReleaseNow(); + IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; fs::remove(FilePath, Ec); + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + continue; + } } } - - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); - DiskLocation RestoreLocation = Loc; - RestoreLocation.Flags &= ~DiskLocation::kTombStone; - - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - continue; - } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } - m_Index.insert({Key, EntryIndex}); - m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); - DeletedChunks.erase(Key); - continue; - } DeletedSize += Entry.Location.Size(); } } @@ -1712,7 +1764,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { continue; } - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = Entry.first; @@ -1729,13 +1781,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const bool PerformDelete 
= GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - uint64_t CurrentTotalSize = TotalSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); + GcStorageSize CurrentTotalSize = StorageSize(); + ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", m_BucketDir, DeleteCount, TotalChunkCount, - NiceBytes(CurrentTotalSize)); + NiceBytes(CurrentTotalSize.DiskSize), + NiceBytes(CurrentTotalSize.MemorySize)); return; } @@ -1743,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreState, ChunkLocations, KeepChunkIndexes, - m_PayloadAlignment, + m_Configuration.PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector<DiskIndexEntry> LogEntries; @@ -1768,7 +1821,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) // Entry has been updated while GC was running, ignore the move continue; } - Payload.Location = DiskLocation(NewLocation, m_PayloadAlignment, Payload.Location.GetFlags()); + Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); } for (const size_t ChunkIndex : RemovedChunks) @@ -1783,9 +1836,16 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } const DiskLocation& OldDiskLocation = Payload.Location; LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), - m_PayloadAlignment, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), + m_Configuration.PayloadAlignment, 
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex]) + { + uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size(); + m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + m_CachedPayloads[PayloadIndex] = IoBuffer{}; + } m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); } @@ -1797,33 +1857,20 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) [&]() { return GcCtx.ClaimGCReserve(); }); } -void -ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - ZEN_TRACE_CPU("Z$::Disk::Bucket::UpdateAccessTimes"); - - using namespace access_tracking; - - for (const KeyAccessTime& KeyTime : AccessTimes) - { - if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = KeyTime.LastAccess; - } - } -} - ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - return ZenCacheDiskLayer::BucketStats{.TotalSize = TotalSize(), - .HitCount = m_HitCount, - .MissCount = m_MissCount, - .WriteCount = m_WriteCount, - .PutOps = m_PutOps.Snapshot(), - .GetOps = m_GetOps.Snapshot()}; + GcStorageSize Size = StorageSize(); + return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, + .MemorySize = Size.MemorySize, + .DiskHitCount = m_DiskHitCount, + .DiskMissCount = m_DiskMissCount, + .DiskWriteCount = m_DiskWriteCount, + .MemoryHitCount = m_MemoryHitCount, + .MemoryMissCount = m_MemoryMissCount, + .MemoryWriteCount = m_MemoryWriteCount, + .PutOps = m_PutOps.Snapshot(), + .GetOps = m_GetOps.Snapshot()}; } uint64_t @@ -1907,27 +1954,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) } for (CacheBucket* Bucket : Buckets) { - Bucket->CollectGarbage(GcCtx); - } -} 
- -void -ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) -{ - RwLock::SharedLockScope _(m_Lock); - - for (const auto& Kv : AccessTimes.Buckets) - { - if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - Bucket.UpdateAccessTimes(Kv.second); - } + Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize); } + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); @@ -1942,7 +1978,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } - bool CleanUpTempFile = false; + bool CleanUpTempFile = true; auto __ = MakeGuard([&] { if (CleanUpTempFile) { @@ -2042,13 +2078,19 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + ValueLock.ReleaseNow(); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.emplace_back(IoBuffer{}); + } + if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); @@ -2057,25 
+2099,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } else { - // TODO: should check if write is idempotent and bail out if it is? size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); BucketPayload& Payload = m_Payloads[EntryIndex]; + uint64_t OldSize = Payload.Location.Size(); Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); - m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + if (m_CachedPayloads[EntryIndex]) + { + uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); + m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + m_CachedPayloads[EntryIndex] = IoBuffer{}; + } + } + m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); + m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); @@ -2090,38 +2145,73 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const 
BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); + uint64_t PayloadSize = Value.Value.GetSize(); + const bool MemCacheEnabled = (m_Configuration.MemCacheSizeThreshold > 0); + IoBuffer MemCacheBuffer = (MemCacheEnabled && (PayloadSize <= m_Configuration.MemCacheSizeThreshold)) + ? IoBufferBuilder::ReadFromFileMaybe(Value.Value) + : IoBuffer{}; - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - if (m_EnableReferenceCaching) + m_BlockStore.WriteChunk( + Value.Value.Data(), + Value.Value.Size(), + m_Configuration.PayloadAlignment, + [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); + m_SlogFile.Append({.Key = HashKey, .Location = Location}); + + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { - SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + + if (MemCacheEnabled) + { + if (m_CachedPayloads[EntryIndex]) + { + uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize(); + 
m_MemCachedSize.fetch_sub(OldCachedSize); + CacheMemoryUsage.fetch_sub(OldCachedSize); + } + + if (MemCacheBuffer) + { + m_MemCachedSize.fetch_add(PayloadSize); + CacheMemoryUsage.fetch_add(PayloadSize); + m_MemoryWriteCount++; + } + m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer); + } + if (m_Configuration.EnableReferenceCaching) + { + SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + } } - } - else - { - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) + else { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + if (MemCacheEnabled) + { + if (MemCacheBuffer) + { + m_MemCachedSize.fetch_add(PayloadSize); + CacheMemoryUsage.fetch_add(PayloadSize); + m_MemoryWriteCount++; + } + m_CachedPayloads.emplace_back(std::move(MemCacheBuffer)); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); + SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + } + m_Index.insert_or_assign(HashKey, EntryIndex); } - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); + }); } void @@ -2286,11 +2376,58 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn return true; } +void +ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads, + std::vector<AccessTime>& AccessTimes, + std::vector<IoBuffer>& CachedPayloads, + std::vector<size_t>& FirstReferenceIndex, + IndexMap& Index, + RwLock::ExclusiveLockScope& IndexLock) +{ + size_t EntryCount = 
m_Index.size(); + Payloads.reserve(EntryCount); + AccessTimes.reserve(EntryCount); + CachedPayloads.reserve(EntryCount); + if (m_Configuration.EnableReferenceCaching) + { + FirstReferenceIndex.reserve(EntryCount); + } + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t EntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[It.second]); + AccessTimes.push_back(m_AccessTimes[It.second]); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + CachedPayloads.push_back(std::move(m_CachedPayloads[It.second])); + } + if (m_Configuration.EnableReferenceCaching) + { + FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); + } + Index.insert({It.first, EntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.swap(CachedPayloads); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.swap(FirstReferenceIndex); + CompactReferences(IndexLock); + } +} + ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching) -: m_RootDir(RootDir) -, m_EnableReferenceCaching(EnableReferenceCaching) +ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) +: m_JobQueue(JobQueue) +, m_RootDir(RootDir) +, m_Configuration(Config) { } @@ -2327,7 +2464,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; @@ -2342,7 +2479,12 @@ ZenCacheDiskLayer::Get(std::string_view 
InBucket, const IoHash& HashKey, ZenCach } ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); + if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize)) + { + TryMemCacheTrim(); + return true; + } + return false; } void @@ -2376,7 +2518,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; @@ -2401,7 +2543,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Bucket != nullptr); - Bucket->Put(HashKey, Value, References); + Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize); + TryMemCacheTrim(); } void @@ -2432,7 +2575,7 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); CacheBucket& Bucket = *InsertResult.first->second; try @@ -2489,7 +2632,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); - return Bucket.Drop(); + return Bucket.Drop(m_TotalMemCachedSize); } // Make sure we remove the folder even if we don't know about the bucket @@ -2511,7 +2654,7 @@ ZenCacheDiskLayer::Drop() CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); - if (!Bucket.Drop()) + if (!Bucket.Drop(m_TotalMemCachedSize)) { return false; } @@ -2552,11 +2695,11 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { #if 1 - 
Results.push_back( - Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); + Results.push_back(Ctx.ThreadPool().EnqueueTask( + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }})); #else CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); + Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize); #endif } @@ -2587,24 +2730,27 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) } } -uint64_t -ZenCacheDiskLayer::TotalSize() const +GcStorageSize +ZenCacheDiskLayer::StorageSize() const { - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); + GcStorageSize StorageSize{}; + RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - TotalSize += Kv.second->TotalSize(); + GcStorageSize BucketSize = Kv.second->StorageSize(); + StorageSize.DiskSize += BucketSize.DiskSize; + StorageSize.MemorySize += BucketSize.MemorySize; } - return TotalSize; + return StorageSize; } ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - ZenCacheDiskLayer::DiskStats Stats = {}; + GcStorageSize Size = StorageSize(); + ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); Stats.BucketStats.reserve(m_Buckets.size()); @@ -2619,8 +2765,7 @@ ZenCacheDiskLayer::Stats() const ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const { - ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching}, - .TotalSize = TotalSize()}; + ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration}; { RwLock::SharedLockScope _(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); @@ -2628,6 +2773,9 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); + GcStorageSize BucketSize = Kv.second->StorageSize(); + 
Info.StorageSize.DiskSize += BucketSize.DiskSize; + Info.StorageSize.MemorySize += BucketSize.MemorySize; } } return Info; @@ -2640,7 +2788,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; } return {}; } @@ -2677,4 +2825,100 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +void +ZenCacheDiskLayer::MemCacheTrim() +{ + ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); + + ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); + + const GcClock::TimePoint Now = GcClock::Now(); + + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); + if (NowTick < NextAllowedTrimTick) + { + return; + } + + bool Expected = false; + if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) + { + return; + } + + // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickMemCacheTrim.store(NextTrimTick); + + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + uint64_t StartSize = m_TotalMemCachedSize.load(); + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + uint64_t EndSize = m_TotalMemCachedSize.load(); + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + 
NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + m_IsMemCacheTrimming.store(false); + }); + + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); + + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); + } + + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); + MemCacheTrim(Buckets, ExpireTime); + break; + } + } + }); +} + +void +ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) +{ + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return; + } + RwLock::SharedLockScope __(m_Lock); + for (CacheBucket* Bucket : Buckets) + { + Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize); + } + const GcClock::TimePoint Now = GcClock::Now(); + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); +} + } // namespace zen diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 
7e05430a2..ea390f807 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { class IoBuffer; +class JobQueue; #pragma pack(push) #pragma pack(1) @@ -90,32 +91,48 @@ static_assert(sizeof(DiskIndexEntry) == 32); class ZenCacheDiskLayer { public: + struct BucketConfiguration + { + uint64_t MaxBlockSize = 1ull << 30; + uint64_t PayloadAlignment = 1ull << 4; + uint64_t MemCacheSizeThreshold = 1 * 1024; + uint64_t LargeObjectThreshold = 128 * 1024; + bool EnableReferenceCaching = false; + }; + struct Configuration { - std::filesystem::path RootDir; - bool EnableReferenceCaching; + BucketConfiguration BucketConfig; + uint64_t MemCacheTargetFootprintBytes = 512 * 1024 * 1024; + uint64_t MemCacheTrimIntervalSeconds = 60; + uint64_t MemCacheMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); }; struct BucketInfo { - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; + uint64_t EntryCount = 0; + GcStorageSize StorageSize; }; struct Info { + std::filesystem::path RootDir; Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - uint64_t TotalSize = 0; + GcStorageSize StorageSize; }; struct BucketStats { - uint64_t TotalSize; - uint64_t HitCount; - uint64_t MissCount; - uint64_t WriteCount; + uint64_t DiskSize; + uint64_t MemorySize; + uint64_t DiskHitCount; + uint64_t DiskMissCount; + uint64_t DiskWriteCount; + uint64_t MemoryHitCount; + uint64_t MemoryMissCount; + uint64_t MemoryWriteCount; metrics::RequestStatsSnapshot PutOps; metrics::RequestStatsSnapshot GetOps; }; @@ -129,9 +146,11 @@ public: struct DiskStats { std::vector<NamedBucketStats> BucketStats; + uint64_t DiskSize; + uint64_t MemorySize; }; - explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching); + explicit ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& 
Config); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -142,11 +161,10 @@ public: void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); - void DiscoverBuckets(); - uint64_t TotalSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + GcStorageSize StorageSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -161,38 +179,40 @@ private: */ struct CacheBucket { - CacheBucket(std::string BucketName, bool EnableReferenceCaching); + CacheBucket(std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - bool Drop(); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, std::atomic_uint64_t& CacheMemoryUsage); + void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage); + bool Drop(std::atomic_uint64_t& CacheMemoryUsage); void Flush(); - void ScrubStorage(ScrubContext& Ctx); + void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage); void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage); - inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); } - uint64_t EntryCount() const; - 
BucketStats Stats(); + inline GcStorageSize StorageSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + uint64_t EntryCount() const; + BucketStats Stats(); CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; - private: - const uint64_t MaxBlockSize = 1ull << 30; - uint64_t m_PayloadAlignment = 1ull << 4; + void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); + private: std::string m_BucketName; std::filesystem::path m_BucketDir; std::filesystem::path m_BlocksBasePath; + BucketConfiguration m_Configuration; BlockStore m_BlockStore; Oid m_BucketId; - uint64_t m_LargeObjectThreshold = 128 * 1024; std::atomic_bool m_IsFlushing{}; - const bool m_EnableReferenceCaching = false; // These files are used to manage storage of small objects for this bucket @@ -216,9 +236,12 @@ private: using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>; - std::atomic<uint64_t> m_HitCount; - std::atomic<uint64_t> m_MissCount; - std::atomic<uint64_t> m_WriteCount; + std::atomic<uint64_t> m_DiskHitCount; + std::atomic<uint64_t> m_DiskMissCount; + std::atomic<uint64_t> m_DiskWriteCount; + std::atomic<uint64_t> m_MemoryHitCount; + std::atomic<uint64_t> m_MemoryMissCount; + std::atomic<uint64_t> m_MemoryWriteCount; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; @@ -226,16 +249,24 @@ private: IndexMap m_Index; std::vector<AccessTime> m_AccessTimes; std::vector<BucketPayload> m_Payloads; + std::vector<IoBuffer> m_CachedPayloads; std::vector<size_t> m_FirstReferenceIndex; std::vector<IoHash> m_ReferenceHashes; std::vector<size_t> m_NextReferenceHashesIndexes; size_t m_ReferenceCount = 0; - 
std::atomic_uint64_t m_TotalStandaloneSize{}; + std::atomic_uint64_t m_StandaloneSize{}; + std::atomic_uint64_t m_MemCachedSize{}; void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void PutStandaloneCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage); IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; void MakeIndexSnapshot(); uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); @@ -257,6 +288,14 @@ private: } size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const; + + void CompactState(std::vector<BucketPayload>& TmpPayloads, + std::vector<AccessTime>& TmpAccessTimes, + std::vector<IoBuffer>& TmpCachedPayloads, + std::vector<size_t>& TmpFirstReferenceIndex, + IndexMap& TmpIndex, + RwLock::ExclusiveLockScope& IndexLock); + // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash // @@ -267,11 +306,34 @@ private: inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } }; + inline void TryMemCacheTrim() + { + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return; + } + if (m_Configuration.MemCacheMaxAgeSeconds == 0 || m_Configuration.MemCacheTrimIntervalSeconds == 0) + { + return; + } + if (m_TotalMemCachedSize 
<= m_Configuration.MemCacheTargetFootprintBytes) + { + return; + } + MemCacheTrim(); + } + void MemCacheTrim(); + void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + + JobQueue& m_JobQueue; std::filesystem::path m_RootDir; + Configuration m_Configuration; + std::atomic_uint64_t m_TotalMemCachedSize{}; + std::atomic_bool m_IsMemCacheTrimming = false; + std::atomic<GcClock::Tick> m_LastTickMemCacheTrim; mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - const bool m_EnableReferenceCaching; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; diff --git a/src/zenserver/cache/cachememorylayer.cpp b/src/zenserver/cache/cachememorylayer.cpp deleted file mode 100644 index b62791974..000000000 --- a/src/zenserver/cache/cachememorylayer.cpp +++ /dev/null @@ -1,521 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. 
- -#include "cachememorylayer.h" - -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> -#include <zencore/jobqueue.h> -#include <zencore/scopeguard.h> -#include <zencore/trace.h> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -ZenCacheMemoryLayer::ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config) -: m_JobQueue(JobQueue) -, m_Configuration(Config) -, m_LastTickTrim(GcClock::Clock::time_point::min().time_since_epoch().count()) -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - if (m_Configuration.TargetFootprintBytes == 0) - { - return false; - } - ZEN_TRACE_CPU("Z$::Mem::Get"); - - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It == m_Buckets.end()) - { - return false; - } - - CacheBucket* Bucket = It->second.get(); - - _.ReleaseNow(); - - // There's a race here. 
Since the lock is released early to allow - // inserts, the bucket delete path could end up deleting the - // underlying data structure - - Trim(); - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (m_Configuration.TargetFootprintBytes == 0) - { - return; - } - ZEN_TRACE_CPU("Z$::Mem::Put"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); - Bucket = InsertResult.first->second.get(); - } - } - - // Note that since the underlying IoBuffer is retained, the content type is also - int64_t Diff = Bucket->Put(HashKey, Value); - - if (Diff > 0) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(Diff)); - Trim(); - } - else if (Diff < 0) - { - m_TotalSize.fetch_sub(static_cast<uint64_t>(-Diff)); - } -} - -bool -ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_TotalSize.fetch_sub(Bucket.TotalSize()); - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - Bucket.Drop(); - return true; - } - return false; -} - -void -ZenCacheMemoryLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_TotalSize.fetch_sub(Bucket.TotalSize()); - 
m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - Bucket.Drop(); - } -} - -void -ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Kv.second->ScrubStorage(Ctx); - } -} - -void -ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) -{ - using namespace zen::access_tracking; - - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second->GatherAccessTimes(Bucket); - } -} - -uint64_t -ZenCacheMemoryLayer::CollectGarbage(GcClock::TimePoint ExpireTime) -{ - uint64_t TrimmedSize = 0; - RwLock::SharedLockScope __(m_Lock); - for (auto& Kv : m_Buckets) - { - uint64_t BucketTrimmedSize = Kv.second->Trim(ExpireTime); - if (BucketTrimmedSize > 0) - { - m_TotalSize.fetch_sub(BucketTrimmedSize); - TrimmedSize += BucketTrimmedSize; - } - } - const GcClock::TimePoint Now = GcClock::Now(); - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickTrim; - const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); - return TrimmedSize; -} - -void -ZenCacheMemoryLayer::Trim() -{ - if (m_TotalSize <= m_Configuration.TargetFootprintBytes) - { - return; - } - if (m_Configuration.MaxAgeSeconds == 0 || m_Configuration.TrimIntervalSeconds == 0) - { - return; - } - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.TrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + 
GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } - - bool Expected = false; - if (!m_IsTrimming.compare_exchange_strong(Expected, true)) - { - return; - } - - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickTrim.store(NextTrimTick); - - m_JobQueue.QueueJob("ZenCacheMemoryLayer::Trim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::Mem::Trim"); - - Stopwatch Timer; - uint64_t TrimmedSize = 0; - const auto Guard = MakeGuard([&] { - if (TrimmedSize > 0) - { - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(TrimmedSize), - NiceBytes(m_TotalSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - m_IsTrimming.store(false); - }); - - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MaxAgeSeconds); - - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); - { - RwLock::SharedLockScope __(m_Lock); - for (auto& Kv : m_Buckets) - { - Kv.second->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.TargetFootprintBytes) - { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - TrimmedSize = CollectGarbage(ExpireTime); - break; - } - } - }); -} - -uint64_t -ZenCacheMemoryLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheMemoryLayer::Info -ZenCacheMemoryLayer::GetInfo() const -{ - ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - 
Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheMemoryLayer::BucketInfo> -ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -void -ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_BucketLock); - - std::vector<IoHash> BadHashes; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (Hash != RawHash) - { - return false; - } - } - return true; - }; - - for (auto& Kv : m_CacheMap) - { - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) - { - BadHashes.push_back(Kv.first); - } - } - - if (!BadHashes.empty()) - { - Ctx.ReportBadCidChunks(BadHashes); - } -} - -void -ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - RwLock::SharedLockScope _(m_BucketLock); - std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { - return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; - }); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const 
IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Mem::Bucket::Get"); - - metrics::OperationTiming::Scope $(m_GetOps); - - RwLock::SharedLockScope _(m_BucketLock); - - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - return true; - } - - return false; -} - -int64_t -ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - ZEN_TRACE_CPU("Z$::Mem::Bucket::Put"); - - metrics::OperationTiming::Scope $(m_PutOps); - - size_t PayloadSize = Value.Value.GetSize(); - uint64_t OldPayloadSize = 0; - - { - GcClock::Tick AccessTime = GcClock::TickCount(); - RwLock::ExclusiveLockScope _(m_BucketLock); - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - BucketPayload& Payload = m_Payloads[EntryIndex]; - OldPayloadSize = Payload.Payload.GetSize(); - Payload.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value); - Payload.RawHash = Value.RawHash; - Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); - m_AccessTimes[EntryIndex] = AccessTime; - } - else if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! 
- return 0; - } - else - { - uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Payload = IoBufferBuilder::ReadFromFileMaybe(Value.Value), - .RawSize = gsl::narrow<uint32_t>(Value.RawSize), - .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(AccessTime); - m_CacheMap.insert_or_assign(HashKey, EntryIndex); - } - ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - } - - if (PayloadSize > OldPayloadSize) - { - m_TotalSize.fetch_add(PayloadSize - OldPayloadSize); - return PayloadSize - OldPayloadSize; - } - else if (PayloadSize < OldPayloadSize) - { - m_TotalSize.fetch_sub(OldPayloadSize - PayloadSize); - return -static_cast<int64_t>(OldPayloadSize - PayloadSize); - } - return 0; -} - -void -ZenCacheMemoryLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_BucketLock); - m_CacheMap.clear(); - m_AccessTimes.clear(); - m_Payloads.clear(); - m_TotalSize.store(0); -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::Trim(GcClock::TimePoint ExpireTime) -{ - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - tsl::robin_map<IoHash, uint32_t> CacheMap; - - size_t TrimmedSize = 0; - GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - RwLock::ExclusiveLockScope _(m_BucketLock); - { - AccessTimes.reserve(m_CacheMap.size()); - Payloads.reserve(m_CacheMap.size()); - CacheMap.reserve(m_CacheMap.size()); - - for (const auto& Kv : m_CacheMap) - { - if (m_AccessTimes[Kv.second] < ExpireTicks) - { - size_t PayloadSize = m_Payloads[Kv.second].Payload.GetSize(); - m_TotalSize.fetch_sub(PayloadSize); - TrimmedSize += PayloadSize; - continue; - } - size_t Index = gsl::narrow<uint32_t>(Payloads.size()); - Payloads.emplace_back(m_Payloads[Kv.second]); - AccessTimes.push_back(m_AccessTimes[Kv.second]); - CacheMap.insert_or_assign(Kv.first, Index); - } - - m_AccessTimes.swap(AccessTimes); - 
m_Payloads.swap(Payloads); - m_CacheMap.swap(CacheMap); - } - return TrimmedSize; -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_BucketLock); - return static_cast<uint64_t>(m_CacheMap.size()); -} - -void -ZenCacheMemoryLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector<uint64_t>& InOutUsageSlots) -{ - RwLock::SharedLockScope _(m_BucketLock); - for (const auto& It : m_CacheMap) - { - uint32_t Index = It.second; - GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) - { - Slot = InOutUsageSlots.capacity() - 1; - } - if (Slot > InOutUsageSlots.size()) - { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); - } - InOutUsageSlots[Slot] += m_Payloads[Index].Payload.GetSize(); - } -} - -} // namespace zen diff --git a/src/zenserver/cache/cachememorylayer.h b/src/zenserver/cache/cachememorylayer.h deleted file mode 100644 index f15fe241b..000000000 --- a/src/zenserver/cache/cachememorylayer.h +++ /dev/null @@ -1,124 +0,0 @@ - -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "cacheshared.h" - -#include <zencore/iohash.h> -#include <zencore/stats.h> -#include <zenstore/gc.h> - -#include <inttypes.h> -#include <string_view> -#include <vector> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -class JobQueue; - -/** In-memory cache storage - - Intended for small values which are frequently accessed - - This should have a better memory management policy to maintain reasonable - footprint. 
- */ -class ZenCacheMemoryLayer -{ -public: - struct Configuration - { - uint64_t TargetFootprintBytes = 512 * 1024 * 1024; - uint64_t TrimIntervalSeconds = 60; - uint64_t MaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); - }; - - struct BucketInfo - { - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - struct Info - { - Configuration Config; - std::vector<std::string> BucketNames; - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - ZenCacheMemoryLayer(JobQueue& JobQueue, const Configuration& Config); - ~ZenCacheMemoryLayer(); - - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - uint64_t CollectGarbage(GcClock::TimePoint ExpireTime); - void Drop(); - bool DropBucket(std::string_view Bucket); - void ScrubStorage(ScrubContext& Ctx); - void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); - uint64_t TotalSize() const; - - Info GetInfo() const; - std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; - - const Configuration& GetConfiguration() const { return m_Configuration; } - void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } - -private: - void Trim(); - - struct CacheBucket - { -#pragma pack(push) -#pragma pack(1) - struct BucketPayload - { - IoBuffer Payload; // 8 - uint32_t RawSize; // 4 - IoHash RawHash; // 20 - }; -#pragma pack(pop) - static_assert(sizeof(BucketPayload) == 32u); - static_assert(sizeof(AccessTime) == 4u); - - metrics::OperationTiming m_PutOps; - metrics::OperationTiming m_GetOps; - - mutable RwLock m_BucketLock; - std::vector<AccessTime> m_AccessTimes; - std::vector<BucketPayload> m_Payloads; - tsl::robin_map<IoHash, uint32_t> m_CacheMap; - - std::atomic_uint64_t m_TotalSize{}; - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - int64_t Put(const IoHash& HashKey, const 
ZenCacheValue& Value); - uint64_t Trim(GcClock::TimePoint ExpireTime); - void Drop(); - void ScrubStorage(ScrubContext& Ctx); - void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); - inline uint64_t TotalSize() const { return m_TotalSize; } - uint64_t EntryCount() const; - void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); - }; - - JobQueue& m_JobQueue; - mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; - std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - Configuration m_Configuration; - std::atomic_uint64_t m_TotalSize{}; - std::atomic_bool m_IsTrimming = false; - std::atomic<GcClock::Tick> m_LastTickTrim; - - ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; - ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; -}; - -} // namespace zen diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 91f895b0b..d8bc00c03 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -366,7 +366,7 @@ HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - ZEN_INFO("scrubbing '{}'", Info.Config.BasePath); + ZEN_INFO("scrubbing '{}'", Info.BasePath); m_LastScrubTime = Ctx.ScrubTimestamp(); @@ -712,7 +712,7 @@ HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) ResponseWriter.BeginObject("Configuration"); { ExtendableStringBuilder<128> BasePathString; - BasePathString << Info.Config.BasePath.u8string(); + BasePathString << Info.BasePath.u8string(); ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); ResponseWriter.BeginObject("Logging"); @@ -742,7 +742,6 @@ 
HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) ResponseWriter.EndObject(); ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount); return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } @@ -772,9 +771,16 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.BeginObject("Configuration"); { ExtendableStringBuilder<128> BasePathString; - BasePathString << Info->Config.RootDir.u8string(); + BasePathString << Info->RootDir.u8string(); ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); - ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold); + ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize); + ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment); + ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold); + ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold); + ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes); + ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds); + ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds); + ResponseWriter.AddBool("EnableReferenceCaching"sv, Info->Config.DiskLayerConfig.BucketConfig.EnableReferenceCaching); } ResponseWriter.EndObject(); @@ -791,13 +797,12 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.BeginObject("StorageSize"sv); { - ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize"sv, 
Info->MemoryLayerInfo.TotalSize); + ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize); } ResponseWriter.EndObject(); - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); + ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } @@ -842,13 +847,12 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.BeginObject("StorageSize"); { - ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize); + ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize); } ResponseWriter.EndObject(); ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } @@ -3362,6 +3366,12 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo << "hit_ratio" << (NamespaceTotal > 0 ? 
(double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0); EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo); EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo); + Cbo.BeginObject("size"); + { + Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize; + Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize; + } + Cbo.EndObject(); if (!NamespaceStats.Stats.DiskStats.BucketStats.empty()) { Cbo.BeginArray("buckets"); @@ -3369,24 +3379,50 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { Cbo.BeginObject(); Cbo.AddString("bucket", BucketStats.BucketName); - if (BucketStats.Stats.TotalSize == 0 && BucketStats.Stats.HitCount == 0 && BucketStats.Stats.MissCount == 0 && - BucketStats.Stats.WriteCount == 0) + if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0) { + Cbo.BeginObject("size"); + { + Cbo << "disk" << BucketStats.Stats.DiskSize; + Cbo << "memory" << BucketStats.Stats.MemorySize; + } Cbo.EndObject(); - continue; } - Cbo << "size" << BucketStats.Stats.TotalSize; - const uint64_t BucketTotal = BucketStats.Stats.HitCount + BucketStats.Stats.MissCount; - if (BucketTotal == 0 && BucketStats.Stats.WriteCount == 0) + + if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 && + BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 && + BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 && + BucketStats.Stats.MemoryWriteCount == 0) { Cbo.EndObject(); continue; } - Cbo << "hits" << BucketStats.Stats.HitCount << "misses" << BucketStats.Stats.MissCount << "writes" - << BucketStats.Stats.WriteCount; - Cbo << "hit_ratio" << (BucketTotal > 0 ? 
(double(BucketStats.Stats.HitCount) / double(BucketTotal)) : 0.0); - EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo); - EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo); + + const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount; + if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0) + { + Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes" + << BucketStats.Stats.DiskWriteCount; + Cbo << "hit_ratio" + << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0); + } + + const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount; + if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0) + { + Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount + << "mem_writes" << BucketStats.Stats.MemoryWriteCount; + Cbo << "mem_hit_ratio" + << (BucketMemoryTotal > 0 ? 
(double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal)) + : 0.0); + } + + if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 || + BucketStats.Stats.MemoryWriteCount != 0) + { + EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo); + EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo); + } Cbo.EndObject(); } diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 77316a5dc..89123a70f 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -58,19 +58,15 @@ IsKnownBadBucketName(std::string_view Bucket) return false; } -ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, - JobQueue& JobQueue, - const std::filesystem::path& RootDir, - bool EnableReferenceCaching, - const ZenCacheMemoryLayer::Configuration MemLayerConfig) +ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) : m_Gc(Gc) -, m_RootDir(RootDir) , m_JobQueue(JobQueue) -, m_MemLayer(m_JobQueue, MemLayerConfig) -, m_DiskLayer(RootDir, EnableReferenceCaching) +, m_RootDir(RootDir) +, m_Configuration(Config) +, m_DiskLayer(m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig) { - ZEN_INFO("initializing structured cache at '{}'", RootDir); - CreateDirectories(RootDir); + ZEN_INFO("initializing structured cache at '{}'", m_RootDir); + CreateDirectories(m_RootDir); m_DiskLayer.DiscoverBuckets(); @@ -91,27 +87,13 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); - - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - StatsScope.SetBytes(OutValue.Value.Size()); - m_HitCount++; - return true; - } - - Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + bool Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); if (Ok) { 
ZEN_ASSERT(OutValue.Value.Size()); StatsScope.SetBytes(OutValue.Value.Size()); - if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, OutValue); - } m_HitCount++; return true; } @@ -132,11 +114,6 @@ ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Value.Value.Size()); m_DiskLayer.Put(InBucket, HashKey, Value, References); - - if (Value.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, Value); - } m_WriteCount++; } @@ -145,15 +122,11 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); - // TODO: should ensure this is done atomically across all layers + const bool Dropped = m_DiskLayer.DropBucket(Bucket); - const bool MemDropped = m_MemLayer.DropBucket(Bucket); - const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); - const bool AnyDropped = MemDropped || DiskDropped; + ZEN_INFO("bucket '{}' was {}", Bucket, Dropped ? "dropped" : "not found"); - ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? 
"dropped" : "not found"); - - return AnyDropped; + return Dropped; } void @@ -166,7 +139,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view bool ZenCacheNamespace::Drop() { - m_MemLayer.Drop(); return m_DiskLayer.Drop(); } @@ -189,7 +161,6 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.ScrubStorage(Ctx); - m_MemLayer.ScrubStorage(Ctx); } void @@ -201,10 +172,6 @@ ZenCacheNamespace::GatherReferences(GcContext& GcCtx) const auto Guard = MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - access_tracking::AccessTimes AccessTimes; - m_MemLayer.GatherAccessTimes(AccessTimes); - - m_DiskLayer.UpdateAccessTimes(AccessTimes); m_DiskLayer.GatherReferences(GcCtx); } @@ -213,31 +180,24 @@ ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage"); - (void)m_MemLayer.CollectGarbage(GcCtx.CacheExpireTime()); m_DiskLayer.CollectGarbage(GcCtx); } GcStorageSize ZenCacheNamespace::StorageSize() const { - return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; + return m_DiskLayer.StorageSize(); } ZenCacheNamespace::Info ZenCacheNamespace::GetInfo() const { - ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold}, - .DiskLayerInfo = m_DiskLayer.GetInfo(), - .MemoryLayerInfo = m_MemLayer.GetInfo()}; + ZenCacheNamespace::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration, .DiskLayerInfo = m_DiskLayer.GetInfo()}; std::unordered_set<std::string> BucketNames; for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames) { BucketNames.insert(BucketName); } - for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames) - { - BucketNames.insert(BucketName); - } Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end()); return Info; } @@ -250,8 +210,7 @@ 
ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const { return {}; } - ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo, - .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})}; + ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo}; return Info; } @@ -278,23 +237,25 @@ ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$"); static constinit std::string_view UE4DDCNamespaceName = "ue4.ddc"; -ZenCacheStore::ZenCacheStore(GcManager& Gc, - JobQueue& JobQueue, - const Configuration& Configuration, - const DiskWriteBlocker* InDiskWriteBlocker) +ZenCacheStore::ZenCacheStore(GcManager& Gc, + JobQueue& JobQueue, + const std::filesystem::path& BasePath, + const Configuration& Configuration, + const DiskWriteBlocker* InDiskWriteBlocker) : m_DiskWriteBlocker(InDiskWriteBlocker) , m_Gc(Gc) , m_JobQueue(JobQueue) +, m_BasePath(BasePath) , m_Configuration(Configuration) , m_ExitLogging(false) { SetLoggingConfig(m_Configuration.Logging); - CreateDirectories(m_Configuration.BasePath); + CreateDirectories(m_BasePath); - ZEN_INFO("initializing cache store at '{}'", m_Configuration.BasePath); + ZEN_INFO("initializing cache store at '{}'", m_BasePath); DirectoryContent DirContent; - GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector<std::string> Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) @@ -307,14 +268,13 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, } } - ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath); + ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_BasePath); if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { // default (unspecified) and ue4-ddc namespace points to the same namespace instance - std::filesystem::path 
DefaultNamespaceFolder = - m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); + std::filesystem::path DefaultNamespaceFolder = m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); Namespaces.push_back(std::string(UE4DDCNamespaceName)); } @@ -324,15 +284,14 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, m_Namespaces[NamespaceName] = std::make_unique<ZenCacheNamespace>(Gc, m_JobQueue, - m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), - m_Configuration.EnableReferenceCaching, - m_Configuration.MemLayerConfig); + m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), + m_Configuration.NamespaceConfig); } } ZenCacheStore::~ZenCacheStore() { - ZEN_INFO("closing cache store at '{}'", m_Configuration.BasePath); + ZEN_INFO("closing cache store at '{}'", m_BasePath); SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false}); m_Namespaces.clear(); } @@ -564,7 +523,7 @@ ZenCacheStore::DropNamespace(std::string_view InNamespace) void ZenCacheStore::Flush() { - ZEN_INFO("flushing cache store at '{}'", m_Configuration.BasePath); + ZEN_INFO("flushing cache store at '{}'", m_BasePath); IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } @@ -632,13 +591,12 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) return It->second.get(); } - auto NewNamespace = m_Namespaces.insert_or_assign( - std::string(Namespace), - std::make_unique<ZenCacheNamespace>(m_Gc, - m_JobQueue, - m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), - m_Configuration.EnableReferenceCaching, - m_Configuration.MemLayerConfig)); + auto NewNamespace = + m_Namespaces.insert_or_assign(std::string(Namespace), + std::make_unique<ZenCacheNamespace>(m_Gc, + m_JobQueue, + m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), + m_Configuration.NamespaceConfig)); return 
NewNamespace.first->second.get(); } @@ -754,7 +712,6 @@ ZenCacheStore::GetInfo() const Info.NamespaceNames.push_back(std::string(NamespaceName)); ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo(); Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount; - Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount; }); return Info; @@ -816,7 +773,7 @@ TEST_CASE("z$.store") GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); const int kIterationCount = 100; @@ -872,9 +829,9 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); + CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); @@ -895,7 +852,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -906,6 +863,7 @@ TEST_CASE("z$.size") Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); + CHECK_EQ(0, Zcs.StorageSize().MemorySize); } } @@ -918,9 +876,9 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); + CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 
64); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); @@ -938,7 +896,7 @@ TEST_CASE("z$.size") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); @@ -977,7 +935,10 @@ TEST_CASE("z$.gc") { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); const auto Bucket = "teardrinker"sv; // Create a cache record @@ -1014,7 +975,10 @@ TEST_CASE("z$.gc") // Expect timestamps to be serialized { GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); std::vector<IoHash> Keep; // Collect garbage with 1 hour max cache duration @@ -1035,7 +999,10 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -1081,7 +1048,10 @@ TEST_CASE("z$.gc") ScopedTemporaryDirectory TempDir; GcManager Gc; { - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); const auto Bucket = "rightintwo"sv; std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; @@ -1126,7 +1096,10 @@ TEST_CASE("z$.gc") } { // 
Unreferenced blocks will be pruned so size should now be zero - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } @@ -1183,7 +1156,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) WorkerThreadPool ThreadPool(4); GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), true); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); { std::atomic<size_t> WorkCompleted = 0; @@ -1402,7 +1375,7 @@ TEST_CASE("z$.namespaces") IoHash Key2; { GcManager Gc; - ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = false}, nullptr); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -1427,7 +1400,7 @@ TEST_CASE("z$.namespaces") { GcManager Gc; - ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -1490,7 +1463,7 @@ TEST_CASE("z$.drop.bucket") WorkerThreadPool Workers(1); { GcManager Gc; - ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket = "teardrinker"sv; const auto Namespace = "mynamespace"sv; @@ -1563,7 +1536,7 @@ 
TEST_CASE("z$.drop.namespace") WorkerThreadPool Workers(1); { GcManager Gc; - ZenCacheStore Zcs(Gc, *JobQueue, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}, nullptr); + ZenCacheStore Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {.AllowAutomaticCreationOfNamespaces = true}, nullptr); const auto Bucket1 = "teardrinker1"sv; const auto Bucket2 = "teardrinker2"sv; const auto Namespace1 = "mynamespace1"sv; @@ -1629,7 +1602,7 @@ TEST_CASE("z$.blocked.disklayer.put") GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", false); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); @@ -1724,7 +1697,7 @@ TEST_CASE("z$.scrub") GcManager Gc; CidStore CidStore(Gc); auto JobQueue = MakeJobQueue(1, "testqueue"); - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", true); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 28b2189ae..02d0e31c0 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -3,7 +3,6 @@ #pragma once #include "cachedisklayer.h" -#include "cachememorylayer.h" #include <zencore/compactbinary.h> #include <zencore/iohash.h> @@ -52,20 +51,18 @@ class ZenCacheNamespace final : public GcStorage, public GcContributor public: struct Configuration { - std::filesystem::path RootDir; - uint64_t DiskLayerThreshold = 0; + ZenCacheDiskLayer::Configuration DiskLayerConfig; }; struct BucketInfo { - ZenCacheDiskLayer::BucketInfo DiskLayerInfo; - ZenCacheMemoryLayer::BucketInfo MemoryLayerInfo; + ZenCacheDiskLayer::BucketInfo DiskLayerInfo; }; struct 
Info { - Configuration Config; - std::vector<std::string> BucketNames; - ZenCacheDiskLayer::Info DiskLayerInfo; - ZenCacheMemoryLayer::Info MemoryLayerInfo; + std::filesystem::path RootDir; + Configuration Config; + std::vector<std::string> BucketNames; + ZenCacheDiskLayer::Info DiskLayerInfo; }; struct NamespaceStats @@ -78,11 +75,7 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; - ZenCacheNamespace(GcManager& Gc, - JobQueue& JobQueue, - const std::filesystem::path& RootDir, - bool EnableReferenceCaching, - const ZenCacheMemoryLayer::Configuration MemLayerConfig = {}); + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -92,12 +85,10 @@ public: void EnumerateBucketContents(std::string_view Bucket, std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; - bool Drop(); - void Flush(); - uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; } + bool Drop(); + void Flush(); // GcContributor - virtual void GatherReferences(GcContext& GcCtx) override; // GcStorage @@ -105,6 +96,7 @@ public: virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override; + Configuration GetConfig() const { return m_Configuration; } Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; NamespaceStats Stats(); @@ -113,17 +105,16 @@ public: private: GcManager& m_Gc; - std::filesystem::path m_RootDir; JobQueue& m_JobQueue; - ZenCacheMemoryLayer m_MemLayer; + std::filesystem::path m_RootDir; + Configuration m_Configuration; ZenCacheDiskLayer m_DiskLayer; std::atomic<uint64_t> m_HitCount{}; std::atomic<uint64_t> m_MissCount{}; std::atomic<uint64_t> m_WriteCount{}; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; - uint64_t m_DiskLayerSizeThreshold = 1 * 
1024; - uint64_t m_LastScrubTime = 0; + uint64_t m_LastScrubTime = 0; ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; @@ -144,10 +135,8 @@ public: struct Configuration { - std::filesystem::path BasePath; - bool AllowAutomaticCreationOfNamespaces = false; - bool EnableReferenceCaching = false; - ZenCacheMemoryLayer::Configuration MemLayerConfig; + ZenCacheNamespace::Configuration NamespaceConfig; + bool AllowAutomaticCreationOfNamespaces = false; struct LogConfig { bool EnableWriteLog = true; @@ -157,10 +146,10 @@ public: struct Info { + std::filesystem::path BasePath; Configuration Config; std::vector<std::string> NamespaceNames; - uint64_t DiskEntryCount = 0; - uint64_t MemoryEntryCount = 0; + uint64_t DiskEntryCount = 0; GcStorageSize StorageSize; }; @@ -182,7 +171,11 @@ public: std::vector<NamedNamespaceStats> NamespaceStats; }; - ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const Configuration& Configuration, const DiskWriteBlocker* InDiskWriteBlocker); + ZenCacheStore(GcManager& Gc, + JobQueue& JobQueue, + const std::filesystem::path& BasePath, + const Configuration& Configuration, + const DiskWriteBlocker* InDiskWriteBlocker); ~ZenCacheStore(); bool Get(const CacheRequestContext& Context, @@ -233,6 +226,7 @@ private: GcManager& m_Gc; JobQueue& m_JobQueue; + std::filesystem::path m_BasePath; Configuration m_Configuration; std::atomic<uint64_t> m_HitCount{}; std::atomic<uint64_t> m_MissCount{}; |