From 0973cca6ff44862bf9df65464e0a2ad207063d83 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 14 Nov 2023 14:46:16 +0100 Subject: fix index out of bounds in CacheBucket::CompactState (#532) * use PayloadIndex for indexing into payload array * naming cleanup * fix metadata index in CacheBucket::CompactState --- src/zenserver/cache/cachedisklayer.cpp | 49 +++++++++++++++++----------------- 1 file changed, 24 insertions(+), 25 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 9bb75480e..a6cb54444 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -246,12 +246,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - const uint64_t kInvalidIndex = ~(0ull); - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); if (Count != 0) { - std::vector KeysIndexes; + std::vector KeysIndexes; KeysIndexes.reserve(Count); CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) @@ -262,15 +260,15 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } else { - KeysIndexes.push_back(kInvalidIndex); + KeysIndexes.push_back(PayloadIndex()); } } size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); for (CbFieldView& TimeStampView : TimeStampArray) { - const size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex != kInvalidIndex) + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) { m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); } @@ -284,9 +282,9 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo auto RawSizeIt = RawSizeArray.CreateViewIterator(); while (RawHashIt != CbFieldViewIterator()) { - const size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex != kInvalidIndex) + if (KeyIndex) { uint64_t RawSize = RawSizeIt.AsUInt64(); IoHash RawHash = RawHashIt.AsHash(); @@ -1729,8 +1727,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::unordered_set ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); std::vector ExpiredStandaloneEntries; - IndexMap Index; - std::vector Payloads; + IndexMap IndexSnapshot; + std::vector PayloadsSnapshot; BlockStore::ReclaimSnapshotState BlockStoreState; { bool Expected = false; @@ -1741,7 +1739,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - std::vector AccessTimes; { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); @@ -1755,23 +1752,23 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - Index = m_Index; - for (const IoHash& Key : ExpiredCacheKeys) { - if (auto It = Index.find(Key); It != Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& Payload = Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + const BucketPayload& Payload = m_Payloads[It->second]; + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) { + DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } + + PayloadsSnapshot = m_Payloads; + IndexSnapshot = m_Index; + if (GcCtx.IsDeletionMode()) { IndexLock.ReleaseNow(); @@ -1836,7 +1833,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } } - TotalChunkCount = Index.size(); + TotalChunkCount = IndexSnapshot.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; @@ -1846,10 +1843,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ChunkIndexToChunkHash.reserve(TotalChunkCount); { TotalChunkCount = 0; - for (const auto& Entry : Index) + for (const auto& Entry : IndexSnapshot) { size_t EntryIndex = Entry.second; - const DiskLocation& DiskLocation = Payloads[EntryIndex].Location; + const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { @@ -1908,7 +1905,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location) + if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; @@ -1921,7 +1918,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payloads[Index[ChunkHash]].Location != Payload.Location) + if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete continue; @@ -2813,7 +2810,9 @@ ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) } m_FirstReferenceIndex.swap(FirstReferenceIndex); m_ReferenceHashes.swap(NewReferenceHashes); + m_ReferenceHashes.shrink_to_fit(); m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes); + m_NextReferenceHashesIndexes.shrink_to_fit(); m_ReferenceCount = m_ReferenceHashes.size(); } @@ -2975,7 +2974,7 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloa if (Payload.MetaData) { MetaDatas.push_back(m_MetaDatas[Payload.MetaData]); - Payload.MetaData = MetaDataIndex(m_MetaDatas.size() - 1); + Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1); } if (Payload.MemCached) { -- cgit v1.2.3 From 573907447db3e19d49c0bcaf3f659cf2d599c738 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 16 Nov 2023 18:50:27 +0100 Subject: blocking queue fix (#550) * make BlockingQueue::m_CompleteAdding non-atomic * ZenCacheDiskLayer::Flush logging * name worker threads in ZenCacheDiskLayer::DiscoverBuckets * name worker threads in gcv2 * improved logging in ZenServerInstance * scrub threadpool naming * remove waitpid handling, we should just call wait to kill zombie processes --- src/zenserver/cache/cachedisklayer.cpp | 43 ++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 15 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index a6cb54444..afb974d76 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -3210,7 +3210,7 @@ ZenCacheDiskLayer::DiscoverBuckets() const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - WorkerThreadPool Pool(WorkerThreadPoolCount); + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { @@ -3300,9 +3300,17 @@ void ZenCacheDiskLayer::Flush() { std::vector Buckets; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Buckets.empty()) + { + return; + } + ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); { - RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope __(m_Lock); if (m_Buckets.empty()) { return; @@ -3314,21 +3322,26 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount); - Latch WorkLatch(1); - for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush"); + Latch WorkLatch(1); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); + } + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); + } } - WorkLatch.CountDown(); - WorkLatch.Wait(); } void -- cgit v1.2.3 From 05178f7c18a48b21b9e260de282a86b91df26955 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 21 Nov 2023 15:06:25 +0100 Subject: compact separate for gc referencer (#533) - Refactor GCV2 so GcReferencer::RemoveExpiredData returns a store compactor, moving out the actual disk work from deleting items in the index. - Refactor GCV2 GcResult to reuse GcCompactStoreStats and GcStats - Make Compacting of stores non-parallell to not eat all the disk I/O when running GC --- src/zenserver/cache/cachedisklayer.cpp | 380 ++++++++++++++++++++------------- 1 file changed, 231 insertions(+), 149 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index afb974d76..32ef420d1 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -2351,12 +2351,212 @@ ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) return fmt::format("cachebucket:'{}'", m_BucketDir.string()); } -void -ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& Stats) +class DiskBucketStoreCompactor : public GcStoreCompactor { - size_t TotalEntries = 0; - tsl::robin_set ExpiredInlineKeys; - std::vector> ExpiredStandaloneKeys; +public: + DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector>&& ExpiredStandaloneKeys) + : m_Bucket(Bucket) + , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) + { + m_ExpiredStandaloneKeys.shrink_to_fit(); + } + + virtual ~DiskBucketStoreCompactor() {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", + m_Bucket.m_BucketDir, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_ExpiredStandaloneKeys.empty()) + { + // Compact standalone items + size_t Skipped = 0; + ExtendablePathBuilder<256> Path; + for (const std::pair& ExpiredKey : m_ExpiredStandaloneKeys) + { + Path.Reset(); + m_Bucket.BuildPath(Path, ExpiredKey.first); + fs::path FilePath = Path.ToPath(); + + RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); + if (m_Bucket.m_Index.contains(ExpiredKey.first)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", + m_Bucket.m_BucketDir, + Path.ToUtf8()); + continue; + } + + if (Ctx.Settings.IsDeleteMode) + { + RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + if (!fs::remove(FilePath, Ec)) + { + continue; + } + if (Ec) + { + ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", + m_Bucket.m_BucketDir, + Path.ToUtf8(), + Ec.message()); + continue; + } + Stats.RemovedDisk += ExpiredKey.second; + } + else + { + std::error_code Ec; + bool Existed = std::filesystem::is_regular_file(FilePath, Ec); + if (Ec) + { + ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", + m_Bucket.m_BucketDir, + FilePath, + Ec.message()); + continue; + } + if (!Existed) + { + continue; + } + Skipped++; + } + } + if (Skipped > 0) + { + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); + } + } + + if (Ctx.Settings.CollectSmallObjects) + { + std::unordered_map BlockUsage; + { + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); + uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); + auto It = BlockUsage.find(BlockIndex); + if (It == BlockUsage.end()) + { + BlockUsage.insert_or_assign(BlockIndex, ChunkSize); + } + else + { + It->second += ChunkSize; + } + } + } + + { + BlockStoreCompactState BlockCompactState; + std::vector BlockCompactStateKeys; + + std::vector BlocksToCompact = + m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); + BlockCompactState.IncludeBlocks(BlocksToCompact); + + { + RwLock::SharedLockScope __(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) + { + continue; + } + BlockCompactStateKeys.push_back(Entry.first); + } + } + + if (Ctx.Settings.IsDeleteMode) + { + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); + + m_Bucket.m_BlockStore.CompactBlocks( + BlockCompactState, + m_Bucket.m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector MovedEntries; + RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + for (const std::pair& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) + { + ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d + // at a later time + continue; + } + const BlockStoreLocation& NewLocation = Moved.second; + + Payload.Location = + DiskLocation(NewLocation, m_Bucket.m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + } + } + m_Bucket.m_SlogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + }, + ClaimDiskReserveCallback); + } + else + { + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } + } + } + m_ExpiredStandaloneKeys.clear(); + } + +private: + ZenCacheDiskLayer::CacheBucket& m_Bucket; + std::vector> m_ExpiredStandaloneKeys; +}; + +GcStoreCompactor* +ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + size_t TotalEntries = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -2364,37 +2564,30 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& { return; } - ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, RemovedDisk: {}, RemovedMemory: {} in {}", + ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_BucketDir, - Stats.Count, - Stats.Expired, - Stats.Deleted, - NiceBytes(Stats.RemovedDisk), - NiceBytes(Stats.RemovedMemory), + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); - BlockStoreCompactState BlockCompactState; - BlockStore::ReclaimSnapshotState BlockSnapshotState; - std::vector BlockCompactStateKeys; - std::vector ExpiredEntries; - uint64_t RemovedStandaloneSize = 0; + std::vector ExpiredEntries; + std::vector> ExpiredStandaloneKeys; + uint64_t RemovedStandaloneSize = 0; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (Ctx.Settings.CollectSmallObjects) - { - BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState(); - } TotalEntries = m_Index.size(); - // Find out expired keys and affected blocks + // Find out expired keys for (const auto& Entry : m_Index) { - const IoHash& Key = Entry.first; - size_t EntryIndex = Entry.second; - GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; + const IoHash& Key = Entry.first; + ZenCacheDiskLayer::CacheBucket::PayloadIndex EntryIndex = Entry.second; + GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; if (AccessTime >= ExpireTicks) { continue; @@ -2412,41 +2605,12 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& } else if (Ctx.Settings.CollectSmallObjects) { - ExpiredInlineKeys.insert(Key); - uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex(); - bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex); - if (!IsActiveWriteBlock) - { - BlockCompactState.IncludeBlock(BlockIndex); - } ExpiredEntries.push_back(ExpiredEntry); } } - Stats.Expired += ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(); - - // Get all locations we need to keep for affected blocks - if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) - { - for (const auto& Entry : m_Index) - { - const IoHash& Key = Entry.first; - if (ExpiredInlineKeys.contains(Key)) - { - continue; - } - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payload.Location.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment))) - { - BlockCompactStateKeys.push_back(Key); - } - } - } + Stats.CheckedCount += TotalEntries; + Stats.FoundCount += ExpiredEntries.size(); if (Ctx.Settings.IsDeleteMode) { @@ -2456,112 +2620,30 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& ZEN_ASSERT(It != m_Index.end()); BucketPayload& Payload = m_Payloads[It->second]; RemoveMetaData(Payload); - Stats.RemovedMemory += RemoveMemCachedData(Payload); + Stats.FreedMemory += RemoveMemCachedData(Payload); m_Index.erase(It); + Stats.DeletedCount++; } m_SlogFile.Append(ExpiredEntries); m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); } } - Stats.Count += TotalEntries; - - if (ExpiredEntries.empty()) - { - return; - } - if (!Ctx.Settings.IsDeleteMode) + if (!ExpiredEntries.empty()) { - return; - } - - Stats.Deleted += ExpiredEntries.size(); - - // Compact standalone items - ExtendablePathBuilder<256> Path; - for (const std::pair& ExpiredKey : ExpiredStandaloneKeys) - { - Path.Reset(); - BuildPath(Path, ExpiredKey.first); - fs::path FilePath = Path.ToPath(); - - RwLock::SharedLockScope IndexLock(m_IndexLock); - if (m_Index.contains(ExpiredKey.first)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", - m_BucketDir, - Path.ToUtf8()); - continue; - } - - RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first)); - IndexLock.ReleaseNow(); - ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8()); - - std::error_code Ec; - if (!fs::remove(FilePath, Ec)) - { - continue; - } - if (Ec) + std::vector Payloads; + std::vector AccessTimes; + std::vector MetaDatas; + std::vector MemCachedPayloads; + std::vector FirstReferenceIndex; + IndexMap Index; { - ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", - m_BucketDir, - Path.ToUtf8(), - Ec.message()); - continue; + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } - Stats.RemovedDisk += ExpiredKey.second; } - if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) - { - // Compact block store - m_BlockStore.CompactBlocks( - BlockCompactState, - m_Configuration.PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { - std::vector MovedEntries; - RwLock::ExclusiveLockScope _(m_IndexLock); - for (const std::pair& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - BucketPayload& Payload = m_Payloads[It->second]; - const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); - if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation) - { - // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later - // time - continue; - } - - const BlockStoreLocation& NewLocation = Moved.second; - - Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); - MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); - } - } - m_SlogFile.Append(MovedEntries); - Stats.RemovedDisk += FreedDiskSpace; - }, - [&]() { return 0; }); - } - - std::vector Payloads; - std::vector AccessTimes; - std::vector MetaDatas; - std::vector MemCachedPayloads; - std::vector FirstReferenceIndex; - IndexMap Index; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); - } + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } class DiskBucketReferenceChecker : public GcReferenceChecker -- cgit v1.2.3 From 669a8869b5414c0e8708dd90b1f4aa297d091887 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 21 Nov 2023 16:58:43 +0100 Subject: add command line options for compact block threshold and gc verbose (#557) - Feature: Added new options to zenserver for GC V2 - `--gc-compactblock-threshold` GCV2 - how much of a compact block should be used to skip compacting the block, default is 90% - `--gc-verbose` GCV2 - enable more verbose output when running a GC pass - Feature: Added new options to `zen gc` command for GC V2 - `--compactblockthreshold` GCV2 - how much of a compact block should be used to skip compacting the block, default is 90% - `--verbose` GCV2 - enable more verbose output when running a GC pass - Feature: Added new parameters for endpoint `admin/gc` (PUT) - `compactblockthreshold` GCV2 - how much of a compact block should be used to skip compacting the block, default is 90% - `verbose` GCV2 - enable more verbose output when running a GC pass --- src/zenserver/cache/cachedisklayer.cpp | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 32ef420d1..2be32e372 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -2502,7 +2502,10 @@ public: if (Ctx.Settings.IsDeleteMode) { - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); + } m_Bucket.m_BlockStore.CompactBlocks( BlockCompactState, @@ -2539,9 +2542,12 @@ public: } else { - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", - m_Bucket.m_BucketDir, - BlocksToCompact.size()); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } } } } @@ -2712,9 +2718,22 @@ public: } } - virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_ASSERT(m_IndexLock); + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + m_CacheBucket.m_BucketDir, + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) { -- cgit v1.2.3 From bb4a0b17c07e5a4f19f56a7c089c49f82aecf2fa Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 22 Nov 2023 15:04:53 +0100 Subject: reduce work when there are no blocks to compact (#558) * reduce work when there are no blocks to compact * fix lock scopes --- src/zenserver/cache/cachedisklayer.cpp | 115 +++++++++++++++++---------------- 1 file changed, 61 insertions(+), 54 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 2be32e372..af8b6227b 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -2448,6 +2448,7 @@ public: { std::unordered_map BlockUsage; { + RwLock::SharedLockScope __(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; @@ -2480,73 +2481,79 @@ public: m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); + if (BlocksToCompact.size() > 0) { - RwLock::SharedLockScope __(m_Bucket.m_IndexLock); - for (const auto& Entry : m_Bucket.m_Index) { - ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - continue; - } - if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) + RwLock::SharedLockScope __(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) { - continue; + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) + { + continue; + } + BlockCompactStateKeys.push_back(Entry.first); } - BlockCompactStateKeys.push_back(Entry.first); } - } - if (Ctx.Settings.IsDeleteMode) - { - if (Ctx.Settings.Verbose) + if (Ctx.Settings.IsDeleteMode) { - ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", m_Bucket.m_BucketDir, BlocksToCompact.size()); - } - - m_Bucket.m_BlockStore.CompactBlocks( - BlockCompactState, - m_Bucket.m_Configuration.PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { - std::vector MovedEntries; - RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); - for (const std::pair& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } - if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) + m_Bucket.m_BlockStore.CompactBlocks( + BlockCompactState, + m_Bucket.m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector MovedEntries; + RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + for (const std::pair& Moved : MovedArray) { - ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; - const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); - if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation) + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) { - // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d - // at a later time - continue; + ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be + // GC:d at a later time + continue; + } + const BlockStoreLocation& NewLocation = Moved.second; + + Payload.Location = DiskLocation(NewLocation, + m_Bucket.m_Configuration.PayloadAlignment, + Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } - const BlockStoreLocation& NewLocation = Moved.second; - - Payload.Location = - DiskLocation(NewLocation, m_Bucket.m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); - MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } - } - m_Bucket.m_SlogFile.Append(MovedEntries); - Stats.RemovedDisk += FreedDiskSpace; - }, - ClaimDiskReserveCallback); - } - else - { - if (Ctx.Settings.Verbose) + m_Bucket.m_SlogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + }, + ClaimDiskReserveCallback); + } + else { - ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", - m_Bucket.m_BucketDir, - BlocksToCompact.size()); + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } } } } -- cgit v1.2.3 From 254d2f89c110fc5f14e658505559a7e7534a984d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 24 Nov 2023 13:26:51 +0100 Subject: Add GC Cancel/Stop (#568) - GcScheduler will now cancel any running GC when it shuts down. - Old GC is rather limited in *when* it reacts to cancel of GC. GCv2 is more responsive. --- src/zenserver/cache/cachedisklayer.cpp | 71 ++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 11 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index af8b6227b..6ab3c7746 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -54,6 +54,13 @@ namespace { #pragma pack(pop) + template + void Reset(T& V) + { + T Tmp; + V.swap(Tmp); + } + const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; @@ -2367,6 +2374,7 @@ public: { Stopwatch Timer; const auto _ = MakeGuard([&] { + Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; @@ -2384,6 +2392,10 @@ public: ExtendablePathBuilder<256> Path; for (const std::pair& ExpiredKey : m_ExpiredStandaloneKeys) { + if (Ctx.IsCancelledFlag.load()) + { + return; + } Path.Reset(); m_Bucket.BuildPath(Path, ExpiredKey.first); fs::path FilePath = Path.ToPath(); @@ -2543,6 +2555,11 @@ public: } m_Bucket.m_SlogFile.Append(MovedEntries); Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; }, ClaimDiskReserveCallback); } @@ -2558,7 +2575,6 @@ public: } } } - m_ExpiredStandaloneKeys.clear(); } private: @@ -2593,6 +2609,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) uint64_t RemovedStandaloneSize = 0; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + TotalEntries = m_Index.size(); // Find out expired keys @@ -2625,6 +2646,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) Stats.CheckedCount += TotalEntries; Stats.FoundCount += ExpiredEntries.size(); + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + if (Ctx.Settings.IsDeleteMode) { for (const DiskIndexEntry& Entry : ExpiredEntries) @@ -2656,6 +2682,11 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) } } + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } @@ -2689,10 +2720,23 @@ public: }); m_IndexLock = std::make_unique(m_CacheBucket.m_IndexLock); + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + return; + } // Rescan to see if any cache items needs refreshing since last pass when we had the lock for (const auto& Entry : m_CacheBucket.m_Index) { + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + return; + } + size_t PayloadIndex = Entry.second; const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; @@ -2782,6 +2826,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) RwLock::SharedLockScope IndexLock(m_IndexLock); for (const auto& Entry : m_Index) { + if (Ctx.IsCancelledFlag.load()) + { + return {}; + } + size_t PayloadIndex = Entry.second; const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; @@ -2821,6 +2870,11 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { for (const IoHash& Key : StandaloneKeys) { + if (Ctx.IsCancelledFlag.load()) + { + return {}; + } + IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); if (!Buffer) { @@ -3047,12 +3101,9 @@ void ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - m_FirstReferenceIndex.clear(); - m_FirstReferenceIndex.shrink_to_fit(); - m_ReferenceHashes.clear(); - m_ReferenceHashes.shrink_to_fit(); - m_NextReferenceHashesIndexes.clear(); - m_NextReferenceHashesIndexes.shrink_to_fit(); + Reset(m_FirstReferenceIndex); + Reset(m_ReferenceHashes); + Reset(m_NextReferenceHashesIndexes); m_ReferenceCount = 0; } @@ -3099,11 +3150,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloa m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); - m_FreeMetaDatas.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMetaDatas); m_MemCachedPayloads.swap(MemCachedPayloads); - m_FreeMemCachedPayloads.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMemCachedPayloads); if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.swap(FirstReferenceIndex); -- cgit v1.2.3 From 4d95b578350ebfbbf6d54407c9403547b01cac4c Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 27 Nov 2023 14:32:19 +0100 Subject: optimized index snapshot reading/writing (#561) the previous implementation of in-memory index snapshots serialise data to memory before writing to disk and vice versa when reading. This leads to some memory spikes which end up pushing useful data out of system cache and also cause stalls on I/O operations. this change moves more code to a streaming serialisation approach which scales better from a memory usage perspective and also performs much better --- src/zenserver/cache/cachedisklayer.cpp | 1231 ++++++++++++++++++++------------ 1 file changed, 781 insertions(+), 450 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 6ab3c7746..0767086ce 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -25,12 +25,6 @@ namespace { #pragma pack(push) #pragma pack(1) - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static const uint32_t CurrentDiskBucketVersion = 1; - struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; @@ -48,12 +42,75 @@ namespace { { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (PayloadAlignment == 0) + { + return false; + } + + return true; + } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); + struct BucketMetaHeader + { + static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t CurrentVersion = Version1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Padding = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const BucketMetaHeader& Header) + { + return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (Padding != 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(BucketMetaHeader) == 32); + #pragma pack(pop) + ////////////////////////////////////////////////////////////////////////// + template void Reset(T& V) { @@ -63,15 +120,16 @@ namespace { const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; + const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { - return BucketDir / (BucketName + ".tmp"); + return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) @@ -79,6 +137,12 @@ namespace { return BucketDir / (BucketName + LogExtension); } + std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + ZEN_UNUSED(BucketName); + return BucketDir / "zen_manifest"; + } + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -147,22 +211,430 @@ namespace { } // namespace namespace fs = std::filesystem; +using namespace std::literals; + +class BucketManifestSerializer +{ + using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; + using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; + + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + +public: + // We use this to indicate if a on disk bucket needs wiping + // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references + // to block items. + // See: https://github.com/EpicGames/zen/pull/299 + static inline const uint32_t CurrentDiskBucketVersion = 1; + + bool Open(std::filesystem::path ManifestPath) + { + Manifest = LoadCompactBinaryObject(ManifestPath); + return !!Manifest; + } + + Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } + + bool IsCurrentVersion(uint32_t& OutVersion) const + { + OutVersion = Manifest["Version"sv].AsUInt32(0); + return OutVersion == CurrentDiskBucketVersion; + } + + void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector& AccessTimes, + std::vector& Payloads); + + Oid GenerateNewManifest(std::filesystem::path ManifestPath); + + IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); + uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } + void WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + const std::vector& Payloads, + const std::vector& MetaDatas); + bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector& AccessTimes, + std::vector& Payloads); -static CbObject -LoadCompactBinaryObject(const fs::path& Path) + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + const std::vector& Payloads, + const std::vector& MetaDatas); + + CbObject Manifest; + +private: + CbObject LoadCompactBinaryObject(const fs::path& Path) + { + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return zen::LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); + } + + uint64_t m_ManifestEntryCount = 0; + + struct ManifestData + { + IoHash Key; // 20 + AccessTime Timestamp; // 4 + IoHash RawHash; // 20 + uint32_t Padding_0; // 4 + size_t RawSize; // 8 + uint64_t Padding_1; // 8 + }; + + static_assert(sizeof(ManifestData) == 64); +}; + +void +BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector& AccessTimes, + std::vector& Payloads) { - FileContents Result = ReadFile(Path); + if (Manifest["UsingMetaFile"sv].AsBool()) + { + ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + + return; + } + + ZEN_TRACE_CPU("Z$::ParseManifest"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + std::vector KeysIndexes; + KeysIndexes.reserve(Count); + + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) + { + KeysIndexes.push_back(It.value()); + } + else + { + KeysIndexes.push_back(PayloadIndex()); + } + } - if (!Result.ErrorCode) + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) { - return LoadCompactBinaryObject(Buffer); + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + } + + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + if (RawHashArray.Num() == RawSizeArray.Num()) + { + auto RawHashIt = RawHashArray.CreateViewIterator(); + auto RawSizeIt = RawSizeArray.CreateViewIterator(); + while (RawHashIt != CbFieldViewIterator()) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + + if (KeyIndex) + { + uint64_t RawSize = RawSizeIt.AsUInt64(); + IoHash RawHash = RawHashIt.AsHash(); + if (RawSize != 0 || RawHash != IoHash::Zero) + { + BucketPayload& Payload = Payloads[KeyIndex]; + Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + } + } + + RawHashIt++; + RawSizeIt++; } } + else + { + ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); + } +} - return CbObject(); +Oid +BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) +{ + const Oid BucketId = Oid::NewOid(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); + + return BucketId; +} + +IoBuffer +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + const std::vector& Payloads, + const std::vector& MetaDatas) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::MakeManifest"); + + size_t ItemCount = Index.size(); + + // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth + // And we don't need to reallocate the underlying buffer in almost every case + const size_t EstimatedSizePerItem = 54u; + const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); + CbObjectWriter Writer(ReserveSize); + + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!Index.empty()) + { + Writer.AddInteger("Count"sv, gsl::narrow(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); + + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); + } + Writer.EndArray(); + + if (!MetaDatas.empty()) + { + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); + } + else + { + Writer.AddHash(IoHash::Zero); + } + } + Writer.EndArray(); + + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); + } + else + { + Writer.AddInteger(0); + } + } + Writer.EndArray(); + } + } + + Manifest = Writer.Save(); + return Manifest.GetBuffer().AsIoBuffer(); +} + +IoBuffer +BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) +{ + m_ManifestEntryCount = EntryCount; + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Writer << "Count"sv << EntryCount; + Writer << "UsingMetaFile"sv << true; + Manifest = Writer.Save(); + + return Manifest.GetBuffer().AsIoBuffer(); +} + +bool +BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector& AccessTimes, + std::vector& Payloads) +{ + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); + + std::error_code Ec; + + BasicFile SidecarFile; + SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); + } + + uint64_t FileSize = SidecarFile.FileSize(); + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); + + if (FileSize < sizeof(BucketMetaHeader)) + { + return false; + } + + BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); + + BucketMetaHeader Header; + Sidecar.Read(&Header, sizeof Header, 0); + + if (!Header.IsValid()) + { + return false; + } + + if (Header.Version != BucketMetaHeader::Version1) + { + return false; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); + if (Header.EntryCount > ExpectedEntryCount) + { + return false; + } + + InvalidGuard.Dismiss(); + + uint64_t RemainingEntryCount = ExpectedEntryCount; + uint64_t EntryCount = 0; + uint64_t CurrentReadOffset = sizeof(Header); + + while (RemainingEntryCount--) + { + const ManifestData* Entry = Sidecar.MakeView(CurrentReadOffset); + CurrentReadOffset += sizeof(ManifestData); + + if (auto It = Index.find(Entry->Key); It != Index.end()) + { + PayloadIndex PlIndex = It.value(); + + ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); + + ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; + + AccessTimes[PlIndex] = Entry->Timestamp; + + if (Entry->RawSize && Entry->RawHash != IoHash::Zero) + { + Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + } + } + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == ExpectedEntryCount); + + return true; +} + +void +BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + const std::vector& Payloads, + const std::vector& MetaDatas) +{ + BucketMetaHeader Header; + Header.EntryCount = m_ManifestEntryCount; + Header.LogPosition = SnapshotLogPosition; + Header.Checksum = Header.ComputeChecksum(Header); + + std::error_code Ec; + + TemporaryFile SidecarFile; + SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); + } + + SidecarFile.Write(&Header, sizeof Header, 0); + + // TODO: make this batching for better performance + { + uint64_t WriteOffset = sizeof Header; + + BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + const PayloadIndex PlIndex = Kv.second; + + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + + if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) + { + RawHash = MetaDatas[MetaIndex].RawHash; + RawSize = MetaDatas[MetaIndex].RawSize; + } + + ManifestData ManifestEntry = + {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0}; + + SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset); + + WriteOffset += sizeof ManifestEntry; + } + } + + SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); + } } ////////////////////////////////////////////////////////////////////////// @@ -207,211 +679,80 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + BucketManifestSerializer ManifestReader; - if (Manifest) + if (ManifestReader.Open(ManifestPath)) { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); + m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } - const uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) + + uint32_t Version = 0; + if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", + BucketDir, + Version, + BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer()); - IsNew = true; + m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); + IsNew = true; } else { return false; } - OpenLog(IsNew); + InitializeIndexFromDisk(IsNew); - if (!IsNew) + if (IsNew) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest"); - - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); - if (Count != 0) - { - std::vector KeysIndexes; - KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); - for (CbFieldView& KeyView : KeyArray) - { - if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) - { - KeysIndexes.push_back(It.value()); - } - else - { - KeysIndexes.push_back(PayloadIndex()); - } - } - size_t KeyIndexOffset = 0; - CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) - { - m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); - } - } - KeyIndexOffset = 0; - CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); - CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) - { - auto RawHashIt = RawHashArray.CreateViewIterator(); - auto RawSizeIt = RawSizeArray.CreateViewIterator(); - while (RawHashIt != CbFieldViewIterator()) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - - if (KeyIndex) - { - uint64_t RawSize = RawSizeIt.AsUInt64(); - IoHash RawHash = RawHashIt.AsHash(); - if (RawSize != 0 || RawHash != IoHash::Zero) - { - BucketPayload& Payload = m_Payloads[KeyIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - - RawHashIt++; - RawSizeIt++; - } - } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } - } - - ////// Legacy format read - { - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); - - if (RawHash == IoHash::Zero || RawSize == 0) - { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); - } - - BucketPayload& Payload = m_Payloads[EntryIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - } + return true; } + ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + return true; } void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); - uint64_t LogCount = m_SlogFile.GetLogCount(); + const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } - ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); - return; - } - } - - try - { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } + ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); + const uint64_t EntryCount = m_Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_BucketDir, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - // Write the current state of the location map to a new index state - std::vector Entries; - Entries.resize(m_Index.size()); + namespace fs = std::filesystem; - { - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry); + try + { + const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -431,45 +772,67 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function(m_Configuration.PayloadAlignment)}; + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); + } + + { + // This is in a separate scope just to ensure IndexWriter goes out + // of scope before the file is flushed/closed, in order to ensure + // all data is written to the file + BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow(m_Configuration.PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + + uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); + + for (auto& Entry : m_Index) + { + DiskIndexEntry IndexEntry; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); + + IndexWriteOffset += sizeof(DiskIndexEntry); + } - // Restore any previous snapshot + IndexWriter.Flush(); + } - if (fs::is_regular_file(STmpIndexPath)) + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(STmpIndexPath, IndexPath, Ec); - if (Ec) + std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); + ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + + if (std::filesystem::is_regular_file(TempFilePath)) { - ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); + if (!std::filesystem::remove(TempFilePath, Ec) || Ec) + { + ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); + } } } - } - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + else { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); + // We must only update the log flush position once the snapshot write succeeds + m_LogFlushPosition = LogCount; } } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + } } uint64_t @@ -477,77 +840,88 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); - if (std::filesystem::is_regular_file(IndexPath)) + if (!std::filesystem::is_regular_file(IndexPath)) { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + return 0; + } - m_Configuration.PayloadAlignment = Header.PayloadAlignment; + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); - std::vector Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t FileSize = ObjectIndexFile.FileSize(); + if (FileSize < sizeof(CacheBucketIndexHeader)) + { + return 0; + } - m_Payloads.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location}); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } + if (!Header.IsValid()) + { + return 0; + } + + if (Header.Version != CacheBucketIndexHeader::Version2) + { + return 0; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + return 0; + } + + InvalidGuard.Dismiss(); + + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Configuration.PayloadAlignment = Header.PayloadAlignment; + + m_Payloads.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); + + uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t RemainingEntryCount = Header.EntryCount; + + std::string InvalidEntryReason; + while (RemainingEntryCount--) + { + const DiskIndexEntry* Entry = FileBuffer.MakeView(CurrentReadOffset); + CurrentReadOffset += sizeof(DiskIndexEntry); + + if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); + + const PayloadIndex EntryIndex = PayloadIndex(EntryCount); + m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); + m_Index.insert_or_assign(Entry->Key, EntryIndex); + + EntryCount++; } - return 0; + + ZEN_ASSERT(EntryCount == m_Payloads.size()); + + m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(EntryCount); + } + + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; } uint64_t @@ -555,61 +929,73 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); - if (std::filesystem::is_regular_file(LogPath)) + if (!std::filesystem::is_regular_file(LogPath)) { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) { - m_FirstReferenceIndex.resize(m_Payloads.size()); + // Note: this leaves m_Payloads and other arrays with 'holes' in them + m_Index.erase(Record.Key); + return; } - if (InvalidEntryCount) + + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; } - return LogEntryCount; - } + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size()); } - return 0; + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + + return LogEntryCount; }; void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) +ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); @@ -661,15 +1047,14 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) } else if (fs::is_regular_file(LogPath)) { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); + ZEN_WARN("removing invalid log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector KnownLocations; - KnownLocations.reserve(m_Index.size()); + BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -679,19 +1064,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownLocations.push_back(BlockLocation); + else + { + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); + KnownBlocks.Add(BlockLocation.BlockIndex); + } } - - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNew || LogEntryCount > 0) { - MakeIndexSnapshot(); + WriteIndexSnapshot(); } - // TODO: should validate integrity of container files here } void @@ -981,20 +1366,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - std::vector AccessTimes; - std::vector Payloads; - std::vector MetaDatas; - IndexMap Index; - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot(); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas)); + SaveSnapshot(); } catch (std::exception& Ex) { @@ -1003,113 +1375,85 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); try { - IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize())); - return; - } - WriteFile(m_BucketDir / "zen_manifest", Buffer); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); - } -} - -CbObject -ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, - std::vector&& AccessTimes, - const std::vector& Payloads, - const std::vector& MetaDatas) -{ - using namespace std::literals; - - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); + bool UseLegacyScheme = false; + uint64_t SidecarSize = 0; - size_t ItemCount = Index.size(); + IoBuffer Buffer; + BucketManifestSerializer ManifestWriter; - // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth - // And we don't need to reallocate theunderying buffer in almost every case - const size_t EstimatedSizePerItem = 54u; - const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); - CbObjectWriter Writer(ReserveSize); + { + std::vector AccessTimes; + std::vector Payloads; + std::vector MetaDatas; + IndexMap Index; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(); + // Note: this copy could be eliminated on shutdown to + // reduce memory usage and execution time + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + MetaDatas = m_MetaDatas; + } - if (!Index.empty()) - { - Writer.AddInteger("Count"sv, gsl::narrow(Index.size())); - Writer.BeginArray("Keys"sv); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - Writer.AddHash(Key); - } - Writer.EndArray(); + if (UseLegacyScheme) + { + Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); + } + else + { + const uint64_t EntryCount = Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + SidecarSize = ManifestWriter.GetSidecarSize(); + } - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : Index) - { - GcClock::Tick AccessTime = AccessTimes[Kv.second]; - Writer.AddInteger(AccessTime); - } - Writer.EndArray(); + const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; - if (!MetaDatas.empty()) - { - Writer.BeginArray("RawHash"sv); - for (auto& Kv : Index) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); - } - else - { - Writer.AddHash(IoHash::Zero); - } + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; } - Writer.EndArray(); - Writer.BeginArray("RawSize"sv); - for (auto& Kv : Index) + if (!UseLegacyScheme) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); - } - else - { - Writer.AddInteger(0); - } + ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + std::move(Index), + std::move(AccessTimes), + Payloads, + MetaDatas); } - Writer.EndArray(); } + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + WriteFile(ManifestPath, Buffer); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } - return Writer.Save(); } IoHash @@ -1683,20 +2027,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) try { - std::vector AccessTimes; - std::vector Payloads; - std::vector MetaDatas; - IndexMap Index; - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas), - [&]() { return GcCtx.ClaimGCReserve(); }); + SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (std::exception& Ex) { -- cgit v1.2.3 From dcaeaac4ebc6255cb210ce54a18b1cd01b9eeaf8 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 28 Nov 2023 07:54:05 -0500 Subject: tracing for gcv2 (#574) - Improvement: Added more trace scopes for GCv2 - Bugfix: Make sure we can override flags to "false" when running `zen gc` commmand - `smallobjects`, `skipcid`, `skipdelete`, `verbose` --- src/zenserver/cache/cachedisklayer.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 0767086ce..c8bc3871a 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -2703,6 +2703,8 @@ public: virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function& ClaimDiskReserveCallback) override { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore"); + Stopwatch Timer; const auto _ = MakeGuard([&] { Reset(m_ExpiredStandaloneKeys); @@ -2916,6 +2918,8 @@ private: GcStoreCompactor* ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + size_t TotalEntries = 0; Stopwatch Timer; @@ -3038,6 +3042,8 @@ public: virtual void LockState(GcCtx& Ctx) override { + ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -3102,6 +3108,8 @@ public: virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { + ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet"); + ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); Stopwatch Timer; @@ -3135,6 +3143,8 @@ public: std::vector ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers"); + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -3267,6 +3277,8 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences"); + std::vector FirstReferenceIndex; std::vector NewReferenceHashes; std::vector NewNextReferenceHashesIndexes; @@ -3447,6 +3459,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloa IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState"); + size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); -- cgit v1.2.3 From 68b3382ef7e0f7795b9a601aae73adc2f8ef9873 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 29 Nov 2023 09:14:57 -0500 Subject: global thread worker pools (#577) - Improvement: Use two global worker thread pools instead of ad-hoc creation of worker pools --- src/zenserver/cache/cachedisklayer.cpp | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index c8bc3871a..9117b8820 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include @@ -3709,11 +3710,8 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; - const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); - const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); - Latch WorkLatch(1); + WorkerThreadPool& Pool = GetLargeWorkerPool(); + Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { WorkLatch.AddCount(1); @@ -3825,11 +3823,8 @@ ZenCacheDiskLayer::Flush() } } { - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush"); - Latch WorkLatch(1); + WorkerThreadPool& Pool = GetSmallWorkerPool(); + Latch WorkLatch(1); for (auto& Bucket : Buckets) { WorkLatch.AddCount(1); -- cgit v1.2.3 From 1bbdc86732464170c2e7c6145a5a19cdb48fe396 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 1 Dec 2023 04:48:58 -0500 Subject: add separate PreCache step for GcReferenceChecker (#578) - Improvement: GCv2: Use separate PreCache step to improve concurrency when checking references - Improvement: GCv2: Improved verbose logging - Improvement: GCv2: Sort chunks to read by block/offset when finding references - Improvement: GCv2: Exit as soon as no more unreferenced items are left --- src/zenserver/cache/cachedisklayer.cpp | 483 +++++++++++++++++++++------------ 1 file changed, 314 insertions(+), 169 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 9117b8820..955ab3a04 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -2502,6 +2502,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ValueLock.ReleaseNow(); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } PayloadIndex EntryIndex = {}; if (auto It = m_Index.find(HashKey); It == m_Index.end()) @@ -2652,6 +2656,10 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { PayloadIndex EntryIndex = It.value(); @@ -2767,6 +2775,10 @@ public: } else { + RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + std::error_code Ec; bool Existed = std::filesystem::is_regular_file(FilePath, Ec); if (Ec) @@ -2792,9 +2804,12 @@ public: if (Ctx.Settings.CollectSmallObjects) { + m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique(); }); + auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); + std::unordered_map BlockUsage; { - RwLock::SharedLockScope __(m_Bucket.m_IndexLock); + RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; @@ -2807,14 +2822,13 @@ public: } uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); - auto It = BlockUsage.find(BlockIndex); - if (It == BlockUsage.end()) + if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { - BlockUsage.insert_or_assign(BlockIndex, ChunkSize); + It->second += ChunkSize; } else { - It->second += ChunkSize; + BlockUsage.insert_or_assign(BlockIndex, ChunkSize); } } } @@ -2830,7 +2844,7 @@ public: if (BlocksToCompact.size() > 0) { { - RwLock::SharedLockScope __(m_Bucket.m_IndexLock); + RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) { ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; @@ -2863,27 +2877,25 @@ public: m_Bucket.m_Configuration.PayloadAlignment, [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { std::vector MovedEntries; - RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); for (const std::pair& Moved : MovedArray) { size_t ChunkIndex = Moved.first; const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + if (m_Bucket.m_UpdatedKeys->contains(Key)) + { + continue; + } + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) { - ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; - const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); - if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation) - { - // Someone has moved our chunk so lets just skip the new location we were provided, it will be - // GC:d at a later time - continue; - } - const BlockStoreLocation& NewLocation = Moved.second; - - Payload.Location = DiskLocation(NewLocation, - m_Bucket.m_Configuration.PayloadAlignment, - Payload.Location.GetFlags()); + ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + const BlockStoreLocation& NewLocation = Moved.second; + Payload.Location = DiskLocation(NewLocation, + m_Bucket.m_Configuration.PayloadAlignment, + Payload.Location.GetFlags()); MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); } } @@ -2955,9 +2967,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) // Find out expired keys for (const auto& Entry : m_Index) { - const IoHash& Key = Entry.first; - ZenCacheDiskLayer::CacheBucket::PayloadIndex EntryIndex = Entry.second; - GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; + const IoHash& Key = Entry.first; + PayloadIndex EntryIndex = Entry.second; + GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; if (AccessTime >= ExpireTicks) { continue; @@ -3004,7 +3016,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) } } - if (!ExpiredEntries.empty()) + if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) { std::vector Payloads; std::vector AccessTimes; @@ -3028,22 +3040,253 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) class DiskBucketReferenceChecker : public GcReferenceChecker { + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + using CacheBucket = ZenCacheDiskLayer::CacheBucket; + using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex; + public: - DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {} + DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} virtual ~DiskBucketReferenceChecker() { - m_IndexLock.reset(); - if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + try + { + m_IndexLock.reset(); + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it + m_CacheBucket.ClearReferenceCache(); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual void PreCache(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector UpdateKeys; + std::vector ReferenceCounts; + std::vector References; + + auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) { + size_t CurrentReferenceCount = References.size(); + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + }; + + // Refresh cache { - // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it - m_CacheBucket.ClearReferenceCache(); + // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique(); }); + } + + std::vector StandaloneKeys; + { + std::vector InlineKeys; + std::unordered_map BlockIndexToEntriesPerBlockIndex; + struct InlineEntry + { + uint32_t InlineKeyIndex; + uint32_t Offset; + uint32_t Size; + }; + std::vector> EntriesPerBlock; + + { + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + for (const auto& Entry : m_CacheBucket.m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + IndexLock.ReleaseNow(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && + m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(Key); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); + InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow(InlineKeys.size()), + .Offset = gsl::narrow(ChunkLocation.Offset), + .Size = gsl::narrow(ChunkLocation.Size)}; + InlineKeys.push_back(Key); + + if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex); + It != BlockIndexToEntriesPerBlockIndex.end()) + { + EntriesPerBlock[It->second].emplace_back(UpdateEntry); + } + else + { + BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size()); + EntriesPerBlock.emplace_back(std::vector{UpdateEntry}); + } + } + } + + for (auto It : BlockIndexToEntriesPerBlockIndex) + { + uint32_t BlockIndex = It.first; + + Ref BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex); + if (BlockFile) + { + size_t EntriesPerBlockIndex = It.second; + std::vector& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex]; + + std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool { + return Lhs.Offset < Rhs.Offset; + }); + + uint64_t BlockFileSize = BlockFile->FileSize(); + BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768); + for (const InlineEntry& InlineEntry : InlineEntries) + { + if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize) + { + ReferenceCounts.push_back(0); + } + else + { + MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset); + if (ChunkView.GetSize() == InlineEntry.Size) + { + GetAttachments(ChunkView.GetData()); + } + else + { + std::vector Buffer(InlineEntry.Size); + BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset); + GetAttachments(Buffer.data()); + } + } + const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex]; + UpdateKeys.push_back(Key); + } + } + } + } + { + for (const IoHash& Key : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + if (!Buffer) + { + continue; + } + + GetAttachments(Buffer.GetData()); + UpdateKeys.push_back(Key); + } + } + } + + { + size_t ReferenceOffset = 0; + RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock); + + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty()); + ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0); + ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys); + + // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when + // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. + m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown()); + m_CacheBucket.m_ReferenceHashes.reserve(References.size()); + m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size()); + } + else + { + ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys); + } + + for (size_t Index = 0; Index < UpdateKeys.size(); Index++) + { + const IoHash& Key = UpdateKeys[Index]; + size_t ReferenceCount = ReferenceCounts[Index]; + if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end()) + { + PayloadIndex EntryIndex = It->second; + if (m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + // The reference data is valid and what we have is old/redundant + continue; + } + } + else if (m_CacheBucket.m_UpdatedKeys->contains(Key)) + { + // Our pre-cache data is invalid + continue; + } + + m_CacheBucket.SetReferences(IndexLock, + m_CacheBucket.m_FirstReferenceIndex[EntryIndex], + std::span{References.data() + ReferenceOffset, ReferenceCount}); + } + ReferenceOffset += ReferenceCount; + } + + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty()) + { + m_CacheBucket.CompactReferences(IndexLock); + } } } virtual void LockState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3062,31 +3305,38 @@ public: { m_UncachedReferences.clear(); m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); return; } - // Rescan to see if any cache items needs refreshing since last pass when we had the lock - for (const auto& Entry : m_CacheBucket.m_Index) + if (m_CacheBucket.m_UpdatedKeys) { - if (Ctx.IsCancelledFlag.load()) + const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys); + for (const IoHash& Key : UpdatedKeys) { - m_UncachedReferences.clear(); - m_IndexLock.reset(); - return; - } + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + auto It = m_CacheBucket.m_Index.find(Key); + if (It == m_CacheBucket.m_Index.end()) + { + continue; + } - size_t PayloadIndex = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; + PayloadIndex EntryIndex = It->second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty()); - const IoHash& Key = Entry.first; - if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown()) - { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { @@ -3128,15 +3378,27 @@ public: for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) { - IoCids.erase(ReferenceHash); + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } } for (const IoHash& ReferenceHash : m_UncachedReferences) { - IoCids.erase(ReferenceHash); + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } } } - ZenCacheDiskLayer::CacheBucket& m_CacheBucket; + CacheBucket& m_CacheBucket; std::unique_ptr m_IndexLock; HashSet m_UncachedReferences; }; @@ -3152,126 +3414,9 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { return; } - ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': found {} references in {}", - m_BucketDir, - m_ReferenceCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - std::vector UpdateKeys; - std::vector StandaloneKeys; - std::vector ReferenceCounts; - std::vector References; - - // Refresh cache - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - for (const auto& Entry : m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - return {}; - } - - size_t PayloadIndex = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - if (m_Configuration.EnableReferenceCaching && - m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown()) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(Key); - continue; - } - IoBuffer Buffer = GetInlineCacheValue(Loc); - if (!Buffer) - { - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(0); - continue; - } - size_t CurrentReferenceCount = References.size(); - { - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); - Buffer = {}; - } - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(References.size() - CurrentReferenceCount); - } - } - { - for (const IoHash& Key : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - return {}; - } - - IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); - if (!Buffer) - { - continue; - } - - size_t CurrentReferenceCount = References.size(); - { - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); - Buffer = {}; - } - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(References.size() - CurrentReferenceCount); - } - } - - { - size_t ReferenceOffset = 0; - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (!m_Configuration.EnableReferenceCaching) - { - ZEN_ASSERT(m_FirstReferenceIndex.empty()); - ZEN_ASSERT(m_ReferenceHashes.empty()); - ZEN_ASSERT(m_NextReferenceHashesIndexes.empty()); - ZEN_ASSERT(m_ReferenceCount == 0); - // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when - // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - for (size_t Index = 0; Index < UpdateKeys.size(); Index++) - { - const IoHash& Key = UpdateKeys[Index]; - size_t ReferenceCount = ReferenceCounts[Index]; - auto It = m_Index.find(Key); - if (It == m_Index.end()) - { - ReferenceOffset += ReferenceCount; - continue; - } - if (m_FirstReferenceIndex[It->second] != ReferenceIndex::Unknown()) - { - continue; - } - SetReferences(IndexLock, - m_FirstReferenceIndex[It->second], - std::span{References.data() + ReferenceOffset, ReferenceCount}); - ReferenceOffset += ReferenceCount; - } - if (m_Configuration.EnableReferenceCaching) - { - CompactReferences(IndexLock); - } - } - return {new DiskBucketReferenceChecker(*this)}; } -- cgit v1.2.3 From 2755b0dbfd47237e018048598e9c85c71b9a0736 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 1 Dec 2023 07:57:13 -0500 Subject: use 32 bit offset and size in BlockStoreLocation (#581) - Improvement: Reduce memory usage in GC and diskbucket flush --- src/zenserver/cache/cachedisklayer.cpp | 113 ++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 38 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 955ab3a04..2c344dd1d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -255,8 +255,8 @@ public: uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } void WriteSidecarFile(const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector&& AccessTimes, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas); bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, @@ -265,11 +265,11 @@ public: std::vector& AccessTimes, std::vector& Payloads); - IoBuffer MakeManifest(const Oid& BucketId, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector&& AccessTimes, - const std::vector& Payloads, - const std::vector& MetaDatas); + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + std::vector&& Payloads, + std::vector&& MetaDatas); CbObject Manifest; @@ -399,11 +399,11 @@ BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath } IoBuffer -BucketManifestSerializer::MakeManifest(const Oid& BucketId, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector&& AccessTimes, - const std::vector& Payloads, - const std::vector& MetaDatas) +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector&& AccessTimes, + std::vector&& Payloads, + std::vector&& MetaDatas) { using namespace std::literals; @@ -579,8 +579,8 @@ BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& void BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector&& AccessTimes, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas) { @@ -605,8 +605,11 @@ BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& { uint64_t WriteOffset = sizeof Header; - BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + std::vector ManifestDataBuffer; + const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 256 Kb + ManifestDataBuffer.reserve(MaxManifestDataBufferCount); for (auto& Kv : Index) { const IoHash& Key = Kv.first; @@ -621,12 +624,24 @@ BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& RawSize = MetaDatas[MetaIndex].RawSize; } - ManifestData ManifestEntry = - {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0}; - - SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset); - - WriteOffset += sizeof ManifestEntry; + ManifestDataBuffer.emplace_back(ManifestData{.Key = Key, + .Timestamp = AccessTimes[PlIndex], + .RawHash = RawHash, + .Padding_0 = 0, + .RawSize = RawSize, + .Padding_1 = 0}); + if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) + { + const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); + SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); + WriteOffset += WriteSize; + ManifestDataBuffer.clear(); + ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + } + } + if (ManifestDataBuffer.size() > 0) + { + SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); } } @@ -1380,12 +1395,12 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl { try { - bool UseLegacyScheme = false; - uint64_t SidecarSize = 0; + bool UseLegacyScheme = false; IoBuffer Buffer; BucketManifestSerializer ManifestWriter; + if (UseLegacyScheme) { std::vector AccessTimes; std::vector Payloads; @@ -1403,16 +1418,41 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl MetaDatas = m_MetaDatas; } - if (UseLegacyScheme) + Buffer = ManifestWriter.MakeManifest(m_BucketId, + std::move(Index), + std::move(AccessTimes), + std::move(Payloads), + std::move(MetaDatas)); + const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512; + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) { - Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; } - else + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) { - const uint64_t EntryCount = Index.size(); - Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); - SidecarSize = ManifestWriter.GetSidecarSize(); + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; + } + } + else + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(); + const uint64_t EntryCount = m_Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; @@ -1437,15 +1477,12 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl return; } - if (!UseLegacyScheme) - { - ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), - m_LogFlushPosition, - std::move(Index), - std::move(AccessTimes), - Payloads, - MetaDatas); - } + ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + m_Index, + m_AccessTimes, + m_Payloads, + m_MetaDatas); } std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); -- cgit v1.2.3 From 0b49aa0c7eca736871488009254b31356c9a32ce Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 4 Dec 2023 08:33:07 -0500 Subject: memory usage estimation for memcached entries (#586) * do a more accurate memory usage estimation for memcached entries * early exit when checking memcache usage --- src/zenserver/cache/cachedisklayer.cpp | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 2c344dd1d..05eb9658b 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -209,6 +209,9 @@ namespace { zen::Sleep(100); } while (true); } + + uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); } + } // namespace namespace fs = std::filesystem; @@ -1289,14 +1292,26 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope _(m_IndexLock); + if (m_MemCachedPayloads.empty()) + { + return; + } for (const auto& Kv : m_Index) { - if (m_AccessTimes[Kv.second] < ExpireTicks) + size_t Index = Kv.second; + BucketPayload& Payload = m_Payloads[Index]; + if (!Payload.MemCached) + { + continue; + } + if (m_AccessTimes[Index] < ExpireTicks) { - BucketPayload& Payload = m_Payloads[Kv.second]; RemoveMemCachedData(Payload); } } + m_MemCachedPayloads.shrink_to_fit(); + m_FreeMemCachedPayloads.shrink_to_fit(); + m_FreeMetaDatas.shrink_to_fit(); } void @@ -1305,6 +1320,10 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, std::vector& InOutUsageSlots) { RwLock::SharedLockScope _(m_IndexLock); + if (m_MemCachedPayloads.empty()) + { + return; + } for (const auto& It : m_Index) { size_t Index = It.second; @@ -2630,7 +2649,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe { Payload.MemCached = MemCachedIndex(gsl::narrow(m_MemCachedPayloads.size())); m_MemCachedPayloads.push_back(MemCachedData); - AddMemCacheUsage(PayloadSize); + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } @@ -2639,7 +2658,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); m_MemCachedPayloads[Payload.MemCached] = MemCachedData; - AddMemCacheUsage(PayloadSize); + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } @@ -2650,7 +2669,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) if (Payload.MemCached) { size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); - RemoveMemCacheUsage(PayloadSize); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; -- cgit v1.2.3 From 8269e0616cf4333fd1007ccd8a7b1dac09743e11 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 4 Dec 2023 08:37:05 -0500 Subject: reserve vectors in gcv2 upfront / load factor for robin_map (#582) * reserve vectors in gcv2 upfront * set max load factor for robin_map indexes to reduce memory usage * set min load factor for robin_map indexes to allow them to shrink --- src/zenserver/cache/cachedisklayer.cpp | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 05eb9658b..a4a37b0af 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -658,6 +658,9 @@ BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& ////////////////////////////////////////////////////////////////////////// +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, @@ -668,6 +671,9 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, , m_Configuration(Config) , m_BucketId(Oid::Zero) { + m_Index.min_load_factor(IndexMinLoadFactor); + m_Index.max_load_factor(IndexMaxLoadFactor); + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; @@ -2863,7 +2869,8 @@ public: m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique(); }); auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); - std::unordered_map BlockUsage; + size_t InlineEntryCount = 0; + BlockStore::BlockUsageMap BlockUsage; { RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); for (const auto& Entry : m_Bucket.m_Index) @@ -2876,15 +2883,17 @@ public: { continue; } + InlineEntryCount++; uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) { - It->second += ChunkSize; + It->second.EntryCount++; + It->second.DiskUsage += ChunkSize; } else { - BlockUsage.insert_or_assign(BlockIndex, ChunkSize); + BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); } } } @@ -2892,8 +2901,9 @@ public: { BlockStoreCompactState BlockCompactState; std::vector BlockCompactStateKeys; + BlockCompactStateKeys.reserve(InlineEntryCount); - std::vector BlocksToCompact = + BlockStore::BlockEntryCountMap BlocksToCompact = m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); BlockCompactState.IncludeBlocks(BlocksToCompact); @@ -3168,7 +3178,7 @@ public: uint32_t Size; }; std::vector> EntriesPerBlock; - + size_t UpdateCount = 0; { RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); for (const auto& Entry : m_CacheBucket.m_Index) @@ -3193,6 +3203,7 @@ public: { continue; } + UpdateCount++; const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { @@ -3219,6 +3230,8 @@ public: } } + UpdateKeys.reserve(UpdateCount); + for (auto It : BlockIndexToEntriesPerBlockIndex) { uint32_t BlockIndex = It.first; @@ -3671,6 +3684,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloa FirstReferenceIndex.reserve(EntryCount); } Index.reserve(EntryCount); + Index.min_load_factor(IndexMinLoadFactor); + Index.max_load_factor(IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); -- cgit v1.2.3 From 4aa1b4d6e3312e72952d26ccd702209e7051e258 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 5 Dec 2023 16:21:47 -0500 Subject: Use correct iterator index when looking up memcached payload in GatherReferences (#591) * Use correct iterator index when looking up memcached payload in gatherreferences --- src/zenserver/cache/cachedisklayer.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index a4a37b0af..700529443 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -1954,10 +1954,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : StructuredItemsWithUnknownAttachments) { - const IoHash& Key = Entry.first; - size_t PayloadIndex = Entry.second; - BucketPayload& Payload = Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; + const IoHash& Key = Entry.first; + BucketPayload& Payload = Payloads[Entry.second]; + const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) @@ -1980,7 +1979,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& CachedPayload = Payloads[PayloadIndex]; + const BucketPayload& CachedPayload = Payloads[It->second]; if (CachedPayload.MemCached) { Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; -- cgit v1.2.3 From a331880c88668c8c1e793c12acd88bc1d60f9ee0 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 11 Dec 2023 03:44:00 -0500 Subject: fix deadlock at bucket creation (#598) - Make sure we don't hold the namespace bucket lock when we create buckets to avoid deadlock - Pass lock scope to helper functions to clarify locking rules - Block flush and gc operations for a bucket that is not yet initialized - Add ZenCacheDiskLayer::GetOrCreateBucket to avoid code duplication --- src/zenserver/cache/cachedisklayer.cpp | 338 +++++++++++++++++---------------- 1 file changed, 175 insertions(+), 163 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 700529443..13f3c9e58 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -246,7 +246,8 @@ public: return OutVersion == CurrentDiskBucketVersion; } - void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, @@ -256,13 +257,15 @@ public: IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } - void WriteSidecarFile(const std::filesystem::path& SidecarPath, + void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, + const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, const std::vector& Payloads, const std::vector& MetaDatas); - bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, @@ -309,7 +312,8 @@ private: }; void -BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, +BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path ManifestPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, @@ -317,7 +321,7 @@ BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& B { if (Manifest["UsingMetaFile"sv].AsBool()) { - ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); return; } @@ -373,7 +377,7 @@ BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& B if (RawSize != 0 || RawHash != IoHash::Zero) { BucketPayload& Payload = Payloads[KeyIndex]; - Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); } } @@ -496,7 +500,8 @@ BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t Entr } bool -BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, +BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, std::filesystem::path SidecarPath, ZenCacheDiskLayer::CacheBucket::IndexMap& Index, std::vector& AccessTimes, @@ -567,7 +572,7 @@ BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& if (Entry->RawSize && Entry->RawHash != IoHash::Zero) { - Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); } } @@ -580,7 +585,8 @@ BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& } void -BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, +BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, + const std::filesystem::path& SidecarPath, uint64_t SnapshotLogPosition, const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, const std::vector& AccessTimes, @@ -696,6 +702,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo using namespace std::literals; ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); + ZEN_ASSERT(m_IsFlushing.load()); + + // We want to take the lock here since we register as a GC referencer a construction + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); @@ -738,20 +748,25 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo return false; } - InitializeIndexFromDisk(IsNew); + InitializeIndexFromDisk(IndexLock, IsNew); + + auto _ = MakeGuard([&]() { + // We are now initialized, allow flushing when we exit + m_IsFlushing.store(false); + }); if (IsNew) { return true; } - ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); return true; } void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); @@ -861,7 +876,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function::IsValid(LogPath)) { - LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); } else if (fs::is_regular_file(LogPath)) { @@ -1100,7 +1115,7 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew) if (IsNew || LogEntryCount > 0) { - WriteIndexSnapshot(); + WriteIndexSnapshot(IndexLock); } } @@ -1218,14 +1233,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); - RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) { - BucketPayload& WritePayload = m_Payloads[EntryIndex]; + BucketPayload& WritePayload = m_Payloads[UpdateIt->second]; // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { - SetMemCachedData(WritePayload, OutValue.Value); + SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value); } } } @@ -1250,7 +1265,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); OutValue.RawSize = OutValue.Value.GetSize(); } - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) { BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; @@ -1258,7 +1273,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal // Only set if no other path has already updated the meta data if (!WritePayload.MetaData) { - SetMetaData(WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); + SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); } } } @@ -1297,7 +1312,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_MemCachedPayloads.empty()) { return; @@ -1312,7 +1327,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) } if (m_AccessTimes[Index] < ExpireTicks) { - RemoveMemCachedData(Payload); + RemoveMemCachedData(IndexLock, Payload); } } m_MemCachedPayloads.shrink_to_fit(); @@ -1434,7 +1449,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(); + WriteIndexSnapshot(IndexLock); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; @@ -1474,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl else { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(); + WriteIndexSnapshot(IndexLock); const uint64_t EntryCount = m_Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); @@ -1502,7 +1517,8 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function& Cl return; } - ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), + ManifestWriter.WriteSidecarFile(IndexLock, + GetMetaPath(m_BucketDir, m_BucketName), m_LogFlushPosition, m_Index, m_AccessTimes, @@ -1776,8 +1792,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); } - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); @@ -1813,7 +1829,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } } } @@ -1875,6 +1891,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME + if (m_Index.empty()) + { + return; + } Index = m_Index; AccessTimes = m_AccessTimes; Payloads = m_Payloads; @@ -2097,8 +2117,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } }); - m_SlogFile.Flush(); - auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { @@ -2117,13 +2135,20 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } GcCtx.AddDeletedCids(std::vector(DeletedChunks.begin(), DeletedChunks.end())); } }); - std::span ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + std::span ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + if (ExpiredCacheKeySpan.empty()) + { + return; + } + + m_SlogFile.Flush(); + std::unordered_set ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); std::vector ExpiredStandaloneEntries; @@ -2291,7 +2316,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); { - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); @@ -2329,8 +2354,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_Configuration.PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); @@ -2367,7 +2392,7 @@ ZenCacheDiskLayer::CacheBucket::EntryCount() const } CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex Index) const +ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const { std::vector Attachments; const BucketPayload& Payload = m_Payloads[Index]; @@ -2379,7 +2404,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } - BucketMetaData MetaData = GetMetaData(Payload); + BucketMetaData MetaData = GetMetaData(IndexLock, Payload); return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), .RawSize = MetaData.RawSize, .RawHash = MetaData.RawHash, @@ -2389,7 +2414,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex } CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const +ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const { CacheValueDetails::BucketDetails Details; RwLock::SharedLockScope _(m_IndexLock); @@ -2398,7 +2423,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt Details.Values.reserve(m_Index.size()); for (const auto& It : m_Index) { - Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); + Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second)); } } else @@ -2406,7 +2431,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt IoHash Key = IoHash::FromHexString(ValueFilter); if (auto It = m_Index.find(Key); It != m_Index.end()) { - Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); + Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second)); } } return Details; @@ -2416,10 +2441,10 @@ void ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( std::function& Fn) const { - RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); for (const auto& It : m_Index) { - CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second); + CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second); Fn(It.first, Vd); } @@ -2594,16 +2619,16 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); - RemoveMemCachedData(Payload); + RemoveMemCachedData(IndexLock, Payload); m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero) { - SetMetaData(m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); + SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); } else { - RemoveMetaData(m_Payloads[EntryIndex]); + RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); @@ -2611,7 +2636,9 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } void -ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) +ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, + BucketPayload& Payload, + const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) { if (Payload.MetaData) { @@ -2634,7 +2661,7 @@ ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCac } void -ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload) +ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MetaData) { @@ -2644,7 +2671,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload) } void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffer& MemCachedData) +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData) { uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); @@ -2669,7 +2696,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe } size_t -ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) +ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MemCached) { @@ -2684,7 +2711,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) } ZenCacheDiskLayer::CacheBucket::BucketMetaData -ZenCacheDiskLayer::CacheBucket::GetMetaData(const BucketPayload& Payload) const +ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const { if (Payload.MetaData) { @@ -2727,8 +2754,8 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); Payload = (BucketPayload{.Location = Location}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); @@ -3026,6 +3053,10 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { return nullptr; } + if (m_Index.empty()) + { + return nullptr; + } TotalEntries = m_Index.size(); @@ -3071,8 +3102,8 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) auto It = m_Index.find(Entry.Key); ZEN_ASSERT(It != m_Index.end()); BucketPayload& Payload = m_Payloads[It->second]; - RemoveMetaData(Payload); - Stats.FreedMemory += RemoveMemCachedData(Payload); + RemoveMetaData(IndexLock, Payload); + Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload); m_Index.erase(It); Stats.DeletedCount++; } @@ -3091,7 +3122,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } } @@ -3485,6 +3516,14 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + { + RwLock::SharedLockScope __(m_IndexLock); + if (m_Index.empty()) + { + return {}; + } + } + return {new DiskBucketReferenceChecker(*this)}; } @@ -3665,7 +3704,8 @@ ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() } void -ZenCacheDiskLayer::CacheBucket::CompactState(std::vector& Payloads, +ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, + std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, std::vector& MemCachedPayloads, @@ -3748,124 +3788,99 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const st ZenCacheDiskLayer::~ZenCacheDiskLayer() { -} - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Disk::Get"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - + try { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) { - Bucket = It->second.get(); + RwLock::ExclusiveLockScope _(m_Lock); + for (auto& It : m_Buckets) + { + m_DroppedBuckets.emplace_back(std::move(It.second)); + } + m_Buckets.clear(); } + // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock. + // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock + m_DroppedBuckets.clear(); } - - if (Bucket == nullptr) + catch (std::exception& Ex) { - // Bucket needs to be opened/created + ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what()); + } +} - RwLock::ExclusiveLockScope _(m_Lock); +ZenCacheDiskLayer::CacheBucket* +ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) +{ + ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket"); + const auto BucketName = std::string(InBucket); + { + RwLock::SharedLockScope SharedLock(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - Bucket = It->second.get(); - } - else - { - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - if (!Bucket->OpenOrCreate(BucketPath)) - { - m_Buckets.erase(InsertResult.first); - return false; - } + return It->second.get(); } } - ZEN_ASSERT(Bucket != nullptr); - if (Bucket->Get(HashKey, OutValue)) + // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. + // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock + std::unique_ptr Bucket( + std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + + RwLock::ExclusiveLockScope Lock(m_Lock); + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - TryMemCacheTrim(); - return true; + return It->second.get(); } - return false; -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References) -{ - ZEN_TRACE_CPU("Z$::Disk::Put"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + try { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) + if (!Bucket->OpenOrCreate(BucketPath)) { - Bucket = It->second.get(); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + return nullptr; } } - - if (Bucket == nullptr) + catch (const std::exception& Err) { - // New bucket needs to be created + ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + throw; + } - RwLock::ExclusiveLockScope _(m_Lock); + CacheBucket* Result = Bucket.get(); + m_Buckets.emplace(BucketName, std::move(Bucket)); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + return Result; +} - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + ZEN_TRACE_CPU("Z$::Disk::Get"); - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - return; - } - } - catch (const std::exception& Err) - { - ZEN_WARN("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - throw; - } + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + if (Bucket->Get(HashKey, OutValue)) + { + TryMemCacheTrim(); + return true; } } + return false; +} - ZEN_ASSERT(Bucket != nullptr); +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References) +{ + ZEN_TRACE_CPU("Z$::Disk::Put"); - Bucket->Put(HashKey, Value, References); - TryMemCacheTrim(); + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + Bucket->Put(HashKey, Value, References); + TryMemCacheTrim(); + } } void @@ -4026,10 +4041,6 @@ ZenCacheDiskLayer::Flush() { RwLock::SharedLockScope __(m_Lock); - if (m_Buckets.empty()) - { - return; - } Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { @@ -4060,7 +4071,6 @@ void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); - { std::vector> Results; Results.reserve(m_Buckets.size()); @@ -4181,20 +4191,22 @@ ZenCacheDiskLayer::EnumerateBucketContents(std::string_view CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { - RwLock::SharedLockScope _(m_Lock); CacheValueDetails::NamespaceDetails Details; - if (BucketFilter.empty()) { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) + RwLock::SharedLockScope IndexLock(m_Lock); + if (BucketFilter.empty()) + { + Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); + for (auto& Kv : m_Buckets) + { + Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter); + } + } + else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); + Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter); } } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); - } return Details; } -- cgit v1.2.3 From c6cce91a514ba747b19f4fe8acfd2443405c960d Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 11 Dec 2023 06:36:48 -0500 Subject: mem cache perf improvements (#592) - Improvement: Refactor memory cache for faster trimming and correct trim reporting - Improvement: Added trace scopes for memory cache trimming Adding a link back to the cache item payload on the memory cache item allows us to iterate over only the items cached in memory instead of over the entire index. This also allows us to do efficient compact of the memory cache array when trimming. It adds 4 bytes of overhead to each item cached in memory. --- src/zenserver/cache/cachedisklayer.cpp | 236 ++++++++++++++++++--------------- 1 file changed, 132 insertions(+), 104 deletions(-) (limited to 'src/zenserver/cache/cachedisklayer.cpp') diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 13f3c9e58..0987cd0f1 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -209,9 +209,6 @@ namespace { zen::Sleep(100); } while (true); } - - uint64_t EstimateMemCachePayloadMemory(uint64_t PayloadSize) { return 8u + 32u + RoundUp(PayloadSize, 8u); } - } // namespace namespace fs = std::filesystem; @@ -1189,7 +1186,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal return false; } - size_t EntryIndex = It.value(); + PayloadIndex EntryIndex = It.value(); m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = m_Payloads[EntryIndex].Location; @@ -1206,7 +1203,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (Payload->MemCached) { - OutValue.Value = m_MemCachedPayloads[Payload->MemCached]; + OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; Payload = nullptr; IndexLock.ReleaseNow(); m_MemoryHitCount++; @@ -1240,7 +1237,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { - SetMemCachedData(UpdateIndexLock, WritePayload, OutValue.Value); + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); } } } @@ -1307,64 +1304,84 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_DiskWriteCount++; } -void +uint64_t ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::MemCacheTrim"); + + uint64_t Trimmed = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - return; + return 0; } - for (const auto& Kv : m_Index) + + uint32_t WriteIndex = 0; + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = Kv.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { continue; } - if (m_AccessTimes[Index] < ExpireTicks) + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) + { + size_t PayloadSize = Data.Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; + continue; + } + if (ReadIndex > WriteIndex) { - RemoveMemCachedData(IndexLock, Payload); + m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; + m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); } + WriteIndex++; } + m_MemCachedPayloads.resize(WriteIndex); m_MemCachedPayloads.shrink_to_fit(); - m_FreeMemCachedPayloads.shrink_to_fit(); - m_FreeMetaDatas.shrink_to_fit(); + zen::Reset(m_FreeMemCachedPayloads); + return Trimmed; } void -ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector& InOutUsageSlots) +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector& InOutUsageSlots) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::GetUsageByAccess"); + + size_t SlotCount = InOutUsageSlots.capacity(); RwLock::SharedLockScope _(m_IndexLock); - if (m_MemCachedPayloads.empty()) + uint32_t MemCachedCount = gsl::narrow(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { return; } - for (const auto& It : m_Index) + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - size_t Index = It.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { continue; } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) + GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); + size_t Slot = Age < MaxAge ? gsl::narrow((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); + ZEN_ASSERT_SLOW(Slot < SlotCount); + if (Slot >= InOutUsageSlots.size()) { - Slot = InOutUsageSlots.capacity() - 1; + InOutUsageSlots.resize(Slot + 1, 0); } - if (Slot > InOutUsageSlots.size()) - { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); - } - InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize(); + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); } } @@ -1823,7 +1840,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; - std::vector MemCachedPayloads; + std::vector MemCachedPayloads; std::vector FirstReferenceIndex; IndexMap Index; @@ -2002,7 +2019,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) const BucketPayload& CachedPayload = Payloads[It->second]; if (CachedPayload.MemCached) { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; + Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; ZEN_ASSERT_SLOW(Buffer); } else @@ -2124,7 +2141,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; - std::vector MemCachedPayloads; + std::vector MemCachedPayloads; std::vector FirstReferenceIndex; IndexMap Index; { @@ -2468,7 +2485,10 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { Bucket->CollectGarbage(GcCtx); } - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + if (!m_IsMemCacheTrimming) + { + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + } } void @@ -2671,16 +2691,17 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, Buck } void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload, IoBuffer& MemCachedData) +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { - uint64_t PayloadSize = MemCachedData.GetSize(); + BucketPayload& Payload = m_Payloads[PayloadIndex]; + uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow(m_MemCachedPayloads.size())); - m_MemCachedPayloads.push_back(MemCachedData); + m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2689,7 +2710,7 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, Bu { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); - m_MemCachedPayloads[Payload.MemCached] = MemCachedData; + m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } @@ -2700,9 +2721,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, { if (Payload.MemCached) { - size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); + size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; + m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; @@ -3117,7 +3138,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) std::vector Payloads; std::vector AccessTimes; std::vector MetaDatas; - std::vector MemCachedPayloads; + std::vector MemCachedPayloads; std::vector FirstReferenceIndex; IndexMap Index; { @@ -3708,7 +3729,7 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector& Payloads, std::vector& AccessTimes, std::vector& MetaDatas, - std::vector& MemCachedPayloads, + std::vector& MemCachedPayloads, std::vector& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) @@ -3738,7 +3759,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, } if (Payload.MemCached) { - MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached])); + MemCachedPayloads.emplace_back( + MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); Payload.MemCached = MemCachedIndex(gsl::narrow(MemCachedPayloads.size() - 1)); } if (m_Configuration.EnableReferenceCaching) @@ -4216,17 +4238,8 @@ ZenCacheDiskLayer::MemCacheTrim() ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } + ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); + ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) @@ -4234,75 +4247,90 @@ ZenCacheDiskLayer::MemCacheTrim() return; } - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.store(NextTrimTick); + try + { + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - uint64_t StartSize = m_TotalMemCachedSize.load(); - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - uint64_t EndSize = m_TotalMemCachedSize.load(); - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - m_IsMemCacheTrimming.store(false); - }); + static const size_t UsageSlotCount = 2048; + std::vector UsageSlots; + UsageSlots.reserve(UsageSlotCount); - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - - std::vector UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + std::vector Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } - std::vector Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) + const GcClock::TimePoint Now = GcClock::Now(); { - Buckets.push_back(Kv.second.get()); + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - MemCacheTrim(Buckets, ExpireTime); - break; + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } } - } - }); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + } } -void +uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector& Buckets, GcClock::TimePoint ExpireTime) { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { - return; + return 0; } - RwLock::SharedLockScope __(m_Lock); + uint64_t TrimmedSize = 0; for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime); + TrimmedSize += Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; } #if ZEN_WITH_TESTS -- cgit v1.2.3