diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-30 09:32:54 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-30 09:32:54 +0100 |
| commit | 3a6a5855cf36967c6bde31292669bfaf832c6f0b (patch) | |
| tree | 593e7c21e6840e7ad312207fddc63e1934e19d85 /src/zenserver/cache/cachedisklayer.cpp | |
| parent | set up arch properly when running tests (mac) (#505) (diff) | |
| download | zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.tar.xz zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.zip | |
New GC implementation (#459)
- Feature: New garbage collection implementation, still in evaluation mode. It is enabled with the `--gc-v2` command-line option.
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 633 |
1 files changed, 547 insertions, 86 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 2efec1e66..38cbf3a93 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -168,8 +168,13 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex; const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex; -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config) -: m_BucketName(std::move(BucketName)) +ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, + std::atomic_uint64_t& OuterCacheMemoryUsage, + std::string BucketName, + const BucketConfiguration& Config) +: m_Gc(Gc) +, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) +, m_BucketName(std::move(BucketName)) , m_Configuration(Config) , m_BucketId(Oid::Zero) { @@ -179,10 +184,12 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const Bucket // it makes sense to have a different strategy for legacy values m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024; } + m_Gc.AddGcReferencer(*this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + m_Gc.RemoveGcReferencer(*this); } bool @@ -717,7 +724,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy } bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); @@ -782,8 +789,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (!m_CachedPayloads[UpdateIt->second]) { m_CachedPayloads[UpdateIt->second] = OutValue.Value; - m_MemCachedSize.fetch_add(ValueSize); - CacheMemoryUsage.fetch_add(ValueSize); + AddMemCacheUsage(ValueSize); m_MemoryWriteCount++; } } @@ -834,27 +840,24 @@ 
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage); + PutStandaloneCacheValue(HashKey, Value, References); } else { - PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage); + PutInlineCacheValue(HashKey, Value, References); } m_DiskWriteCount++; } void -ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); @@ -864,8 +867,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std: if (m_AccessTimes[Kv.second] < ExpireTicks) { size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize(); - m_MemCachedSize.fetch_sub(PayloadSize); - CacheMemoryUsage.fetch_sub(PayloadSize); + RemoveMemCacheUsage(PayloadSize); m_CachedPayloads[Kv.second] = {}; } } @@ -900,7 +902,7 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, } bool -ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Drop() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); @@ -926,7 +928,7 @@ ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; m_StandaloneSize.store(0); - CacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); + m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); 
m_MemCachedSize.store(0); return Deleted; @@ -1102,7 +1104,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) }; void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); @@ -1292,8 +1294,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint if (m_Configuration.MemCacheSizeThreshold > 0) { size_t CachedSize = m_CachedPayloads[It->second].GetSize(); - m_MemCachedSize.fetch_sub(CachedSize); - CacheMemoryUsage.fetch_sub(CachedSize); + RemoveMemCacheUsage(CachedSize); m_CachedPayloads[It->second] = IoBuffer{}; } @@ -1411,8 +1412,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : Index) { - const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = AccessTimes[Entry.second]; + const IoHash& Key = Entry.first; + size_t PayloadIndex = Entry.second; + GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); @@ -1424,7 +1426,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) continue; } - BucketPayload& Payload = Payloads[Entry.second]; + BucketPayload& Payload = Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; if (!Loc.IsFlagSet(DiskLocation::kStructured)) @@ -1433,7 +1435,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } if (m_Configuration.EnableReferenceCaching) { - if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex)) + if (FirstReferenceIndex.empty() || (FirstReferenceIndex[PayloadIndex] == UnknownReferencesIndex)) { StructuredItemsWithUnknownAttachments.push_back(Entry); continue; @@ -1450,7 +1452,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); 
}); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); } @@ -1470,13 +1472,15 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : StructuredItemsWithUnknownAttachments) { - BucketPayload& Payload = Payloads[Entry.second]; - const DiskLocation& Loc = Payload.Location; + const IoHash& Key = Entry.first; + size_t PayloadIndex = Entry.second; + BucketPayload& Payload = Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer) + if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) { continue; } @@ -1492,7 +1496,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { if (m_Configuration.MemCacheSizeThreshold > 0) { @@ -1514,8 +1518,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ZEN_ASSERT(Buffer); ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{Buffer}); - size_t CurrentCidCount = Cids.size(); + CbObjectView Obj(Buffer.GetData()); + size_t CurrentCidCount = Cids.size(); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); if (m_Configuration.EnableReferenceCaching) { @@ -1528,7 +1532,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); 
It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex) { @@ -1556,7 +1560,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); @@ -1762,17 +1766,19 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin TotalChunkCount = 0; for (const auto& Entry : Index) { - const DiskLocation& DiskLocation = Payloads[Entry.second].Location; + size_t EntryIndex = Entry.second; + const DiskLocation& DiskLocation = Payloads[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { continue; } + const IoHash& Key = Entry.first; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = Entry.first; - if (ExpiredCacheKeys.contains(Entry.first)) + ChunkIndexToChunkHash[ChunkIndex] = Key; + if (ExpiredCacheKeys.contains(Key)) { continue; } @@ -1815,12 +1821,12 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin }); for (const auto& Entry : MovedChunks) { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t PayloadIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[PayloadIndex]; - if (Payloads[Index[ChunkHash]].Location != m_Payloads[PayloadIndex].Location) + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + size_t EntryIndex = m_Index[ChunkHash]; + BucketPayload& Payload = m_Payloads[EntryIndex]; 
+ if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; @@ -1830,9 +1836,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin } for (const size_t ChunkIndex : RemovedChunks) { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t PayloadIndex = m_Index[ChunkHash]; - const BucketPayload& Payload = m_Payloads[PayloadIndex]; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + size_t EntryIndex = m_Index[ChunkHash]; + const BucketPayload& Payload = m_Payloads[EntryIndex]; if (Payloads[Index[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete @@ -1843,12 +1849,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), m_Configuration.PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex]) + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex]) { - uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size(); - m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - m_CachedPayloads[PayloadIndex] = IoBuffer{}; + uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); + RemoveMemCacheUsage(CachePayloadSize); + m_CachedPayloads[EntryIndex] = IoBuffer{}; } m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); @@ -1891,10 +1896,10 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) - ? 
GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) - : GetInlineCacheValue(Payload.Location); - CbObject Obj(SharedBuffer{Value}); + IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) + ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) + : GetInlineCacheValue(Payload.Location); + CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), @@ -1958,16 +1963,13 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) } for (CacheBucket* Bucket : Buckets) { - Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize); + Bucket->CollectGarbage(GcCtx); } MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); @@ -2118,8 +2120,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey if (m_CachedPayloads[EntryIndex]) { uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); - m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + RemoveMemCacheUsage(CachePayloadSize); m_CachedPayloads[EntryIndex] = IoBuffer{}; } } @@ -2131,10 +2132,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& 
HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); @@ -2176,14 +2174,12 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, if (m_CachedPayloads[EntryIndex]) { uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize(); - m_MemCachedSize.fetch_sub(OldCachedSize); - CacheMemoryUsage.fetch_sub(OldCachedSize); + RemoveMemCacheUsage(OldCachedSize); } if (MemCacheBuffer) { - m_MemCachedSize.fetch_add(PayloadSize); - CacheMemoryUsage.fetch_add(PayloadSize); + AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer); @@ -2202,8 +2198,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, { if (MemCacheBuffer) { - m_MemCachedSize.fetch_add(PayloadSize); - CacheMemoryUsage.fetch_add(PayloadSize); + AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } m_CachedPayloads.emplace_back(std::move(MemCacheBuffer)); @@ -2219,6 +2214,409 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } void +ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx) +{ + size_t TotalEntries = 0; + tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys; + std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc cache bucket '{}': removed {} expired keys out of {} in {}", + m_BucketDir, + ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(), + TotalEntries, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); + + BlockStoreCompactState BlockCompactState; + BlockStore::ReclaimSnapshotState BlockSnapshotState; + std::vector<IoHash> BlockCompactStateKeys; + std::vector<DiskIndexEntry> ExpiredEntries; + uint64_t RemovedStandaloneSize = 0; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if 
(Ctx.Settings.CollectSmallObjects) + { + BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState(); + } + TotalEntries = m_Index.size(); + + // Find out expired keys and affected blocks + for (const auto& Entry : m_Index) + { + const IoHash& Key = Entry.first; + size_t EntryIndex = Entry.second; + GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; + if (AccessTime >= ExpireTicks) + { + continue; + } + + const BucketPayload& Payload = m_Payloads[EntryIndex]; + DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location}; + ExpiredEntry.Location.Flags |= DiskLocation::kTombStone; + + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()}); + RemovedStandaloneSize += Payload.Location.Size(); + ExpiredEntries.push_back(ExpiredEntry); + } + else if (Ctx.Settings.CollectSmallObjects) + { + ExpiredInlineKeys.insert(Key); + uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex(); + bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex); + if (!IsActiveWriteBlock) + { + BlockCompactState.AddBlock(BlockIndex); + } + ExpiredEntries.push_back(ExpiredEntry); + } + } + + Ctx.ExpiredItems.fetch_add(ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size()); + + // Get all locations we need to keep for affected blocks + if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + { + for (const auto& Entry : m_Index) + { + const IoHash& Key = Entry.first; + if (ExpiredInlineKeys.contains(Key)) + { + continue; + } + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment))) + { + BlockCompactStateKeys.push_back(Key); + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const DiskIndexEntry& Entry : 
ExpiredEntries) + { + auto It = m_Index.find(Entry.Key); + ZEN_ASSERT(It != m_Index.end()); + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[It->second]) + { + size_t PayloadSize = m_CachedPayloads[It->second].GetSize(); + Ctx.RemovedMemory.fetch_add(PayloadSize); + RemoveMemCacheUsage(PayloadSize); + } + m_Index.erase(It); + } + m_SlogFile.Append(ExpiredEntries); + m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); + } + } + Ctx.Items.fetch_add(TotalEntries); + + if (ExpiredEntries.empty()) + { + return; + } + + if (!Ctx.Settings.IsDeleteMode) + { + return; + } + + Ctx.DeletedItems.fetch_add(ExpiredEntries.size()); + + // Compact standalone items + ExtendablePathBuilder<256> Path; + for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys) + { + Path.Reset(); + BuildPath(Path, ExpiredKey.first); + fs::path FilePath = Path.ToPath(); + + RwLock::SharedLockScope IndexLock(m_IndexLock); + if (m_Index.contains(ExpiredKey.first)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", + m_BucketDir, + Path.ToUtf8()); + continue; + } + + RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + if (!fs::remove(FilePath, Ec)) + { + continue; + } + if (Ec) + { + ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", + m_BucketDir, + Path.ToUtf8(), + Ec.message()); + continue; + } + Ctx.RemovedDiskSpace.fetch_add(ExpiredKey.second); + } + + if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + { + // Compact block store + m_BlockStore.CompactBlocks( + BlockCompactState, + m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { 
+ std::vector<DiskIndexEntry> MovedEntries; + RwLock::ExclusiveLockScope _(m_IndexLock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + BucketPayload& Payload = m_Payloads[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later + // time + continue; + } + + const BlockStoreLocation& NewLocation = Moved.second; + + Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + } + } + m_SlogFile.Append(MovedEntries); + Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace); + }, + [&]() { return 0; }); + } + + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; + std::vector<size_t> FirstReferenceIndex; + IndexMap Index; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); + } +} + +class DiskBucketReferenceChecker : public GcReferenceChecker +{ +public: + DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {} + + virtual ~DiskBucketReferenceChecker() + { + m_IndexLock.reset(); + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it + m_CacheBucket.ClearReferenceCache(); + } + } + + virtual void LockState(GcCtx&) override + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc cache bucket '{}': 
found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); + + // Rescan to see if any cache items needs refreshing since last pass when we had the lock + for (const auto& Entry : m_CacheBucket.m_Index) + { + size_t PayloadIndex = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty()); + const IoHash& Key = Entry.first; + if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + IoBuffer Buffer; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key); + } + else + { + Buffer = m_CacheBucket.GetInlineCacheValue(Loc); + } + + if (Buffer) + { + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); }); + } + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override + { + ZEN_ASSERT(m_IndexLock); + + for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) + { + IoCids.erase(ReferenceHash); + } + + for (const IoHash& ReferenceHash : m_UncachedReferences) + { + IoCids.erase(ReferenceHash); + } + } + ZenCacheDiskLayer::CacheBucket& m_CacheBucket; + std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; + HashSet m_UncachedReferences; +}; + +std::vector<GcReferenceChecker*> +ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx&) +{ + Stopwatch Timer; + const auto _ = MakeGuard( + [&] { ZEN_DEBUG("gc cache 
bucket '{}': refreshed reference cache in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + std::vector<IoHash> UpdateKeys; + std::vector<IoHash> StandaloneKeys; + std::vector<size_t> ReferenceCounts; + std::vector<IoHash> References; + + // Refresh cache + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + for (const auto& Entry : m_Index) + { + size_t PayloadIndex = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_Configuration.EnableReferenceCaching && + m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(Key); + continue; + } + IoBuffer Buffer = GetInlineCacheValue(Loc); + if (!Buffer) + { + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(0); + continue; + } + size_t CurrentReferenceCount = References.size(); + { + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + Buffer = {}; + } + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + } + } + { + for (const IoHash& Key : StandaloneKeys) + { + IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + if (!Buffer) + { + continue; + } + + size_t CurrentReferenceCount = References.size(); + { + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + Buffer = {}; + } + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + } + } + + { + size_t ReferenceOffset = 0; + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if 
(!m_Configuration.EnableReferenceCaching) + { + ZEN_ASSERT(m_FirstReferenceIndex.empty()); + ZEN_ASSERT(m_ReferenceHashes.empty()); + ZEN_ASSERT(m_NextReferenceHashesIndexes.empty()); + ZEN_ASSERT(m_ReferenceCount == 0); + // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when + // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } + for (size_t Index = 0; Index < UpdateKeys.size(); Index++) + { + const IoHash& Key = UpdateKeys[Index]; + size_t ReferenceCount = ReferenceCounts[Index]; + auto It = m_Index.find(Key); + if (It == m_Index.end()) + { + ReferenceOffset += ReferenceCount; + continue; + } + if (m_FirstReferenceIndex[It->second] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + continue; + } + SetReferences(IndexLock, + m_FirstReferenceIndex[It->second], + std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); + ReferenceOffset += ReferenceCount; + } + if (m_Configuration.EnableReferenceCaching) + { + CompactReferences(IndexLock); + } + } + + return {new DiskBucketReferenceChecker(*this)}; +} + +void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { std::vector<size_t> FirstReferenceIndex; @@ -2381,6 +2779,19 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn } void +ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() +{ + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + m_FirstReferenceIndex.clear(); + m_FirstReferenceIndex.shrink_to_fit(); + m_ReferenceHashes.clear(); + m_ReferenceHashes.shrink_to_fit(); + m_NextReferenceHashesIndexes.clear(); + m_NextReferenceHashesIndexes.shrink_to_fit(); + m_ReferenceCount = 0; +} + +void ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, 
std::vector<IoBuffer>& CachedPayloads, @@ -2426,16 +2837,34 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payload } } +#if ZEN_WITH_TESTS +void +ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) +{ + GcClock::Tick TimeTick = Time.time_since_epoch().count(); + RwLock::SharedLockScope IndexLock(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = TimeTick; + } +} +#endif // ZEN_WITH_TESTS + ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) -: m_JobQueue(JobQueue) +ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) +: m_Gc(Gc) +, m_JobQueue(JobQueue) , m_RootDir(RootDir) , m_Configuration(Config) { } -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; +ZenCacheDiskLayer::~ZenCacheDiskLayer() +{ +} bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) @@ -2468,8 +2897,10 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; @@ -2483,7 +2914,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } ZEN_ASSERT(Bucket != nullptr); - if (Bucket->Get(HashKey, OutValue, 
m_TotalMemCachedSize)) + if (Bucket->Get(HashKey, OutValue)) { TryMemCacheTrim(); return true; @@ -2522,8 +2953,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; @@ -2547,7 +2980,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Bucket != nullptr); - Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize); + Bucket->Put(HashKey, Value, References); TryMemCacheTrim(); } @@ -2579,8 +3012,10 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - CacheBucket& Bucket = *InsertResult.first->second; + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + CacheBucket& Bucket = *InsertResult.first->second; try { @@ -2636,7 +3071,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); - return Bucket.Drop(m_TotalMemCachedSize); + return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket @@ -2658,7 +3093,7 @@ ZenCacheDiskLayer::Drop() CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); - if (!Bucket.Drop(m_TotalMemCachedSize)) + if (!Bucket.Drop()) { return false; } @@ -2700,10 +3135,10 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { 
#if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); }})); + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); #else CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize); + Bucket.ScrubStorage(Ctx); #endif } @@ -2914,7 +3349,7 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim RwLock::SharedLockScope __(m_Lock); for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize); + Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); @@ -2924,4 +3359,30 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); } +#if ZEN_WITH_TESTS +void +ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + return; + } + Bucket->SetAccessTime(HashKey, Time); +} +#endif // ZEN_WITH_TESTS + } // namespace zen |