diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-24 14:54:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-24 14:54:26 +0200 |
| commit | 1a144212278aa7158d6b32b63e398db95a7ae868 (patch) | |
| tree | 58735827b0b706368a82bcaaa8aaa68f211e1d10 /src/zenserver/cache/cachedisklayer.cpp | |
| parent | chunking moved to zenstore (#490) (diff) | |
| download | zen-1a144212278aa7158d6b32b63e398db95a7ae868.tar.xz zen-1a144212278aa7158d6b32b63e398db95a7ae868.zip | |
merge disk and memory layers (#493)
- Feature: Added `--cache-memlayer-sizethreshold` option to zenserver to control the size threshold below which cache entries get cached in memory
- Changed: Merged cache memory layer with cache disk layer to reduce memory and CPU overhead
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 826 |
1 file changed, 535 insertions, 291 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 73c02bcc2..f755436e0 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -7,6 +7,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> +#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/trace.h> @@ -167,16 +168,16 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex; const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex; -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching) +ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config) : m_BucketName(std::move(BucketName)) +, m_Configuration(Config) , m_BucketId(Oid::Zero) -, m_EnableReferenceCaching(EnableReferenceCaching) { if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { // This is pretty ad hoc but in order to avoid too many individual files // it makes sense to have a different strategy for legacy values - m_LargeObjectThreshold = 16 * 1024 * 1024; + m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024; } } @@ -397,7 +398,7 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); @@ -470,7 +471,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_PayloadAlignment = 
Header.PayloadAlignment; + m_Configuration.PayloadAlignment = Header.PayloadAlignment; std::vector<DiskIndexEntry> Entries; Entries.resize(Header.EntryCount); @@ -479,11 +480,6 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index sizeof(CacheBucketIndexHeader)); m_Payloads.reserve(Header.EntryCount); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.reserve(Header.EntryCount); - } - m_AccessTimes.reserve(Header.EntryCount); m_Index.reserve(Header.EntryCount); std::string InvalidEntryReason; @@ -496,14 +492,18 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index } size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } m_Index.insert_or_assign(Entry.Key, EntryIndex); EntryCount++; } + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.resize(m_Payloads.size()); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } OutVersion = CacheBucketIndexHeader::Version2; return Header.LogPosition; } @@ -540,8 +540,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); + LogEntryCount = EntryCount - SkipEntryCount; uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { @@ -559,14 +558,18 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui } size_t EntryIndex = m_Payloads.size(); 
m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } m_Index.insert_or_assign(Record.Key, EntryIndex); }, SkipEntryCount); + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.resize(m_Payloads.size()); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); @@ -582,11 +585,12 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); - m_TotalStandaloneSize = 0; + m_StandaloneSize = 0; m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); + m_CachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); @@ -604,7 +608,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) CreateDirectories(m_BucketDir); - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); if (std::filesystem::is_regular_file(IndexPath)) { @@ -643,10 +647,10 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); + m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); 
KnownLocations.push_back(BlockLocation); } @@ -681,7 +685,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con { ZEN_TRACE_CPU("Z$::Disk::Bucket::GetInlineCacheValue"); - BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); IoBuffer Value = m_BlockStore.TryGetChunk(Location); if (Value) @@ -713,88 +717,190 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy } bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) { - m_MissCount++; + m_DiskMissCount++; + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } return false; } - size_t EntryIndex = It.value(); - const BucketPayload& Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; - OutValue.RawSize = Payload.RawSize; - OutValue.RawHash = Payload.RawHash; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + + size_t EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); + if (FillRawHashAndRawSize) { - // We don't need to hold the index lock when we read a standalone file - _.ReleaseNow(); - OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + if (Payload.RawHash != IoHash::Zero || Payload.RawSize != 0) + { + 
OutValue.RawHash = Payload.RawHash; + OutValue.RawSize = Payload.RawSize; + FillRawHashAndRawSize = false; + } } - else + + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex]) { - OutValue.Value = GetInlineCacheValue(Location); + OutValue.Value = m_CachedPayloads[EntryIndex]; + IndexLock.ReleaseNow(); + m_MemoryHitCount++; } - _.ReleaseNow(); - - if (!Location.IsFlagSet(DiskLocation::kStructured)) + else { - if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) + IndexLock.ReleaseNow(); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_MemoryMissCount++; + } + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + } + else { - if (Location.IsFlagSet(DiskLocation::kCompressed)) + OutValue.Value = GetInlineCacheValue(Location); + if (m_Configuration.MemCacheSizeThreshold > 0) { - (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); + size_t ValueSize = OutValue.Value.GetSize(); + if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); + OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) + { + // Only update if it has not already been updated by other thread + if (!m_CachedPayloads[UpdateIt->second]) + { + m_CachedPayloads[UpdateIt->second] = OutValue.Value; + m_MemCachedSize.fetch_add(ValueSize); + CacheMemoryUsage.fetch_add(ValueSize); + m_MemoryWriteCount++; + } + } + } } - else + } + } + + if (FillRawHashAndRawSize) + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MetaData"); + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) { - OutValue.RawHash 
= IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); + OutValue = ZenCacheValue{}; + m_DiskMissCount++; + return false; } - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0) { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - WritePayload.RawHash = OutValue.RawHash; - WritePayload.RawSize = OutValue.RawSize; - - m_LogFlushPosition = 0; // Force resave of index on exit + WritePayload.RawHash = OutValue.RawHash; + WritePayload.RawSize = OutValue.RawSize; } } } if (OutValue.Value) { - m_HitCount++; + m_DiskHitCount++; StatsScope.SetBytes(OutValue.Value.GetSize()); return true; } else { - m_MissCount++; + m_DiskMissCount++; return false; } } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (Value.Value.Size() >= m_LargeObjectThreshold) + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - return PutStandaloneCacheValue(HashKey, Value, References); + PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage); } - PutInlineCacheValue(HashKey, Value, References); + else + { + PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage); + } + + m_DiskWriteCount++; +} - m_WriteCount++; +void +ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, 
std::atomic_uint64_t& CacheMemoryUsage) +{ + GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_IndexLock); + for (const auto& Kv : m_Index) + { + if (m_AccessTimes[Kv.second] < ExpireTicks) + { + size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize(); + m_MemCachedSize.fetch_sub(PayloadSize); + CacheMemoryUsage.fetch_sub(PayloadSize); + m_CachedPayloads[Kv.second] = {}; + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, + GcClock::Duration SectionLength, + std::vector<uint64_t>& InOutUsageSlots) +{ + RwLock::SharedLockScope _(m_IndexLock); + for (const auto& It : m_Index) + { + size_t Index = It.second; + if (!m_CachedPayloads[Index]) + { + continue; + } + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); + uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? 
Age.count() / SectionLength.count() : 0); + if (Slot >= InOutUsageSlots.capacity()) + { + Slot = InOutUsageSlots.capacity() - 1; + } + if (Slot > InOutUsageSlots.size()) + { + InOutUsageSlots.resize(uint64_t(Slot + 1), 0); + } + InOutUsageSlots[Slot] += m_CachedPayloads[Index].GetSize(); + } } bool -ZenCacheDiskLayer::CacheBucket::Drop() +ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); @@ -814,10 +920,14 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_Index.clear(); m_Payloads.clear(); m_AccessTimes.clear(); + m_CachedPayloads.clear(); m_FirstReferenceIndex.clear(); m_ReferenceHashes.clear(); m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; + m_StandaloneSize.store(0); + CacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); + m_MemCachedSize.store(0); return Deleted; } @@ -992,7 +1102,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) }; void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); @@ -1083,7 +1193,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } else { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); ChunkIndexToChunkHash.push_back(HashKey); continue; } @@ -1169,11 +1279,24 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); BucketPayload& Payload = m_Payloads[It->second]; - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); } DiskLocation Location = Payload.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + 
m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); + } + + if (m_Configuration.MemCacheSizeThreshold > 0) + { + size_t CachedSize = m_CachedPayloads[It->second].GetSize(); + m_MemCachedSize.fetch_sub(CachedSize); + CacheMemoryUsage.fetch_sub(CachedSize); + m_CachedPayloads[It->second] = IoBuffer{}; + } + Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); @@ -1193,7 +1316,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::error_code Ec; fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... } - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } } m_SlogFile.Append(LogEntries); @@ -1202,38 +1324,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; std::vector<size_t> FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.reserve(EntryCount); - } - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[It.second]); - AccessTimes.push_back(m_AccessTimes[It.second]); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); - } - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.swap(FirstReferenceIndex); - CompactReferences(IndexLock); - } + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); } } } @@ -1334,7 +1431,7 @@ 
ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { continue; } - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex)) { @@ -1397,8 +1494,17 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Entry.first); It != m_Index.end()) { - BucketPayload& UpdatePayload = m_Payloads[It->second]; - Buffer = GetInlineCacheValue(UpdatePayload.Location); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + Buffer = m_CachedPayloads[It->second]; + } + if (!Buffer) + { + DiskLocation Location = m_Payloads[It->second].Location; + IndexLock.ReleaseNow(); + Buffer = GetInlineCacheValue(Location); + // Don't memcache items when doing GC + } } if (!Buffer) { @@ -1411,7 +1517,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) CbObject Obj(SharedBuffer{Buffer}); size_t CurrentCidCount = Cids.size(); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); #if CALCULATE_BLOCKING_TIME @@ -1450,20 +1556,20 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = TotalSize(); + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t 
WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + GcStorageSize OldTotalSize = StorageSize(); std::unordered_set<IoHash> DeletedChunks; uint64_t MovedCount = 0; @@ -1473,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " "{} " "of {} " - "entires ({}).", + "entries ({}/{}).", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), @@ -1484,7 +1590,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) DeletedChunks.size(), MovedCount, TotalChunkCount, - NiceBytes(OldTotalSize)); + NiceBytes(OldTotalSize.DiskSize), + NiceBytes(OldTotalSize.MemorySize)); bool Expected = false; if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) @@ -1514,45 +1621,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) // Clean up m_AccessTimes and m_Payloads vectors std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; std::vector<size_t> FirstReferenceIndex; IndexMap Index; - { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - if (m_EnableReferenceCaching) - { - FirstReferenceIndex.reserve(EntryCount); - } - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t OldEntryIndex = It.second; - size_t NewEntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[OldEntryIndex]); - AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); - if 
(m_EnableReferenceCaching) - { - FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); - } - Index.insert({It.first, NewEntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.swap(FirstReferenceIndex); - CompactReferences(IndexLock); - } + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); } GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); } @@ -1610,7 +1690,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) for (const auto& Entry : ExpiredStandaloneEntries) { m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedChunks.insert(Entry.Key); } m_SlogFile.Append(ExpiredStandaloneEntries); @@ -1623,20 +1703,18 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::Delete"); - std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { - const IoHash& Key = Entry.Key; - const DiskLocation& Loc = Entry.Location; + const IoHash& Key = Entry.Key; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { - RwLock::SharedLockScope __(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); @@ -1649,47 +1727,21 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } - 
__.ReleaseNow(); + IndexLock.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; fs::remove(FilePath, Ec); + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + continue; + } } } - - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); - DiskLocation RestoreLocation = Loc; - RestoreLocation.Flags &= ~DiskLocation::kTombStone; - - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - continue; - } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - } - m_Index.insert({Key, EntryIndex}); - m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); - DeletedChunks.erase(Key); - continue; - } DeletedSize += Entry.Location.Size(); } } @@ -1712,7 +1764,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { continue; } - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = Entry.first; @@ -1729,13 +1781,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const bool PerformDelete 
= GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - uint64_t CurrentTotalSize = TotalSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); + GcStorageSize CurrentTotalSize = StorageSize(); + ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", m_BucketDir, DeleteCount, TotalChunkCount, - NiceBytes(CurrentTotalSize)); + NiceBytes(CurrentTotalSize.DiskSize), + NiceBytes(CurrentTotalSize.MemorySize)); return; } @@ -1743,7 +1796,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreState, ChunkLocations, KeepChunkIndexes, - m_PayloadAlignment, + m_Configuration.PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector<DiskIndexEntry> LogEntries; @@ -1768,7 +1821,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) // Entry has been updated while GC was running, ignore the move continue; } - Payload.Location = DiskLocation(NewLocation, m_PayloadAlignment, Payload.Location.GetFlags()); + Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); } for (const size_t ChunkIndex : RemovedChunks) @@ -1783,9 +1836,16 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } const DiskLocation& OldDiskLocation = Payload.Location; LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), - m_PayloadAlignment, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), + m_Configuration.PayloadAlignment, 
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex]) + { + uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size(); + m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + m_CachedPayloads[PayloadIndex] = IoBuffer{}; + } m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); } @@ -1797,33 +1857,20 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) [&]() { return GcCtx.ClaimGCReserve(); }); } -void -ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - ZEN_TRACE_CPU("Z$::Disk::Bucket::UpdateAccessTimes"); - - using namespace access_tracking; - - for (const KeyAccessTime& KeyTime : AccessTimes) - { - if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = KeyTime.LastAccess; - } - } -} - ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - return ZenCacheDiskLayer::BucketStats{.TotalSize = TotalSize(), - .HitCount = m_HitCount, - .MissCount = m_MissCount, - .WriteCount = m_WriteCount, - .PutOps = m_PutOps.Snapshot(), - .GetOps = m_GetOps.Snapshot()}; + GcStorageSize Size = StorageSize(); + return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, + .MemorySize = Size.MemorySize, + .DiskHitCount = m_DiskHitCount, + .DiskMissCount = m_DiskMissCount, + .DiskWriteCount = m_DiskWriteCount, + .MemoryHitCount = m_MemoryHitCount, + .MemoryMissCount = m_MemoryMissCount, + .MemoryWriteCount = m_MemoryWriteCount, + .PutOps = m_PutOps.Snapshot(), + .GetOps = m_GetOps.Snapshot()}; } uint64_t @@ -1907,27 +1954,16 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) } for (CacheBucket* Bucket : Buckets) { - Bucket->CollectGarbage(GcCtx); - } -} 
- -void -ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) -{ - RwLock::SharedLockScope _(m_Lock); - - for (const auto& Kv : AccessTimes.Buckets) - { - if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - Bucket.UpdateAccessTimes(Kv.second); - } + Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize); } + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); @@ -1942,7 +1978,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } - bool CleanUpTempFile = false; + bool CleanUpTempFile = true; auto __ = MakeGuard([&] { if (CleanUpTempFile) { @@ -2042,13 +2078,19 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c DiskLocation Loc(NewFileSize, EntryFlags); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + ValueLock.ReleaseNow(); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object size_t EntryIndex = m_Payloads.size(); m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.emplace_back(IoBuffer{}); + } + if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); @@ -2057,25 
+2099,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } else { - // TODO: should check if write is idempotent and bail out if it is? size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); BucketPayload& Payload = m_Payloads[EntryIndex]; + uint64_t OldSize = Payload.Location.Size(); Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; - if (m_EnableReferenceCaching) + if (m_Configuration.EnableReferenceCaching) { SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); - m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + if (m_CachedPayloads[EntryIndex]) + { + uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); + m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + m_CachedPayloads[EntryIndex] = IoBuffer{}; + } + } + m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); + m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + std::atomic_uint64_t& CacheMemoryUsage) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); @@ -2090,38 +2145,73 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const 
BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); + uint64_t PayloadSize = Value.Value.GetSize(); + const bool MemCacheEnabled = (m_Configuration.MemCacheSizeThreshold > 0); + IoBuffer MemCacheBuffer = (MemCacheEnabled && (PayloadSize <= m_Configuration.MemCacheSizeThreshold)) + ? IoBufferBuilder::ReadFromFileMaybe(Value.Value) + : IoBuffer{}; - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - if (m_EnableReferenceCaching) + m_BlockStore.WriteChunk( + Value.Value.Data(), + Value.Value.Size(), + m_Configuration.PayloadAlignment, + [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); + m_SlogFile.Append({.Key = HashKey, .Location = Location}); + + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { - SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + + if (MemCacheEnabled) + { + if (m_CachedPayloads[EntryIndex]) + { + uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize(); + 
m_MemCachedSize.fetch_sub(OldCachedSize); + CacheMemoryUsage.fetch_sub(OldCachedSize); + } + + if (MemCacheBuffer) + { + m_MemCachedSize.fetch_add(PayloadSize); + CacheMemoryUsage.fetch_add(PayloadSize); + m_MemoryWriteCount++; + } + m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer); + } + if (m_Configuration.EnableReferenceCaching) + { + SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); + } } - } - else - { - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_EnableReferenceCaching) + else { - m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); - SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + if (MemCacheEnabled) + { + if (MemCacheBuffer) + { + m_MemCachedSize.fetch_add(PayloadSize); + CacheMemoryUsage.fetch_add(PayloadSize); + m_MemoryWriteCount++; + } + m_CachedPayloads.emplace_back(std::move(MemCacheBuffer)); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex); + SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); + } + m_Index.insert_or_assign(HashKey, EntryIndex); } - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); + }); } void @@ -2286,11 +2376,58 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn return true; } +void +ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads, + std::vector<AccessTime>& AccessTimes, + std::vector<IoBuffer>& CachedPayloads, + std::vector<size_t>& FirstReferenceIndex, + IndexMap& Index, + RwLock::ExclusiveLockScope& IndexLock) +{ + size_t EntryCount = 
m_Index.size(); + Payloads.reserve(EntryCount); + AccessTimes.reserve(EntryCount); + CachedPayloads.reserve(EntryCount); + if (m_Configuration.EnableReferenceCaching) + { + FirstReferenceIndex.reserve(EntryCount); + } + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t EntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[It.second]); + AccessTimes.push_back(m_AccessTimes[It.second]); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + CachedPayloads.push_back(std::move(m_CachedPayloads[It.second])); + } + if (m_Configuration.EnableReferenceCaching) + { + FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); + } + Index.insert({It.first, EntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + if (m_Configuration.MemCacheSizeThreshold > 0) + { + m_CachedPayloads.swap(CachedPayloads); + } + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.swap(FirstReferenceIndex); + CompactReferences(IndexLock); + } +} + ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching) -: m_RootDir(RootDir) -, m_EnableReferenceCaching(EnableReferenceCaching) +ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) +: m_JobQueue(JobQueue) +, m_RootDir(RootDir) +, m_Configuration(Config) { } @@ -2327,7 +2464,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; @@ -2342,7 +2479,12 @@ ZenCacheDiskLayer::Get(std::string_view 
InBucket, const IoHash& HashKey, ZenCach } ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); + if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize)) + { + TryMemCacheTrim(); + return true; + } + return false; } void @@ -2376,7 +2518,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; @@ -2401,7 +2543,8 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Bucket != nullptr); - Bucket->Put(HashKey, Value, References); + Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize); + TryMemCacheTrim(); } void @@ -2432,7 +2575,7 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching)); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); CacheBucket& Bucket = *InsertResult.first->second; try @@ -2489,7 +2632,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); - return Bucket.Drop(); + return Bucket.Drop(m_TotalMemCachedSize); } // Make sure we remove the folder even if we don't know about the bucket @@ -2511,7 +2654,7 @@ ZenCacheDiskLayer::Drop() CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); - if (!Bucket.Drop()) + if (!Bucket.Drop(m_TotalMemCachedSize)) { return false; } @@ -2552,11 +2695,11 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { #if 1 - 
Results.push_back( - Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); + Results.push_back(Ctx.ThreadPool().EnqueueTask( + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }})); #else CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); + Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize); #endif } @@ -2587,24 +2730,27 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) } } -uint64_t -ZenCacheDiskLayer::TotalSize() const +GcStorageSize +ZenCacheDiskLayer::StorageSize() const { - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); + GcStorageSize StorageSize{}; + RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - TotalSize += Kv.second->TotalSize(); + GcStorageSize BucketSize = Kv.second->StorageSize(); + StorageSize.DiskSize += BucketSize.DiskSize; + StorageSize.MemorySize += BucketSize.MemorySize; } - return TotalSize; + return StorageSize; } ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - ZenCacheDiskLayer::DiskStats Stats = {}; + GcStorageSize Size = StorageSize(); + ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); Stats.BucketStats.reserve(m_Buckets.size()); @@ -2619,8 +2765,7 @@ ZenCacheDiskLayer::Stats() const ZenCacheDiskLayer::Info ZenCacheDiskLayer::GetInfo() const { - ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching}, - .TotalSize = TotalSize()}; + ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration}; { RwLock::SharedLockScope _(m_Lock); Info.BucketNames.reserve(m_Buckets.size()); @@ -2628,6 +2773,9 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); + GcStorageSize BucketSize = Kv.second->StorageSize(); + 
Info.StorageSize.DiskSize += BucketSize.DiskSize; + Info.StorageSize.MemorySize += BucketSize.MemorySize; } } return Info; @@ -2640,7 +2788,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; } return {}; } @@ -2677,4 +2825,100 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +void +ZenCacheDiskLayer::MemCacheTrim() +{ + ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); + + ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); + + const GcClock::TimePoint Now = GcClock::Now(); + + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); + if (NowTick < NextAllowedTrimTick) + { + return; + } + + bool Expected = false; + if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) + { + return; + } + + // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickMemCacheTrim.store(NextTrimTick); + + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + uint64_t StartSize = m_TotalMemCachedSize.load(); + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + uint64_t EndSize = m_TotalMemCachedSize.load(); + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + 
NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + m_IsMemCacheTrimming.store(false); + }); + + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); + + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); + } + + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); + MemCacheTrim(Buckets, ExpireTime); + break; + } + } + }); +} + +void +ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) +{ + if (m_Configuration.MemCacheTargetFootprintBytes == 0) + { + return; + } + RwLock::SharedLockScope __(m_Lock); + for (CacheBucket* Bucket : Buckets) + { + Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize); + } + const GcClock::TimePoint Now = GcClock::Now(); + const GcClock::Tick NowTick = Now.time_since_epoch().count(); + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); +} + } // namespace zen |