diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-30 09:32:54 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-30 09:32:54 +0100 |
| commit | 3a6a5855cf36967c6bde31292669bfaf832c6f0b (patch) | |
| tree | 593e7c21e6840e7ad312207fddc63e1934e19d85 /src | |
| parent | set up arch properly when running tests (mac) (#505) (diff) | |
| download | zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.tar.xz zen-3a6a5855cf36967c6bde31292669bfaf832c6f0b.zip | |
New GC implementation (#459)
- Feature: New garbage collection implementation, still in evaluation mode. Enabled by `--gc-v2` command line option
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/admin_cmd.cpp | 16 | ||||
| -rw-r--r-- | src/zen/cmds/admin_cmd.h | 2 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 20 | ||||
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 10 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 633 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 57 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 654 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 4 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 8 | ||||
| -rw-r--r-- | src/zenserver/config.h | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 258 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 17 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 190 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 217 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 7 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 167 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 7 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 326 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 58 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 187 |
21 files changed, 2701 insertions, 141 deletions
diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp index 27341fe58..209390e2a 100644 --- a/src/zen/cmds/admin_cmd.cpp +++ b/src/zen/cmds/admin_cmd.cpp @@ -93,6 +93,10 @@ GcCommand::GcCommand() "Max disk usage size (in bytes)", cxxopts::value(m_DiskSizeSoftLimit)->default_value("0"), "<disksizesoftlimit>"); + m_Options + .add_option("", "", "usegcv1", "Force use of GC version 1", cxxopts::value(m_ForceUseGCV1)->default_value("false"), "<usegcv2>"); + m_Options + .add_option("", "", "usegcv2", "Force use of GC version 2", cxxopts::value(m_ForceUseGCV2)->default_value("false"), "<usegcv2>"); } GcCommand::~GcCommand() @@ -137,6 +141,18 @@ GcCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { Params.Add({"skipdelete", "true"}); } + if (m_ForceUseGCV1) + { + if (m_ForceUseGCV2) + { + throw OptionParseException("only usegcv1 or usegcv2 can be selected, not both"); + } + Params.Add({"forceusegcv1", "true"}); + } + if (m_ForceUseGCV2) + { + Params.Add({"forceusegcv2", "true"}); + } cpr::Session Session; Session.SetHeader(cpr::Header{{"Accept", "application/json"}}); diff --git a/src/zen/cmds/admin_cmd.h b/src/zen/cmds/admin_cmd.h index 9d7a50e09..4a9de89e2 100644 --- a/src/zen/cmds/admin_cmd.h +++ b/src/zen/cmds/admin_cmd.h @@ -41,6 +41,8 @@ private: bool m_SkipDelete{false}; uint64_t m_MaxCacheDuration{0}; uint64_t m_DiskSizeSoftLimit{0}; + bool m_ForceUseGCV1{false}; + bool m_ForceUseGCV2{false}; }; class GcStatusCommand : public ZenCmdBase diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index cc21e717a..3a4b1e6a1 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -199,7 +199,10 @@ WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThrea WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) { - m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); + if (InThreadCount > 0) + { + m_Impl = 
std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); + } } WorkerThreadPool::~WorkerThreadPool() @@ -210,7 +213,14 @@ WorkerThreadPool::~WorkerThreadPool() void WorkerThreadPool::ScheduleWork(Ref<IWork> Work) { - m_Impl->ScheduleWork(std::move(Work)); + if (m_Impl) + { + m_Impl->ScheduleWork(std::move(Work)); + } + else + { + Work->Execute(); + } } void @@ -222,7 +232,11 @@ WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) [[nodiscard]] size_t WorkerThreadPool::PendingWorkItemCount() const { - return m_Impl->PendingWorkItemCount(); + if (m_Impl) + { + return m_Impl->PendingWorkItemCount(); + } + return 0; } ////////////////////////////////////////////////////////////////////////// diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 7c7c729c7..fb376f238 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -298,6 +298,16 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, GcParams.SkipDelete = Param == "true"sv; } + if (auto Param = Params.GetValue("forceusegcv1"); Param.empty() == false) + { + GcParams.ForceGCVersion = GcVersion::kV1; + } + + if (auto Param = Params.GetValue("forceusegcv2"); Param.empty() == false) + { + GcParams.ForceGCVersion = GcVersion::kV2; + } + const bool Started = m_GcScheduler.TriggerGc(GcParams); CbObjectWriter Response; diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 2efec1e66..38cbf3a93 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -168,8 +168,13 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex; const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex; -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const BucketConfiguration& Config) -: m_BucketName(std::move(BucketName)) +ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, + 
std::atomic_uint64_t& OuterCacheMemoryUsage, + std::string BucketName, + const BucketConfiguration& Config) +: m_Gc(Gc) +, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) +, m_BucketName(std::move(BucketName)) , m_Configuration(Config) , m_BucketId(Oid::Zero) { @@ -179,10 +184,12 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, const Bucket // it makes sense to have a different strategy for legacy values m_Configuration.LargeObjectThreshold = 16 * 1024 * 1024; } + m_Gc.AddGcReferencer(*this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + m_Gc.RemoveGcReferencer(*this); } bool @@ -717,7 +724,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy } bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { metrics::RequestStats::Scope StatsScope(m_GetOps, 0); @@ -782,8 +789,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (!m_CachedPayloads[UpdateIt->second]) { m_CachedPayloads[UpdateIt->second] = OutValue.Value; - m_MemCachedSize.fetch_add(ValueSize); - CacheMemoryUsage.fetch_add(ValueSize); + AddMemCacheUsage(ValueSize); m_MemoryWriteCount++; } } @@ -834,27 +840,24 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - PutStandaloneCacheValue(HashKey, Value, References, CacheMemoryUsage); + PutStandaloneCacheValue(HashKey, Value, References); } else { - 
PutInlineCacheValue(HashKey, Value, References, CacheMemoryUsage); + PutInlineCacheValue(HashKey, Value, References); } m_DiskWriteCount++; } void -ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); @@ -864,8 +867,7 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime, std: if (m_AccessTimes[Kv.second] < ExpireTicks) { size_t PayloadSize = m_CachedPayloads[Kv.second].GetSize(); - m_MemCachedSize.fetch_sub(PayloadSize); - CacheMemoryUsage.fetch_sub(PayloadSize); + RemoveMemCacheUsage(PayloadSize); m_CachedPayloads[Kv.second] = {}; } } @@ -900,7 +902,7 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, } bool -ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::Drop() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Drop"); @@ -926,7 +928,7 @@ ZenCacheDiskLayer::CacheBucket::Drop(std::atomic_uint64_t& CacheMemoryUsage) m_NextReferenceHashesIndexes.clear(); m_ReferenceCount = 0; m_StandaloneSize.store(0); - CacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); + m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); m_MemCachedSize.store(0); return Deleted; @@ -1102,7 +1104,7 @@ ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) }; void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::Scrub"); @@ -1292,8 +1294,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx, std::atomic_uint if (m_Configuration.MemCacheSizeThreshold > 0) { size_t CachedSize = m_CachedPayloads[It->second].GetSize(); - m_MemCachedSize.fetch_sub(CachedSize); - CacheMemoryUsage.fetch_sub(CachedSize); + 
RemoveMemCacheUsage(CachedSize); m_CachedPayloads[It->second] = IoBuffer{}; } @@ -1411,8 +1412,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : Index) { - const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = AccessTimes[Entry.second]; + const IoHash& Key = Entry.first; + size_t PayloadIndex = Entry.second; + GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; if (AccessTime < ExpireTicks) { ExpiredKeys.push_back(Key); @@ -1424,7 +1426,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) continue; } - BucketPayload& Payload = Payloads[Entry.second]; + BucketPayload& Payload = Payloads[PayloadIndex]; const DiskLocation& Loc = Payload.Location; if (!Loc.IsFlagSet(DiskLocation::kStructured)) @@ -1433,7 +1435,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } if (m_Configuration.EnableReferenceCaching) { - if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex)) + if (FirstReferenceIndex.empty() || (FirstReferenceIndex[PayloadIndex] == UnknownReferencesIndex)) { StructuredItemsWithUnknownAttachments.push_back(Entry); continue; @@ -1450,7 +1452,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); } @@ -1470,13 +1472,15 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : StructuredItemsWithUnknownAttachments) { - BucketPayload& Payload = Payloads[Entry.second]; - const DiskLocation& Loc = Payload.Location; + const IoHash& Key = Entry.first; + size_t PayloadIndex = Entry.second; + BucketPayload& Payload = Payloads[PayloadIndex]; + const DiskLocation& Loc = 
Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer) + if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) { continue; } @@ -1492,7 +1496,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { if (m_Configuration.MemCacheSizeThreshold > 0) { @@ -1514,8 +1518,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ZEN_ASSERT(Buffer); ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{Buffer}); - size_t CurrentCidCount = Cids.size(); + CbObjectView Obj(Buffer.GetData()); + size_t CurrentCidCount = Cids.size(); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); if (m_Configuration.EnableReferenceCaching) { @@ -1528,7 +1532,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Entry.first); It != m_Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex) { @@ -1556,7 +1560,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage"); @@ -1762,17 +1766,19 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin TotalChunkCount = 0; for (const auto& Entry : Index) { - const DiskLocation& 
DiskLocation = Payloads[Entry.second].Location; + size_t EntryIndex = Entry.second; + const DiskLocation& DiskLocation = Payloads[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { continue; } + const IoHash& Key = Entry.first; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = Entry.first; - if (ExpiredCacheKeys.contains(Entry.first)) + ChunkIndexToChunkHash[ChunkIndex] = Key; + if (ExpiredCacheKeys.contains(Key)) { continue; } @@ -1815,12 +1821,12 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin }); for (const auto& Entry : MovedChunks) { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t PayloadIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[PayloadIndex]; - if (Payloads[Index[ChunkHash]].Location != m_Payloads[PayloadIndex].Location) + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + size_t EntryIndex = m_Index[ChunkHash]; + BucketPayload& Payload = m_Payloads[EntryIndex]; + if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; @@ -1830,9 +1836,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin } for (const size_t ChunkIndex : RemovedChunks) { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t PayloadIndex = m_Index[ChunkHash]; - const BucketPayload& Payload = m_Payloads[PayloadIndex]; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + size_t EntryIndex = m_Index[ChunkHash]; + const BucketPayload& Payload = m_Payloads[EntryIndex]; if 
(Payloads[Index[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete @@ -1843,12 +1849,11 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx, std::atomic_uin .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), m_Configuration.PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[PayloadIndex]) + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[EntryIndex]) { - uint64_t CachePayloadSize = m_CachedPayloads[PayloadIndex].Size(); - m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - m_CachedPayloads[PayloadIndex] = IoBuffer{}; + uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); + RemoveMemCacheUsage(CachePayloadSize); + m_CachedPayloads[EntryIndex] = IoBuffer{}; } m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); @@ -1891,10 +1896,10 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) - ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) - : GetInlineCacheValue(Payload.Location); - CbObject Obj(SharedBuffer{Value}); + IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) + ? 
GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) + : GetInlineCacheValue(Payload.Location); + CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), @@ -1958,16 +1963,13 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) } for (CacheBucket* Bucket : Buckets) { - Bucket->CollectGarbage(GcCtx, m_TotalMemCachedSize); + Bucket->CollectGarbage(GcCtx); } MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); } void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue"); @@ -2118,8 +2120,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey if (m_CachedPayloads[EntryIndex]) { uint64_t CachePayloadSize = m_CachedPayloads[EntryIndex].Size(); - m_MemCachedSize.fetch_sub(CachePayloadSize, std::memory_order::relaxed); - CacheMemoryUsage.fetch_sub(CachePayloadSize, std::memory_order::relaxed); + RemoveMemCacheUsage(CachePayloadSize); m_CachedPayloads[EntryIndex] = IoBuffer{}; } } @@ -2131,10 +2132,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey } void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage) +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue"); @@ -2176,14 +2174,12 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, if 
(m_CachedPayloads[EntryIndex]) { uint64_t OldCachedSize = m_CachedPayloads[EntryIndex].GetSize(); - m_MemCachedSize.fetch_sub(OldCachedSize); - CacheMemoryUsage.fetch_sub(OldCachedSize); + RemoveMemCacheUsage(OldCachedSize); } if (MemCacheBuffer) { - m_MemCachedSize.fetch_add(PayloadSize); - CacheMemoryUsage.fetch_add(PayloadSize); + AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } m_CachedPayloads[EntryIndex] = std::move(MemCacheBuffer); @@ -2202,8 +2198,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, { if (MemCacheBuffer) { - m_MemCachedSize.fetch_add(PayloadSize); - CacheMemoryUsage.fetch_add(PayloadSize); + AddMemCacheUsage(PayloadSize); m_MemoryWriteCount++; } m_CachedPayloads.emplace_back(std::move(MemCacheBuffer)); @@ -2219,6 +2214,409 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } void +ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx) +{ + size_t TotalEntries = 0; + tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys; + std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc cache bucket '{}': removed {} expired keys out of {} in {}", + m_BucketDir, + ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(), + TotalEntries, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); + + BlockStoreCompactState BlockCompactState; + BlockStore::ReclaimSnapshotState BlockSnapshotState; + std::vector<IoHash> BlockCompactStateKeys; + std::vector<DiskIndexEntry> ExpiredEntries; + uint64_t RemovedStandaloneSize = 0; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (Ctx.Settings.CollectSmallObjects) + { + BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState(); + } + TotalEntries = m_Index.size(); + + // Find out expired keys and affected blocks + for (const auto& Entry : m_Index) + { + const IoHash& 
Key = Entry.first; + size_t EntryIndex = Entry.second; + GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; + if (AccessTime >= ExpireTicks) + { + continue; + } + + const BucketPayload& Payload = m_Payloads[EntryIndex]; + DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location}; + ExpiredEntry.Location.Flags |= DiskLocation::kTombStone; + + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()}); + RemovedStandaloneSize += Payload.Location.Size(); + ExpiredEntries.push_back(ExpiredEntry); + } + else if (Ctx.Settings.CollectSmallObjects) + { + ExpiredInlineKeys.insert(Key); + uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex(); + bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex); + if (!IsActiveWriteBlock) + { + BlockCompactState.AddBlock(BlockIndex); + } + ExpiredEntries.push_back(ExpiredEntry); + } + } + + Ctx.ExpiredItems.fetch_add(ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size()); + + // Get all locations we need to keep for affected blocks + if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + { + for (const auto& Entry : m_Index) + { + const IoHash& Key = Entry.first; + if (ExpiredInlineKeys.contains(Key)) + { + continue; + } + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment))) + { + BlockCompactStateKeys.push_back(Key); + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const DiskIndexEntry& Entry : ExpiredEntries) + { + auto It = m_Index.find(Entry.Key); + ZEN_ASSERT(It != m_Index.end()); + if (m_Configuration.MemCacheSizeThreshold > 0 && m_CachedPayloads[It->second]) + { + size_t PayloadSize = m_CachedPayloads[It->second].GetSize(); + 
Ctx.RemovedMemory.fetch_add(PayloadSize); + RemoveMemCacheUsage(PayloadSize); + } + m_Index.erase(It); + } + m_SlogFile.Append(ExpiredEntries); + m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); + } + } + Ctx.Items.fetch_add(TotalEntries); + + if (ExpiredEntries.empty()) + { + return; + } + + if (!Ctx.Settings.IsDeleteMode) + { + return; + } + + Ctx.DeletedItems.fetch_add(ExpiredEntries.size()); + + // Compact standalone items + ExtendablePathBuilder<256> Path; + for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys) + { + Path.Reset(); + BuildPath(Path, ExpiredKey.first); + fs::path FilePath = Path.ToPath(); + + RwLock::SharedLockScope IndexLock(m_IndexLock); + if (m_Index.contains(ExpiredKey.first)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", + m_BucketDir, + Path.ToUtf8()); + continue; + } + + RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + if (!fs::remove(FilePath, Ec)) + { + continue; + } + if (Ec) + { + ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", + m_BucketDir, + Path.ToUtf8(), + Ec.message()); + continue; + } + Ctx.RemovedDiskSpace.fetch_add(ExpiredKey.second); + } + + if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + { + // Compact block store + m_BlockStore.CompactBlocks( + BlockCompactState, + m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<DiskIndexEntry> MovedEntries; + RwLock::ExclusiveLockScope _(m_IndexLock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = 
BlockCompactStateKeys[ChunkIndex]; + + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + BucketPayload& Payload = m_Payloads[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later + // time + continue; + } + + const BlockStoreLocation& NewLocation = Moved.second; + + Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + } + } + m_SlogFile.Append(MovedEntries); + Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace); + }, + [&]() { return 0; }); + } + + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<IoBuffer> CachedPayloads; + std::vector<size_t> FirstReferenceIndex; + IndexMap Index; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(Payloads, AccessTimes, CachedPayloads, FirstReferenceIndex, Index, IndexLock); + } +} + +class DiskBucketReferenceChecker : public GcReferenceChecker +{ +public: + DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {} + + virtual ~DiskBucketReferenceChecker() + { + m_IndexLock.reset(); + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it + m_CacheBucket.ClearReferenceCache(); + } + } + + virtual void LockState(GcCtx&) override + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc cache bucket '{}': found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_IndexLock = 
std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); + + // Rescan to see if any cache items needs refreshing since last pass when we had the lock + for (const auto& Entry : m_CacheBucket.m_Index) + { + size_t PayloadIndex = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty()); + const IoHash& Key = Entry.first; + if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + IoBuffer Buffer; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key); + } + else + { + Buffer = m_CacheBucket.GetInlineCacheValue(Loc); + } + + if (Buffer) + { + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); }); + } + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override + { + ZEN_ASSERT(m_IndexLock); + + for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) + { + IoCids.erase(ReferenceHash); + } + + for (const IoHash& ReferenceHash : m_UncachedReferences) + { + IoCids.erase(ReferenceHash); + } + } + ZenCacheDiskLayer::CacheBucket& m_CacheBucket; + std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; + HashSet m_UncachedReferences; +}; + +std::vector<GcReferenceChecker*> +ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx&) +{ + Stopwatch Timer; + const auto _ = MakeGuard( + [&] { ZEN_DEBUG("gc cache bucket '{}': refreshed reference cache in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + std::vector<IoHash> UpdateKeys; + std::vector<IoHash> StandaloneKeys; + 
std::vector<size_t> ReferenceCounts; + std::vector<IoHash> References; + + // Refresh cache + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + for (const auto& Entry : m_Index) + { + size_t PayloadIndex = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_Configuration.EnableReferenceCaching && + m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(Key); + continue; + } + IoBuffer Buffer = GetInlineCacheValue(Loc); + if (!Buffer) + { + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(0); + continue; + } + size_t CurrentReferenceCount = References.size(); + { + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + Buffer = {}; + } + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + } + } + { + for (const IoHash& Key : StandaloneKeys) + { + IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + if (!Buffer) + { + continue; + } + + size_t CurrentReferenceCount = References.size(); + { + CbObjectView Obj(Buffer.GetData()); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + Buffer = {}; + } + UpdateKeys.push_back(Key); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + } + } + + { + size_t ReferenceOffset = 0; + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (!m_Configuration.EnableReferenceCaching) + { + ZEN_ASSERT(m_FirstReferenceIndex.empty()); + ZEN_ASSERT(m_ReferenceHashes.empty()); + ZEN_ASSERT(m_NextReferenceHashesIndexes.empty()); + 
ZEN_ASSERT(m_ReferenceCount == 0); + // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when + // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. + m_FirstReferenceIndex.resize(m_Payloads.size(), UnknownReferencesIndex); + } + for (size_t Index = 0; Index < UpdateKeys.size(); Index++) + { + const IoHash& Key = UpdateKeys[Index]; + size_t ReferenceCount = ReferenceCounts[Index]; + auto It = m_Index.find(Key); + if (It == m_Index.end()) + { + ReferenceOffset += ReferenceCount; + continue; + } + if (m_FirstReferenceIndex[It->second] != ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex) + { + continue; + } + SetReferences(IndexLock, + m_FirstReferenceIndex[It->second], + std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); + ReferenceOffset += ReferenceCount; + } + if (m_Configuration.EnableReferenceCaching) + { + CompactReferences(IndexLock); + } + } + + return {new DiskBucketReferenceChecker(*this)}; +} + +void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { std::vector<size_t> FirstReferenceIndex; @@ -2381,6 +2779,19 @@ ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIn } void +ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() +{ + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + m_FirstReferenceIndex.clear(); + m_FirstReferenceIndex.shrink_to_fit(); + m_ReferenceHashes.clear(); + m_ReferenceHashes.shrink_to_fit(); + m_NextReferenceHashesIndexes.clear(); + m_NextReferenceHashesIndexes.shrink_to_fit(); + m_ReferenceCount = 0; +} + +void ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<IoBuffer>& CachedPayloads, @@ -2426,16 +2837,34 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payload } } +#if ZEN_WITH_TESTS +void 
+ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) +{ + GcClock::Tick TimeTick = Time.time_since_epoch().count(); + RwLock::SharedLockScope IndexLock(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = TimeTick; + } +} +#endif // ZEN_WITH_TESTS + ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) -: m_JobQueue(JobQueue) +ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) +: m_Gc(Gc) +, m_JobQueue(JobQueue) , m_RootDir(RootDir) , m_Configuration(Config) { } -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; +ZenCacheDiskLayer::~ZenCacheDiskLayer() +{ +} bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) @@ -2468,8 +2897,10 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; @@ -2483,7 +2914,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } ZEN_ASSERT(Bucket != nullptr); - if (Bucket->Get(HashKey, OutValue, m_TotalMemCachedSize)) + if (Bucket->Get(HashKey, OutValue)) { TryMemCacheTrim(); return true; @@ -2522,8 +2953,10 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const 
IoHash& HashKey, const Z } else { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; @@ -2547,7 +2980,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z ZEN_ASSERT(Bucket != nullptr); - Bucket->Put(HashKey, Value, References, m_TotalMemCachedSize); + Bucket->Put(HashKey, Value, References); TryMemCacheTrim(); } @@ -2579,8 +3012,10 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_Configuration.BucketConfig)); - CacheBucket& Bucket = *InsertResult.first->second; + auto InsertResult = + m_Buckets.emplace(BucketName, + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + CacheBucket& Bucket = *InsertResult.first->second; try { @@ -2636,7 +3071,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); - return Bucket.Drop(m_TotalMemCachedSize); + return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket @@ -2658,7 +3093,7 @@ ZenCacheDiskLayer::Drop() CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); - if (!Bucket.Drop(m_TotalMemCachedSize)) + if (!Bucket.Drop()) { return false; } @@ -2700,10 +3135,10 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { #if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx, m_TotalMemCachedSize); 
}})); + std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); #else CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx, m_TotalMemCachedSize); + Bucket.ScrubStorage(Ctx); #endif } @@ -2914,7 +3349,7 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim RwLock::SharedLockScope __(m_Lock); for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime, m_TotalMemCachedSize); + Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); @@ -2924,4 +3359,30 @@ ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::Tim m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); } +#if ZEN_WITH_TESTS +void +ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + return; + } + Bucket->SetAccessTime(HashKey, Time); +} +#endif // ZEN_WITH_TESTS + } // namespace zen diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index cc6653e28..d8f51c398 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -151,7 +151,7 @@ public: uint64_t MemorySize; }; - explicit ZenCacheDiskLayer(JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -174,24 +174,28 @@ public: CacheValueDetails::NamespaceDetails 
GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; +#if ZEN_WITH_TESTS + void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS + private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket + struct CacheBucket : public GcReferencer { - CacheBucket(std::string BucketName, const BucketConfiguration& Config); + CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, const BucketConfiguration& Config); ~CacheBucket(); bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue, std::atomic_uint64_t& CacheMemoryUsage); - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, std::atomic_uint64_t& CacheMemoryUsage); - void MemCacheTrim(GcClock::TimePoint ExpireTime, std::atomic_uint64_t& CacheMemoryUsage); - bool Drop(std::atomic_uint64_t& CacheMemoryUsage); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); + void MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); void Flush(); - void ScrubStorage(ScrubContext& Ctx, std::atomic_uint64_t& CacheMemoryUsage); + void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx, std::atomic_uint64_t& CacheMemoryUsage); + void CollectGarbage(GcContext& GcCtx); inline GcStorageSize StorageSize() const { @@ -205,8 +209,13 @@ private: void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; void GetUsageByAccess(GcClock::TimePoint TickStart, GcClock::Duration SectionLength, std::vector<uint64_t>& InOutUsageSlots); +#if ZEN_WITH_TESTS + void SetAccessTime(const IoHash& HashKey, 
GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS private: + GcManager& m_Gc; + std::atomic_uint64_t& m_OuterCacheMemoryUsage; std::string m_BucketName; std::filesystem::path m_BucketDir; std::filesystem::path m_BlocksBasePath; @@ -258,16 +267,13 @@ private: std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; + virtual void RemoveExpiredData(GcCtx& Ctx) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - std::atomic_uint64_t& CacheMemoryUsage); + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; void MakeIndexSnapshot(); uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); @@ -289,6 +295,7 @@ private: } size_t AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const; + void ClearReferenceCache(); void CompactState(std::vector<BucketPayload>& TmpPayloads, std::vector<AccessTime>& TmpAccessTimes, @@ -297,6 +304,17 @@ private: IndexMap& TmpIndex, RwLock::ExclusiveLockScope& IndexLock); + void AddMemCacheUsage(uint64_t ValueSize) + { + m_MemCachedSize.fetch_add(ValueSize, std::memory_order::relaxed); + m_OuterCacheMemoryUsage.fetch_add(ValueSize, std::memory_order::relaxed); + } + void 
RemoveMemCacheUsage(uint64_t ValueSize) + { + m_MemCachedSize.fetch_sub(ValueSize, std::memory_order::relaxed); + m_OuterCacheMemoryUsage.fetch_sub(ValueSize, std::memory_order::relaxed); + } + // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash // @@ -305,6 +323,8 @@ private: // an issue in practice mutable RwLock m_ShardedLocks[256]; inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } + + friend class DiskBucketReferenceChecker; }; inline void TryMemCacheTrim() @@ -326,6 +346,7 @@ private: void MemCacheTrim(); void MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime); + GcManager& m_Gc; JobQueue& m_JobQueue; std::filesystem::path m_RootDir; Configuration m_Configuration; @@ -338,6 +359,8 @@ private: ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; + + friend class DiskBucketReferenceChecker; }; } // namespace zen diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 6fab14eee..516532528 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -64,7 +64,7 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st , m_JobQueue(JobQueue) , m_RootDir(RootDir) , m_Configuration(Config) -, m_DiskLayer(m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig) +, m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig) { ZEN_INFO("initializing structured cache at '{}'", m_RootDir); CreateDirectories(m_RootDir); @@ -232,6 +232,14 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); } +#if ZEN_WITH_TESTS +void +ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint 
Time) +{ + m_DiskLayer.SetAccessTime(Bucket, HashKey, Time); +} +#endif // ZEN_WITH_TESTS + //////////////////////////// ZenCacheStore ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$"); @@ -784,6 +792,73 @@ namespace testutils { return Buf; }; + IoHash ToIoHash(const Oid& Id) + { + char OIdString[24 + 1]; + Id.ToString(OIdString); + IoHash Key = IoHash::HashBuffer(OIdString, 24); + return Key; + } + + std::pair<Oid, IoBuffer> CreateBinaryBlob(size_t Size) + { + uint64_t seed{Size}; + auto next = [](uint64_t& seed) { + uint64_t z = (seed += UINT64_C(0x9E3779B97F4A7C15)); + z = (z ^ (z >> 30)) * UINT64_C(0xBF58476D1CE4E5B9); + z = (z ^ (z >> 27)) * UINT64_C(0x94D049BB133111EB); + return z ^ (z >> 31); + }; + + IoBuffer Data(Size); + uint64_t* DataPtr = reinterpret_cast<uint64_t*>(Data.MutableData()); + while (Size > sizeof(uint64_t)) + { + *DataPtr++ = next(seed); + Size -= sizeof(uint64_t); + } + uint64_t ByteNext = next(seed); + uint8_t* ByteDataPtr = reinterpret_cast<uint8_t*>(DataPtr); + while (Size > 0) + { + *ByteDataPtr++ = static_cast<uint8_t>(ByteNext & 0xff); + ByteNext >>= 8; + Size--; + } + return {Oid::NewOid(), Data}; + } + + std::vector<std::pair<Oid, CompressedBuffer>> CreateCompressedAttachment(CidStore& Store, const std::span<const size_t>& Sizes) + { + std::vector<std::pair<Oid, CompressedBuffer>> Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + auto Blob = CreateBinaryBlob(Size); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Blob.second.Data(), Blob.second.Size())); + CHECK(!Store.ContainsChunk(Compressed.DecodeRawHash())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Blob.first, Compressed)); + } + return Result; + } + + std::pair<IoHash, IoBuffer> CreateRecord(std::span<std::pair<Oid, CompressedBuffer>> Attachments) + { + Oid Id = Oid::NewOid(); + IoHash Key = ToIoHash(Id); + CbObjectWriter Record; + Record << "Key"sv << Id; + + for (size_t Idx = 0; auto& Cid : 
Attachments) + { + Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid.second.DecodeRawHash()); + } + + IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + return {Key, Buffer}; + } + } // namespace testutils TEST_CASE("z$.store") @@ -1752,6 +1827,583 @@ TEST_CASE("z$.scrub") CHECK(ScrubCtx.BadCids().GetSize() == 0); } +TEST_CASE("z$.newgc.basics") +{ + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + auto JobQueue = MakeJobQueue(1, "testqueue"); + + struct CacheEntry + { + IoBuffer Data; + std::vector<std::pair<Oid, CompressedBuffer>> Attachments; + }; + + std::unordered_map<IoHash, CacheEntry> CacheEntries; + + auto CreateCacheRecord = + [&](ZenCacheNamespace& Zcs, CidStore& CidStore, std::string_view Bucket, std::span<std::pair<Oid, CompressedBuffer>> Attachments) { + std::vector<IoHash> AttachmentKeys; + for (const auto& Attachment : Attachments) + { + AttachmentKeys.push_back(Attachment.second.DecodeRawHash()); + } + auto Record = CreateRecord(Attachments); + Zcs.Put(Bucket, + Record.first, + {.Value = Record.second, + .RawSize = Record.second.GetSize(), + .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())}, + AttachmentKeys); + for (const auto& Attachment : Attachments) + { + CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash()); + } + CacheEntries.insert({Record.first, CacheEntry{.Data = Record.second, .Attachments = {Attachments.begin(), Attachments.end()}}}); + return Record.first; + }; + auto CreateCacheValue = [&](ZenCacheNamespace& Zcs, std::string_view Bucket, size_t Size) { + std::pair<Oid, IoBuffer> CacheValue = CreateBinaryBlob(Size); + IoHash Key = ToIoHash(CacheValue.first); + Zcs.Put(Bucket, + Key, + {.Value = CacheValue.second, + .RawSize = CacheValue.second.GetSize(), + .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())}, + {}); 
+ CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}}); + return Key; + }; + + auto ValidateCacheEntry = [&](ZenCacheNamespace& Zcs, + CidStore& CidStore, + std::string_view Bucket, + const IoHash& Key, + bool ExpectCacheEntry, + bool ExpectAttachments) { + const CacheEntry& Entry = CacheEntries[Key]; + ZenCacheValue Value; + bool CacheExists = Zcs.Get(Bucket, Key, Value); + if (ExpectCacheEntry) + { + if (!CacheExists) + { + return false; + } + if (Value.Value.GetSize() != Entry.Data.GetSize()) + { + return false; + } + if (!Value.Value.GetView().EqualBytes(Entry.Data.GetView())) + { + return false; + } + } + else if (CacheExists) + { + return false; + } + + if (ExpectAttachments) + { + for (const auto& Attachment : Entry.Attachments) + { + IoHash AttachmentHash = Attachment.second.DecodeRawHash(); + IoBuffer StoredData = CidStore.FindChunkByCid(AttachmentHash); + if (!StoredData) + { + return false; + } + if (!StoredData.GetView().EqualBytes(Attachment.second.GetCompressed().Flatten().GetView())) + { + return false; + } + } + } + else + { + for (const auto& Attachment : Entry.Attachments) + { + IoHash AttachmentHash = Attachment.second.DecodeRawHash(); + if (CidStore.ContainsChunk(AttachmentHash)) + { + return false; + } + } + } + return true; + }; + + std::vector<IoHash> CacheRecords; + std::vector<IoHash> UnstructuredCacheValues; + + const auto TearDrinkerBucket = "teardrinker"sv; + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + // Create some basic data + { + // Structured record with attachments + auto Attachments1 = CreateCompressedAttachment(CidStore, std::vector<size_t>{77, 1024 * 1024 * 2, 99, 1024 * 1024 * 2 + 87}); + CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments1)); + + // Structured record 
with reuse of attachments + auto Attachments2 = CreateCompressedAttachment(CidStore, std::vector<size_t>{971}); + Attachments2.push_back(Attachments1[0]); + Attachments2.push_back(Attachments1[1]); + CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments2)); + } + + CacheRecords.emplace_back(CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, {})); + + { + // Unstructured cache values + UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 84)); + UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 591)); + UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 1024 * 1024 * 3 + 7)); + UnstructuredCacheValues.push_back(CreateCacheValue(Zcs, TearDrinkerBucket, 71)); + } + } + + SUBCASE("expire nothing") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1), + .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(1), + .CollectSmallObjects = false, + .IsDeleteMode = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(0u, Result.ExpiredItems); + CHECK_EQ(0u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_EQ(0u, Result.RemovedDiskSpace); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true)); + 
CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + SUBCASE("expire all large objects, delete nothing") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = false, + .IsDeleteMode = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(1u, Result.ExpiredItems); + CHECK_EQ(0u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_EQ(0u, Result.RemovedDiskSpace); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + SUBCASE("expire all object, delete nothing") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = 
TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(7u, Result.ExpiredItems); + CHECK_EQ(0u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_EQ(0u, Result.RemovedDiskSpace); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + SUBCASE("expire all large objects, skip cid") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = false, + .IsDeleteMode = true, + .SkipCidDelete = true}); + CHECK_EQ(7u, 
Result.Items); + CHECK_EQ(1u, Result.ExpiredItems); + CHECK_EQ(1u, Result.DeletedItems); + CHECK_EQ(0u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.RemovedDiskSpace); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], true, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + SUBCASE("expire all objects, skip cid") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = true}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(7u, Result.ExpiredItems); + CHECK_EQ(7u, Result.DeletedItems); + CHECK_EQ(0u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_GE(Result.RemovedDiskSpace, 0); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true)); + 
CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, true)); + } + SUBCASE("expire all large objects") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = false, + .IsDeleteMode = true, + .SkipCidDelete = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(1u, Result.ExpiredItems); // Only one cache value is pruned/deleted as that is the only large item in the cache (all other + // large items as in cas) + CHECK_EQ(1u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(0u, + Result.PrunedReferences); // We won't remove any references since all referencers are small which retains all references + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_EQ(CacheEntries[UnstructuredCacheValues[2]].Data.GetSize(), Result.RemovedDiskSpace); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, 
CacheRecords[2], true, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + SUBCASE("expire all objects") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(7u, Result.ExpiredItems); + CHECK_EQ(7u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(5u, Result.PrunedReferences); + CHECK_EQ(5u, Result.CompactedReferences); + CHECK_GT(Result.RemovedDiskSpace, 0); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, 
false)); + } + + SUBCASE("keep 1 cache record, skip cid") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2)); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = true}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(6u, Result.ExpiredItems); + CHECK_EQ(6u, Result.DeletedItems); + CHECK_EQ(0u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_GT(Result.RemovedDiskSpace, 0); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false)); + } + + SUBCASE("keep 2 cache records") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + 
Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::minutes(2)); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(5u, Result.ExpiredItems); + CHECK_EQ(5u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_GT(Result.RemovedDiskSpace, 0); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], false, false)); + } + + SUBCASE("keep 3 cache value") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2)); + 
Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2)); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = false}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(4u, Result.ExpiredItems); + CHECK_EQ(4u, Result.DeletedItems); + CHECK_EQ(5u, Result.References); + CHECK_EQ(5u, Result.PrunedReferences); + CHECK_EQ(5u, Result.CompactedReferences); + CHECK_GT(Result.RemovedDiskSpace, 0); + CHECK_EQ(0u, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, false)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[3], true, true)); + } + + SUBCASE("keep 3 cache value, skip cid") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + // Prime so we can check GC of memory layer + ZenCacheValue Dummy; + Zcs.Get(TearDrinkerBucket, CacheRecords[0], Dummy); + Zcs.Get(TearDrinkerBucket, CacheRecords[1], Dummy); + Zcs.Get(TearDrinkerBucket, CacheRecords[2], Dummy); + Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[0], Dummy); + 
Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[1], Dummy); + Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[2], Dummy); + Zcs.Get(TearDrinkerBucket, UnstructuredCacheValues[3], Dummy); + + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2)); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = true}); + CHECK_EQ(7u, Result.Items); + CHECK_EQ(4u, Result.ExpiredItems); + CHECK_EQ(4u, Result.DeletedItems); + CHECK_EQ(0u, Result.References); + CHECK_EQ(0u, Result.PrunedReferences); + CHECK_EQ(0u, Result.CompactedReferences); + CHECK_GT(Result.RemovedDiskSpace, 0); + uint64_t MemoryClean = CacheEntries[CacheRecords[0]].Data.GetSize() + CacheEntries[CacheRecords[1]].Data.GetSize() + + CacheEntries[CacheRecords[2]].Data.GetSize() + CacheEntries[UnstructuredCacheValues[0]].Data.GetSize(); + CHECK_EQ(MemoryClean, Result.RemovedMemory); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[0], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[1], false, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, CacheRecords[2], false, true)); + + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[0], false, false)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[1], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, UnstructuredCacheValues[2], true, true)); + CHECK(ValidateCacheEntry(Zcs, CidStore, TearDrinkerBucket, 
UnstructuredCacheValues[3], true, true)); + } + + SUBCASE("leave write block") + { + GcManager Gc; + CidStore CidStore(Gc); + CidStore.Initialize({.RootDirectory = TempDir.Path() / "cas"}); + ZenCacheNamespace Zcs(Gc, + *JobQueue, + TempDir.Path() / "cache", + {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}}); + + auto Attachments = + CreateCompressedAttachment(CidStore, std::vector<size_t>{177, 1024 * 1024 * 2 + 31, 8999, 1024 * 1024 * 2 + 187}); + IoHash CacheRecord = CreateCacheRecord(Zcs, CidStore, TearDrinkerBucket, Attachments); + + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecord, GcClock::Now() - std::chrono::minutes(2)); + + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[1], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[2], GcClock::Now() + std::chrono::minutes(2)); + + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[0], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[2], GcClock::Now() + std::chrono::minutes(2)); + Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[3], GcClock::Now() + std::chrono::minutes(2)); + + GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::minutes(1), + .ProjectStoreExpireTime = GcClock::Now() + std::chrono::minutes(1), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .SkipCidDelete = false}); + CHECK_EQ(8u, Result.Items); + CHECK_EQ(1u, Result.ExpiredItems); + CHECK_EQ(1u, Result.DeletedItems); + CHECK_EQ(9u, Result.References); + CHECK_EQ(4u, Result.PrunedReferences); + CHECK_EQ(4u, Result.CompactedReferences); + CHECK_EQ(Attachments[1].second.GetCompressed().GetSize() + 
Attachments[3].second.GetCompressed().GetSize(), + Result.RemovedDiskSpace); + uint64_t MemoryClean = CacheEntries[CacheRecord].Data.GetSize(); + CHECK_EQ(MemoryClean, Result.RemovedMemory); + } +} + #endif void diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index dacf482d8..a3cac0d44 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -106,6 +106,10 @@ public: CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; +#if ZEN_WITH_TESTS + void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); +#endif // ZEN_WITH_TESTS + private: GcManager& m_Gc; JobQueue& m_JobQueue; diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index ca438fe38..43d517520 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -865,6 +865,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls); LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv); + LuaOptions.AddOption("gc.v2"sv, ServerOptions.GcConfig.UseGCV2, "gc-v2"sv); LuaOptions.AddOption("gc.monitorintervalseconds"sv, ServerOptions.GcConfig.MonitorIntervalSeconds, "gc-monitor-interval-seconds"sv); LuaOptions.AddOption("gc.intervalseconds"sv, ServerOptions.GcConfig.IntervalSeconds, "gc-interval-seconds"sv); @@ -1228,6 +1229,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", + "gc-v2", + "Use V2 of GC implementation or not.", + cxxopts::value<bool>(ServerOptions.GcConfig.UseGCV2)->default_value("false"), + ""); + + options.add_option("gc", + "", "gc-small-objects", "Whether garbage collection of small objects is enabled or not.", 
cxxopts::value<bool>(ServerOptions.GcConfig.CollectSmallObjects)->default_value("true"), diff --git a/src/zenserver/config.h b/src/zenserver/config.h index a1e091665..d55f0d5a1 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -71,6 +71,7 @@ struct ZenGcConfig uint64_t DiskSizeSoftLimit = 0; int32_t LightweightIntervalSeconds = 0; uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; + bool UseGCV2 = false; }; struct ZenOpenIdProviderConfig diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index df23db1bd..274876123 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -452,6 +452,8 @@ struct ProjectStore::OplogStorage : public RefCounted m_OpBlobs.Flush(); } + uint32_t GetMaxLsn() const { return m_MaxLsn.load(); } + spdlog::logger& Log() { return m_OwnerOplog->Log(); } private: @@ -855,6 +857,17 @@ ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) return -1; } +int +ProjectStore::Oplog::GetMaxOpIndex() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return -1; + } + return gsl::narrow<int>(m_Storage->GetMaxLsn()); +} + std::optional<CbObject> ProjectStore::Oplog::GetOpByKey(const Oid& Key) { @@ -1661,6 +1674,17 @@ ProjectStore::Project::TouchOplog(std::string_view Oplog) const m_LastAccessTimes.insert_or_assign(std::string(Oplog), GcClock::TickCount()); }; +GcClock::TimePoint +ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const +{ + RwLock::SharedLockScope Lock(m_ProjectLock); + if (auto It = m_LastAccessTimes.find(std::string(Oplog)); It != m_LastAccessTimes.end()) + { + return GcClock::TimePointFromTick(It->second); + } + return GcClock::TimePoint::min(); +} + ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue) @@ -1675,11 +1699,13 @@ 
ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa // m_Log.set_level(spdlog::level::debug); m_Gc.AddGcContributor(this); m_Gc.AddGcStorage(this); + m_Gc.AddGcReferencer(*this); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); + m_Gc.RemoveGcReferencer(*this); m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcContributor(this); } @@ -3010,6 +3036,238 @@ ProjectStore::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +void +ProjectStore::RemoveExpiredData(GcCtx& Ctx) +{ + size_t ProjectCount = 0; + size_t ExpiredProjectCount = 0; + size_t OplogCount = 0; + size_t ExpiredOplogCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc project store '{}': removed {} expired projects out of {}, {} expired oplogs out of {} in {}", + m_ProjectBasePath, + ExpiredProjectCount, + ProjectCount, + ExpiredOplogCount, + OplogCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; + + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime)) + { + ExpiredProjects.push_back(Kv.second); + continue; + } + Projects.push_back(Kv.second); + } + } + + for (const Ref<Project>& Project : Projects) + { + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope __(m_ProjectsLock); + Project->IterateOplogs( + [&Ctx, &Project, &ExpiredOplogs, &OplogCount](const RwLock::SharedLockScope& Lock, ProjectStore::Oplog& Oplog) { + OplogCount++; + if (Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime, Oplog)) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + std::filesystem::path ProjectPath = BasePathForProject(Project->Identifier); + ExpiredOplogCount += ExpiredOplogs.size(); + if (Ctx.Settings.IsDeleteMode) + { + for (const 
std::string& OplogId : ExpiredOplogs) + { + std::filesystem::path OplogBasePath = ProjectPath / OplogId; + uint64_t OplogSize = Oplog::TotalSize(OplogBasePath); + ZEN_DEBUG("gc project store '{}': garbage collected oplog '{}' in project '{}'. Removing storage on disk", + m_ProjectBasePath, + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + Ctx.RemovedDiskSpace.fetch_add(OplogSize); + } + Ctx.DeletedItems.fetch_add(ExpiredOplogs.size()); + Project->Flush(); + } + } + ProjectCount = Projects.size(); + Ctx.Items.fetch_add(ProjectCount + OplogCount); + ExpiredProjectCount = ExpiredProjects.size(); + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("gc project store '{}': no expired projects found", m_ProjectBasePath); + return; + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const Ref<Project>& Project : ExpiredProjects) + { + std::filesystem::path PathToRemove; + std::string ProjectId = Project->Identifier; + { + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + if (!Project->IsExpired(Lock, Ctx.Settings.ProjectStoreExpireTime)) + { + ZEN_DEBUG("gc project store '{}': skipped garbage collect of project '{}'. Project no longer expired.", + m_ProjectBasePath, + ProjectId); + continue; + } + } + RwLock::ExclusiveLockScope __(m_ProjectsLock); + bool Success = Project->PrepareForDelete(PathToRemove); + if (!Success) + { + ZEN_DEBUG("gc project store '{}': skipped garbage collect of project '{}'. Project folder is locked.", + m_ProjectBasePath, + ProjectId); + continue; + } + m_Projects.erase(ProjectId); + } + + ZEN_DEBUG("gc project store '{}': sgarbage collected project '{}'. 
Removing storage on disk", m_ProjectBasePath, ProjectId); + if (PathToRemove.empty()) + { + continue; + } + + DeleteDirectories(PathToRemove); + } + Ctx.DeletedItems.fetch_add(ExpiredProjects.size()); + } + + Ctx.ExpiredItems.fetch_add(ExpiredOplogCount + ExpiredProjectCount); +} + +class ProjectStoreReferenceChecker : public GcReferenceChecker +{ +public: + ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner, bool PreCache) : m_Oplog(Owner) + { + if (PreCache) + { + RwLock::SharedLockScope _(m_Oplog.m_OplogLock); + m_Oplog.IterateOplog([&](CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView Visitor) { m_UncachedReferences.insert(Visitor.AsAttachment()); }); + }); + m_PreCachedLsn = m_Oplog.GetMaxOpIndex(); + } + } + + virtual ~ProjectStoreReferenceChecker() {} + + virtual void LockState(GcCtx&) override + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc project oplog '{}': found {} references in {} from {}/{}", + m_Oplog.m_BasePath, + m_UncachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Oplog.m_OuterProject->Identifier, + m_Oplog.OplogId()); + }); + + m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); + if (m_Oplog.GetMaxOpIndex() != m_PreCachedLsn) + { + // TODO: Maybe we could just check the added oplog entries - we might get a few extra references from obsolete entries + // but I don't think that would be critical + m_UncachedReferences.clear(); + m_Oplog.IterateOplog([&](CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView Visitor) { m_UncachedReferences.insert(Visitor.AsAttachment()); }); + }); + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override + { + for (const IoHash& ReferenceHash : m_UncachedReferences) + { + IoCids.erase(ReferenceHash); + } + } + ProjectStore::Oplog& m_Oplog; + std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; + HashSet m_UncachedReferences; + int m_PreCachedLsn = -1; +}; + +std::vector<GcReferenceChecker*> 
+ProjectStore::CreateReferenceCheckers(GcCtx&) +{ + size_t ProjectCount = 0; + size_t OplogCount = 0; + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc project store '{}': opened {} projects and {} oplogs in {}", + m_ProjectBasePath, + ProjectCount, + OplogCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + DiscoverProjects(); + + std::vector<Ref<ProjectStore::Project>> Projects; + { + RwLock::SharedLockScope Lock(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + Projects.push_back(Kv.second); + } + } + ProjectCount += Projects.size(); + std::vector<GcReferenceChecker*> Checkers; + try + { + for (const Ref<ProjectStore::Project>& Project : Projects) + { + std::vector<std::string> OpLogs = Project->ScanForOplogs(); + Checkers.reserve(OpLogs.size()); + for (const std::string& OpLogId : OpLogs) + { + ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); + GcClock::TimePoint Now = GcClock::Now(); + bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); + Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache)); + } + OplogCount += OpLogs.size(); + } + } + catch (std::exception&) + { + while (!Checkers.empty()) + { + delete Checkers.back(); + Checkers.pop_back(); + } + throw; + } + + return Checkers; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 5aede88b0..94e697278 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -60,7 +60,7 @@ static_assert(IsPow2(sizeof(OplogEntry))); package data split into separate chunks for bulk data, exports and header information. 
*/ -class ProjectStore : public RefCounted, public GcStorage, public GcContributor +class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer { struct OplogStorage; @@ -98,6 +98,7 @@ public: std::optional<CbObject> GetOpByKey(const Oid& Key); std::optional<CbObject> GetOpByIndex(int Index); int GetOpIndexByKey(const Oid& Key); + int GetMaxOpIndex() const; IoBuffer FindChunk(Oid ChunkId); @@ -208,6 +209,8 @@ public: std::string_view ClientPath); void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); + + friend class ProjectStoreReferenceChecker; }; struct Project : public RefCounted @@ -225,10 +228,11 @@ public: void IterateOplogs(std::function<void(const RwLock::SharedLockScope&, Oplog&)>&& Fn); std::vector<std::string> ScanForOplogs() const; bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime); - bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog); - bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog); - void TouchProject() const; - void TouchOplog(std::string_view Oplog) const; + bool IsExpired(const RwLock::SharedLockScope&, const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog); + bool IsExpired(const GcClock::TimePoint ExpireTime, const ProjectStore::Oplog& Oplog); + void TouchProject() const; + void TouchOplog(std::string_view Oplog) const; + GcClock::TimePoint LastOplogAccessTime(std::string_view Oplog) const; Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); virtual ~Project(); @@ -289,6 +293,9 @@ public: virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override; + virtual void RemoveExpiredData(GcCtx& Ctx) override; + virtual std::vector<GcReferenceChecker*> 
CreateReferenceCheckers(GcCtx& Ctx) override; + CbArray GetProjectsList(); std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index d7fc2d069..f40602769 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -288,7 +288,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, .DiskSizeSoftLimit = ServerOptions.GcConfig.DiskSizeSoftLimit, .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites, - .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds)}; + .LightweightInterval = std::chrono::seconds(ServerOptions.GcConfig.LightweightIntervalSeconds), + .UseGCVersion = ServerOptions.GcConfig.UseGCV2 ? GcVersion::kV2 : GcVersion::kV1}; m_GcScheduler.Initialize(GcConfig); // Create and register admin interface last to make sure all is properly initialized diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 02ee204ad..837185201 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -957,6 +957,196 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, } } +void +BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, + uint64_t PayloadAlignment, + const CompactCallback& ChangeCallback, + const ClaimDiskReserveCallback& DiskReserveCallback) +{ + uint64_t DeletedSize = 0; + uint64_t MovedCount = 0; + uint64_t MovedSize = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("compact blocks for '{}' DONE after {}, deleted {} and moved {} chunks ({}) ", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceBytes(DeletedSize), + MovedCount, + NiceBytes(MovedSize)); + }); + + uint64_t WriteOffset = m_MaxBlockSize + 1u; // 
Force detect a new block + uint32_t NewBlockIndex = 0; + MovedChunksArray MovedChunks; + + uint64_t RemovedSize = 0; + + Ref<BlockStoreFile> NewBlockFile; + auto NewBlockFileGuard = MakeGuard([&]() { + if (NewBlockFile) + { + ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + { + RwLock::ExclusiveLockScope _l(m_InsertLock); + if (m_ChunkBlocks[NewBlockIndex] == NewBlockFile) + { + m_ChunkBlocks.erase(NewBlockIndex); + } + } + NewBlockFile->MarkAsDeleteOnClose(); + } + }); + + std::vector<uint32_t> RemovedBlocks; + + CompactState.IterateBlocks( + [&](uint32_t BlockIndex, const std::vector<size_t>& KeepChunkIndexes, const std::vector<BlockStoreLocation>& ChunkLocations) { + ZEN_ASSERT(BlockIndex != m_WriteBlockIndex.load()); + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _(m_InsertLock); + auto It = m_ChunkBlocks.find(BlockIndex); + if (It == m_ChunkBlocks.end()) + { + // This block has unknown, we can't move anything. Report error? + return; + } + if (!It->second) + { + // This block has been removed, we can't move anything. Report error? 
+ return; + } + OldBlockFile = It->second; + } + ZEN_ASSERT(OldBlockFile); + + uint64_t OldBlockSize = OldBlockFile->FileSize(); + + // TODO: Add heuristics for determining if it is worth to compact a block (if only a very small part is removed) + + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepChunkIndexes) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) + { + if (NewBlockFile) + { + NewBlockFile->Flush(); + MovedSize += NewBlockFile->FileSize(); + NewBlockFile = nullptr; + + ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything + + ChangeCallback(MovedChunks, RemovedSize); + DeletedSize += RemovedSize; + RemovedSize = 0; + MovedCount += MovedChunks.size(); + MovedChunks.clear(); + } + + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + { + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + std::filesystem::path NewBlockPath; + NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); + if (NextBlockIndex == (uint32_t)m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + ZEN_ASSERT(NewBlockFile); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, 
required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + { + RwLock::ExclusiveLockScope _l(m_InsertLock); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + m_ChunkBlocks.erase(NextBlockIndex); + } + NewBlockFile->MarkAsDeleteOnClose(); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + } + Chunk.clear(); + + // Report what we have moved so we can purge the old block + if (!MovedChunks.empty() || RemovedSize > 0) + { + ChangeCallback(MovedChunks, RemovedSize); + DeletedSize += RemovedSize; + RemovedSize = 0; + MovedCount += MovedChunks.size(); + MovedChunks.clear(); + } + + { + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + OldBlockFile->MarkAsDeleteOnClose(); + m_ChunkBlocks.erase(BlockIndex); + m_TotalSize.fetch_sub(OldBlockSize); + RemovedSize += OldBlockSize; + } + }); + if (NewBlockFile) + { + NewBlockFile->Flush(); + MovedSize += NewBlockFile->FileSize(); + NewBlockFile = nullptr; + } + + if (!MovedChunks.empty() || RemovedSize > 0) + { + ChangeCallback(MovedChunks, RemovedSize); + DeletedSize += RemovedSize; + RemovedSize = 0; + MovedCount += MovedChunks.size(); + MovedChunks.clear(); + } +} + const char* BlockStore::GetBlockFileExtension() { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 115bdcf03..f93dafa21 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp 
@@ -117,10 +117,12 @@ namespace { CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc) { m_Gc.AddGcStorage(this); + m_Gc.AddGcReferenceStore(*this); } CasContainerStrategy::~CasContainerStrategy() { + m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -551,6 +553,221 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) GcCtx.AddDeletedCids(DeletedChunks); } +class CasContainerStoreCompactor : public GcReferenceStoreCompactor +{ +public: + CasContainerStoreCompactor(CasContainerStrategy& Owner, + BlockStoreCompactState&& CompactState, + std::vector<IoHash>&& CompactStateKeys, + std::vector<IoHash>&& PrunedKeys) + : m_CasContainerStrategy(Owner) + , m_CompactState(std::move(CompactState)) + , m_CompactStateKeys(std::move(CompactStateKeys)) + , m_PrunedKeys(std::move(PrunedKeys)) + { + } + + virtual void CompactReferenceStore(GcCtx& Ctx) + { + size_t CompactedCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc block store '{}': compacted {} cids in {}", + m_CasContainerStrategy.m_RootDirectory / m_CasContainerStrategy.m_ContainerBaseName, + CompactedCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (Ctx.Settings.IsDeleteMode && Ctx.Settings.CollectSmallObjects) + { + // Compact block store + m_CasContainerStrategy.m_BlockStore.CompactBlocks( + m_CompactState, + m_CasContainerStrategy.m_PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<CasDiskIndexEntry> MovedEntries; + RwLock::ExclusiveLockScope _(m_CasContainerStrategy.m_LocationMapLock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = m_CompactStateKeys[ChunkIndex]; + + if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) + { + BlockStoreDiskLocation& Location = 
m_CasContainerStrategy.m_Locations[It->second]; + const BlockStoreLocation& OldLocation = m_CompactState.GetLocation(ChunkIndex); + if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a + // later time + continue; + } + + const BlockStoreLocation& NewLocation = Moved.second; + Location = BlockStoreDiskLocation(NewLocation, m_CasContainerStrategy.m_PayloadAlignment); + MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); + } + } + m_CasContainerStrategy.m_CasLog.Append(MovedEntries); + Ctx.RemovedDiskSpace.fetch_add(FreedDiskSpace); + }, + [&]() { return 0; }); + + CompactedCount = m_PrunedKeys.size(); + Ctx.CompactedReferences.fetch_add( + CompactedCount); // Slightly missleading, it might not be compacted if the block is the currently writing block + } + } + + CasContainerStrategy& m_CasContainerStrategy; + BlockStoreCompactState m_CompactState; + std::vector<IoHash> m_CompactStateKeys; + std::vector<IoHash> m_PrunedKeys; +}; + +class CasContainerReferencePruner : public GcReferencePruner +{ +public: + CasContainerReferencePruner(CasContainerStrategy& Owner, std::vector<IoHash>&& Cids) + : m_CasContainerStrategy(Owner) + , m_Cids(std::move(Cids)) + { + } + + virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences) + { + size_t TotalCount = m_Cids.size(); + size_t PruneCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc block store '{}': removed {} unused cid out of {} in {}", + m_CasContainerStrategy.m_RootDirectory / m_CasContainerStrategy.m_ContainerBaseName, + PruneCount, + TotalCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<IoHash> UnusedCids = GetUnusedReferences(m_Cids); + m_Cids.clear(); + + if (UnusedCids.empty()) + { + // Nothing to collect + return nullptr; + } + + 
BlockStoreCompactState CompactState; + BlockStore::ReclaimSnapshotState BlockSnapshotState; + std::vector<IoHash> CompactStateKeys; + std::vector<CasDiskIndexEntry> ExpiredEntries; + ExpiredEntries.reserve(UnusedCids.size()); + tsl::robin_set<IoHash, IoHash::Hasher> UnusedKeys; + + { + RwLock::ExclusiveLockScope __(m_CasContainerStrategy.m_LocationMapLock); + if (Ctx.Settings.CollectSmallObjects) + { + BlockSnapshotState = m_CasContainerStrategy.m_BlockStore.GetReclaimSnapshotState(); + } + + for (const IoHash& Cid : UnusedCids) + { + auto It = m_CasContainerStrategy.m_LocationMap.find(Cid); + if (It == m_CasContainerStrategy.m_LocationMap.end()) + { + continue; + } + CasDiskIndexEntry ExpiredEntry = {.Key = Cid, + .Location = m_CasContainerStrategy.m_Locations[It->second], + .Flags = CasDiskIndexEntry::kTombstone}; + const BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + BlockStoreLocation BlockLocation = Location.Get(m_CasContainerStrategy.m_PayloadAlignment); + if (Ctx.Settings.CollectSmallObjects) + { + UnusedKeys.insert(Cid); + uint32_t BlockIndex = BlockLocation.BlockIndex; + bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex); + if (!IsActiveWriteBlock) + { + CompactState.AddBlock(BlockIndex); + } + ExpiredEntries.push_back(ExpiredEntry); + } + } + + // Get all locations we need to keep for affected blocks + if (Ctx.Settings.CollectSmallObjects && !UnusedKeys.empty()) + { + for (const auto& Entry : m_CasContainerStrategy.m_LocationMap) + { + const IoHash& Key = Entry.first; + if (UnusedKeys.contains(Key)) + { + continue; + } + const BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[Entry.second]; + BlockStoreLocation BlockLocation = Location.Get(m_CasContainerStrategy.m_PayloadAlignment); + if (CompactState.AddKeepLocation(BlockLocation)) + { + CompactStateKeys.push_back(Key); + } + } + } + + if (Ctx.Settings.IsDeleteMode) + { + for (const CasDiskIndexEntry& Entry : 
ExpiredEntries) + { + m_CasContainerStrategy.m_LocationMap.erase(Entry.Key); + } + m_CasContainerStrategy.m_CasLog.Append(ExpiredEntries); + m_CasContainerStrategy.m_CasLog.Flush(); + } + } + + PruneCount = UnusedKeys.size(); + Ctx.PrunedReferences.fetch_add(PruneCount); + return new CasContainerStoreCompactor(m_CasContainerStrategy, + std::move(CompactState), + std::move(CompactStateKeys), + std::vector<IoHash>(UnusedKeys.begin(), UnusedKeys.end())); + } + +private: + CasContainerStrategy& m_CasContainerStrategy; + std::vector<IoHash> m_Cids; +}; + +GcReferencePruner* +CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx) +{ + size_t TotalCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc block store '{}': found {} cid keys to check in {}", + m_RootDirectory / m_ContainerBaseName, + TotalCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<IoHash> CidsToCheck; + { + RwLock::SharedLockScope __(m_LocationMapLock); + CidsToCheck.reserve(m_LocationMap.size()); + for (const auto& It : m_LocationMap) + { + CidsToCheck.push_back(It.first); + } + } + TotalCount = CidsToCheck.size(); + if (TotalCount == 0) + { + return {}; + } + Ctx.References.fetch_add(TotalCount); + return new CasContainerReferencePruner(*this, std::move(CidsToCheck)); +} + void CasContainerStrategy::CompactIndex(RwLock::ExclusiveLockScope&) { diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 478a1f78e..9ff4ae4fc 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -49,7 +49,7 @@ static_assert(sizeof(CasDiskIndexEntry) == 32); */ -struct CasContainerStrategy final : public GcStorage +struct CasContainerStrategy final : public GcStorage, public GcReferenceStore { CasContainerStrategy(GcManager& Gc); ~CasContainerStrategy(); @@ -71,6 +71,8 @@ struct CasContainerStrategy final : public GcStorage virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override; + virtual 
GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) override; + private: CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); void MakeIndexSnapshot(); @@ -97,6 +99,9 @@ private: typedef tsl::robin_map<IoHash, size_t, IoHash::Hasher> LocationMap_t; LocationMap_t m_LocationMap; std::vector<BlockStoreDiskLocation> m_Locations; + + friend class CasContainerReferencePruner; + friend class CasContainerStoreCompactor; }; void compactcas_forcelink(); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 24d0a39bb..e28e0dea4 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -122,10 +122,12 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc) { m_Gc.AddGcStorage(this); + m_Gc.AddGcReferenceStore(*this); } FileCasStrategy::~FileCasStrategy() { + m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -1329,7 +1331,170 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) return Entries; }; - ////////////////////////////////////////////////////////////////////////// +class FileCasStoreCompactor : public GcReferenceStoreCompactor +{ +public: + FileCasStoreCompactor(FileCasStrategy& Owner, std::vector<IoHash>&& ReferencesToClean) + : m_FileCasStrategy(Owner) + , m_ReferencesToClean(std::move(ReferencesToClean)) + { + } + + virtual void CompactReferenceStore(GcCtx& Ctx) + { + size_t CompactedCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc file store '{}': removed data for {} unused cids in {}", + m_FileCasStrategy.m_RootDirectory, + CompactedCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + std::vector<IoHash> ReferencedCleaned; + ReferencedCleaned.reserve(m_ReferencesToClean.size()); + + for (const IoHash& ChunkHash : m_ReferencesToClean) + { + FileCasStrategy::ShardingHelper 
Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash); + { + RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock); + if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end()) + { + // Not regarded as pruned, leave it be + continue; + } + if (Ctx.Settings.IsDeleteMode) + { + ZEN_DEBUG("deleting CAS payload file '{}'", Name.ShardedPath.ToUtf8()); + std::error_code Ec; + uint64_t SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec); + if (Ec) + { + SizeOnDisk = 0; + } + bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + if (Ec) + { + ZEN_WARN("failed deleting CAS payload file '{}'. Reason '{}'", Name.ShardedPath.ToUtf8(), Ec.message()); + continue; + } + if (!Existed) + { + continue; + } + Ctx.RemovedDiskSpace.fetch_add(SizeOnDisk); + } + else + { + std::error_code Ec; + bool Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec); + if (Ec) + { + ZEN_WARN("failed checking CAS payload file '{}'. 
Reason '{}'", Name.ShardedPath.ToUtf8(), Ec.message()); + continue; + } + if (!Existed) + { + continue; + } + } + ReferencedCleaned.push_back(ChunkHash); + } + } + CompactedCount = ReferencedCleaned.size(); + Ctx.CompactedReferences.fetch_add(ReferencedCleaned.size()); + } + +private: + FileCasStrategy& m_FileCasStrategy; + std::vector<IoHash> m_ReferencesToClean; +}; + +class FileCasReferencePruner : public GcReferencePruner +{ +public: + FileCasReferencePruner(FileCasStrategy& Owner, std::vector<IoHash>&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {} + + virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences) + { + size_t TotalCount = m_Cids.size(); + size_t PruneCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc file store '{}': removed {} unused cid out of {} in {}", + m_FileCasStrategy.m_RootDirectory, + PruneCount, + TotalCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<IoHash> UnusedReferences = GetUnusedReferences(m_Cids); + m_Cids.clear(); + + std::vector<IoHash> PrunedReferences; + PrunedReferences.reserve(UnusedReferences.size()); + { + RwLock::ExclusiveLockScope __(m_FileCasStrategy.m_Lock); + for (const IoHash& ChunkHash : UnusedReferences) + { + auto It = m_FileCasStrategy.m_Index.find(ChunkHash); + if (It == m_FileCasStrategy.m_Index.end()) + { + continue; + } + if (Ctx.Settings.IsDeleteMode) + { + uint64_t FileSize = It->second.Size; + m_FileCasStrategy.m_Index.erase(It); + m_FileCasStrategy.m_CasLog.Append( + {.Key = ChunkHash, .Flags = FileCasStrategy::FileCasIndexEntry::kTombStone, .Size = FileSize}); + m_FileCasStrategy.m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); + } + PrunedReferences.push_back(ChunkHash); + } + } + + PruneCount = PrunedReferences.size(); + Ctx.PrunedReferences.fetch_add(PruneCount); + return new FileCasStoreCompactor(m_FileCasStrategy, std::move(PrunedReferences)); + } + 
+private: + FileCasStrategy& m_FileCasStrategy; + std::vector<IoHash> m_Cids; +}; + +GcReferencePruner* +FileCasStrategy::CreateReferencePruner(GcCtx& Ctx) +{ + // TODO + std::size_t TotalCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gc file store '{}': found {} cid keys to check in {}", + m_RootDirectory, + TotalCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + std::vector<IoHash> CidsToCheck; + { + RwLock::SharedLockScope __(m_Lock); + CidsToCheck.reserve(m_Index.size()); + for (const auto& It : m_Index) + { + CidsToCheck.push_back(It.first); + } + } + TotalCount = CidsToCheck.size(); + if (TotalCount == 0) + { + return {}; + } + Ctx.References.fetch_add(TotalCount); + return new FileCasReferencePruner(*this, std::move(CidsToCheck)); +} + +////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index ea7ff8e8c..2e9a1d5dc 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -27,7 +27,7 @@ class BasicFile; /** CAS storage strategy using a file-per-chunk storage strategy */ -struct FileCasStrategy final : public GcStorage +struct FileCasStrategy final : public GcStorage, public GcReferenceStore { FileCasStrategy(GcManager& Gc); ~FileCasStrategy(); @@ -44,6 +44,8 @@ struct FileCasStrategy final : public GcStorage virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override; + virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) override; + private: void MakeIndexSnapshot(); uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); @@ -97,6 +99,9 @@ private: size_t Shard2len = 0; ExtendablePathBuilder<128> ShardedPath; }; + + friend class FileCasReferencePruner; + friend class FileCasStoreCompactor; }; void filecas_forcelink(); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 9743eabf0..e09f46063 100644 --- 
a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -327,6 +327,280 @@ GcManager::~GcManager() { } +//////// Begin New GC WIP + +void +GcManager::AddGcReferencer(GcReferencer& Referencer) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcReferencers.push_back(&Referencer); +} +void +GcManager::RemoveGcReferencer(GcReferencer& Referencer) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcReferencers, [&](GcReferencer* $) { return $ == &Referencer; }); +} + +void +GcManager::AddGcReferenceStore(GcReferenceStore& ReferenceStore) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcReferenceStores.push_back(&ReferenceStore); +} +void +GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcReferenceStores, [&](GcReferenceStore* $) { return $ == &ReferenceStore; }); +} + +GcResult +GcManager::CollectGarbage(const GcSettings& Settings) +{ + GcCtx Ctx{.Settings = Settings}; + + Stopwatch TotalTimer; + auto __ = MakeGuard([&]() { + ZEN_INFO( + "GC: Removed {} items out of {}, deleted {} out of {}. Pruned {} Cid entries out of {}, compacted {} Cid entries out of {}, " + "freed " + "{} on disk and {} of memory in {}", + Ctx.ExpiredItems.load(), + Ctx.Items.load(), + Ctx.DeletedItems.load(), + Ctx.ExpiredItems.load(), + Ctx.PrunedReferences.load(), + Ctx.References.load(), + Ctx.CompactedReferences.load(), + Ctx.PrunedReferences.load(), + NiceBytes(Ctx.RemovedDiskSpace.load()), + NiceBytes(Ctx.RemovedMemory.load()), + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); + }); + + RwLock::SharedLockScope GcLock(m_Lock); + + static const bool SingleThread = +#if ZEN_BUILD_DEBUG + true +#else + false +#endif + ; + WorkerThreadPool ThreadPool(SingleThread ? 
0 : 8); + + if (!m_GcReferencers.empty()) + { + Latch WorkLeft(1); + // First remove any cache keys that may own references + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Removed expired data in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + for (GcReferencer* Owner : m_GcReferencers) + { + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Owner, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + Owner->RemoveExpiredData(Ctx); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + if (Ctx.Settings.SkipCidDelete) + { + return GcResult{.Items = Ctx.Items.load(), + .ExpiredItems = Ctx.ExpiredItems.load(), + .DeletedItems = Ctx.DeletedItems.load(), + .References = Ctx.References.load(), + .PrunedReferences = Ctx.PrunedReferences.load(), + .CompactedReferences = Ctx.CompactedReferences.load(), + .RemovedDiskSpace = Ctx.RemovedDiskSpace.load(), + .RemovedMemory = Ctx.RemovedMemory.load()}; + } + + std::vector<std::unique_ptr<GcReferencePruner>> ReferencePruners; + if (!m_GcReferenceStores.empty()) + { + ReferencePruners.reserve(m_GcReferenceStores.size()); + Latch WorkLeft(1); + RwLock ReferencePrunersLock; + // Easy to go wide, CreateReferencePruner is usually not very heavy but big data sets change that + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Created Cid pruners in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + for (GcReferenceStore* CidStore : m_GcReferenceStores) + { + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, CidStore, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // The CidStore will pick a list of CId entries to check, returning a collector + std::unique_ptr<GcReferencePruner> ReferencePruner(CidStore->CreateReferencePruner(Ctx)); + if (ReferencePruner) + { + RwLock::ExclusiveLockScope __(ReferencePrunersLock); + ReferencePruners.emplace_back(std::move(ReferencePruner)); + } + }); + } + 
WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + std::vector<std::unique_ptr<GcReferenceChecker>> ReferenceCheckers; + if (!m_GcReferencers.empty()) + { + ReferenceCheckers.reserve(m_GcReferencers.size()); + Latch WorkLeft(1); + RwLock ReferenceCheckersLock; + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Created Cid checkers in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + // Easy to go wide, CreateReferenceCheckers is potentially heavy + // Lock all reference owners from changing the reference data and get access to check for referenced data + for (GcReferencer* Referencer : m_GcReferencers) + { + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, &ReferenceCheckersLock, &ReferenceCheckers]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // The Referencer will create a reference checker that guarrantees that the references do not change as long as it lives + std::vector<GcReferenceChecker*> Checkers = Referencer->CreateReferenceCheckers(Ctx); + try + { + if (!Checkers.empty()) + { + RwLock::ExclusiveLockScope __(ReferenceCheckersLock); + for (auto& Checker : Checkers) + { + ReferenceCheckers.emplace_back(std::unique_ptr<GcReferenceChecker>(Checker)); + Checker = nullptr; + } + } + } + catch (std::exception&) + { + while (!Checkers.empty()) + { + delete Checkers.back(); + Checkers.pop_back(); + } + throw; + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + Stopwatch LockStateTimer; + if (!ReferenceCheckers.empty()) + { + // Easy to go wide, locking all references checkers so we hafve a stead state of which references are used + // From this point we have block all writes to all References (DiskBucket/ProjectStore) until we do delete the ReferenceCheckers + Latch WorkLeft(1); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Locked Cid checkers in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + for (std::unique_ptr<GcReferenceChecker>& ReferenceChecker : 
ReferenceCheckers) + { + GcReferenceChecker* Checker = ReferenceChecker.get(); + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Checker, &WorkLeft, &ReferenceCheckers]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + Checker->LockState(Ctx); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + std::vector<std::unique_ptr<GcReferenceStoreCompactor>> ReferenceStoreCompactors; + ReferenceStoreCompactors.reserve(ReferencePruners.size()); + if (!ReferencePruners.empty()) + { + const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> { + HashSet UnusedCids(References.begin(), References.end()); + for (const std::unique_ptr<GcReferenceChecker>& ReferenceChecker : ReferenceCheckers) + { + ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids); + if (UnusedCids.empty()) + { + return {}; + } + } + return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end()); + }; + + // Easy to go wide, checking all Cids agains references in cache + // Ask stores to remove data that the ReferenceCheckers says are not references - this should be a lightweight operation that + // only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors + + Latch WorkLeft(1); + RwLock ReferenceStoreCompactorsLock; + + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Pruned unreferenced Cid data in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + for (std::unique_ptr<GcReferencePruner>& ReferencePruner : ReferencePruners) + { + GcReferencePruner* Pruner = ReferencePruner.get(); + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork( + [&Ctx, Pruner, &WorkLeft, &GetUnusedReferences, &ReferenceStoreCompactorsLock, &ReferenceStoreCompactors]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. 
+ std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor(Pruner->RemoveUnreferencedData(Ctx, GetUnusedReferences)); + if (ReferenceCompactor) + { + RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock); + ReferenceStoreCompactors.emplace_back(std::move(ReferenceCompactor)); + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed + ReferenceCheckers.clear(); + ZEN_INFO("GC: Writes blocked for {}", NiceTimeSpanMs(LockStateTimer.GetElapsedTimeMs())) + + // Let go of the pruners + ReferencePruners.clear(); + + if (!ReferenceStoreCompactors.empty()) + { + Latch WorkLeft(1); + + // Easy to go wide + // Remove the stuff we deemed unreferenced from disk - may be heavy operation + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("GC: Compacted Cid stores in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())) }); + for (std::unique_ptr<GcReferenceStoreCompactor>& StoreCompactor : ReferenceStoreCompactors) + { + GcReferenceStoreCompactor* Compactor = StoreCompactor.get(); + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Compactor, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. 
+ Compactor->CompactReferenceStore(Ctx); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + ReferenceStoreCompactors.clear(); + + return GcResult{.Items = Ctx.Items.load(), + .ExpiredItems = Ctx.ExpiredItems.load(), + .DeletedItems = Ctx.DeletedItems.load(), + .References = Ctx.References.load(), + .PrunedReferences = Ctx.PrunedReferences.load(), + .CompactedReferences = Ctx.CompactedReferences.load(), + .RemovedDiskSpace = Ctx.RemovedDiskSpace.load(), + .RemovedMemory = Ctx.RemovedMemory.load()}; +} + +//////// End New GC WIP + void GcManager::AddGcContributor(GcContributor* Contributor) { @@ -645,23 +919,19 @@ GcScheduler::Shutdown() bool GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) { - if (m_Config.Enabled) + std::unique_lock Lock(m_GcMutex); + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) { - std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) - { - m_TriggerGcParams = Params; - uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + m_TriggerGcParams = Params; + uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - if (m_Status.compare_exchange_strong(/* expected */ IdleState, - /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) - { - m_GcSignal.notify_one(); - return true; - } + if (m_Status.compare_exchange_strong(/* expected */ IdleState, + /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) + { + m_GcSignal.notify_one(); + return true; } } - return false; } @@ -806,7 +1076,7 @@ GcScheduler::SchedulerThread() break; } - if (!m_Config.Enabled && !m_TriggerScrubParams) + if (!m_Config.Enabled && !m_TriggerScrubParams && !m_TriggerGcParams) { WaitTime = std::chrono::seconds::max(); continue; @@ -830,6 +1100,7 @@ GcScheduler::SchedulerThread() std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; bool SkipCid = 
false; + GcVersion UseGCVersion = m_Config.UseGCVersion; bool DiskSpaceGCTriggered = false; bool TimeBasedGCTriggered = false; @@ -863,6 +1134,8 @@ GcScheduler::SchedulerThread() { DoDelete = false; } + UseGCVersion = TriggerParams.ForceGCVersion.value_or(UseGCVersion); + DoGc = true; } if (m_TriggerScrubParams) @@ -1067,7 +1340,7 @@ GcScheduler::SchedulerThread() } } - CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid); + CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects, SkipCid, UseGCVersion); uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) @@ -1148,7 +1421,8 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, bool CollectSmallObjects, - bool SkipCid) + bool SkipCid, + GcVersion UseGCVersion) { ZEN_TRACE_CPU("GcScheduler::CollectGarbage"); @@ -1195,10 +1469,26 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, Stopwatch Timer; const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - GcStorageSize Diff = m_GcManager.CollectGarbage(GcCtx); + GcStorageSize Diff; + switch (UseGCVersion) + { + case GcVersion::kV1: + Diff = m_GcManager.CollectGarbage(GcCtx); + break; + case GcVersion::kV2: + { + GcResult Result = m_GcManager.CollectGarbage({.CacheExpireTime = CacheExpireTime, + .ProjectStoreExpireTime = ProjectStoreExpireTime, + .CollectSmallObjects = CollectSmallObjects, + .IsDeleteMode = Delete, + .SkipCidDelete = SkipCid}); + Diff.DiskSize = Result.RemovedDiskSpace; + Diff.MemorySize = Result.RemovedMemory; + } + break; + } std::chrono::milliseconds ElapsedMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); - if (SkipCid) { m_LastLightweightGcTime = GcClock::Now(); diff --git 
a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 56906f570..cd475cd8b 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -108,6 +108,8 @@ private: BasicFile m_File; }; +class BlockStoreCompactState; + class BlockStore { public: @@ -124,6 +126,7 @@ public: typedef std::vector<size_t> ChunkIndexArray; typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback; + typedef std::function<void(const MovedChunksArray& MovedChunks, uint64_t FreedDiskSpace)> CompactCallback; typedef std::function<uint64_t()> ClaimDiskReserveCallback; typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; @@ -156,6 +159,12 @@ public: const IterateChunksSmallSizeCallback& SmallSizeCallback, const IterateChunksLargeSizeCallback& LargeSizeCallback); + void CompactBlocks( + const BlockStoreCompactState& CompactState, + uint64_t PayloadAlignment, + const CompactCallback& ChangeCallback = [](const MovedChunksArray&, uint64_t) {}, + const ClaimDiskReserveCallback& DiskReserveCallback = []() { return 0; }); + static const char* GetBlockFileExtension(); static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); @@ -179,6 +188,55 @@ private: std::atomic_uint64_t m_TotalSize{}; }; +class BlockStoreCompactState +{ +public: + BlockStoreCompactState() = default; + + void AddBlock(uint32_t BlockIndex) + { + auto It = m_BlockIndexToChunkMapIndex.find(BlockIndex); + if (It == m_BlockIndexToChunkMapIndex.end()) + { + m_KeepChunks.emplace_back(std::vector<size_t>()); + m_BlockIndexToChunkMapIndex.insert_or_assign(BlockIndex, m_KeepChunks.size() - 1); + } + } + + bool AddKeepLocation(const BlockStoreLocation& 
Location) + { + auto It = m_BlockIndexToChunkMapIndex.find(Location.BlockIndex); + if (It == m_BlockIndexToChunkMapIndex.end()) + { + return false; + } + + std::vector<size_t>& KeepChunks = m_KeepChunks[It->second]; + size_t Index = m_ChunkLocations.size(); + KeepChunks.push_back(Index); + m_ChunkLocations.push_back(Location); + return true; + }; + + const BlockStoreLocation& GetLocation(size_t Index) const { return m_ChunkLocations[Index]; } + + void IterateBlocks(std::function<void(uint32_t BlockIndex, + const std::vector<size_t>& KeepChunkIndexes, + const std::vector<BlockStoreLocation>& ChunkLocations)> Callback) const + { + for (auto It : m_BlockIndexToChunkMapIndex) + { + size_t ChunkMapIndex = It.second; + Callback(It.first, m_KeepChunks[ChunkMapIndex], m_ChunkLocations); + } + } + +private: + std::unordered_map<uint32_t, size_t> m_BlockIndexToChunkMapIndex; // Maps to which vector in BlockKeepChunks to use for a block + std::vector<std::vector<size_t>> m_KeepChunks; // One vector per block index with index into ChunkLocations + std::vector<BlockStoreLocation> m_ChunkLocations; +}; + void blockstore_forcelink(); } // namespace zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 42605804e..fa7dce331 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -20,6 +20,10 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <span> #include <thread> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace spdlog { class logger; } @@ -48,6 +52,151 @@ public: static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; } }; +//////// Begin New GC WIP + +struct GcSettings +{ + GcClock::TimePoint CacheExpireTime = GcClock::Now(); + GcClock::TimePoint ProjectStoreExpireTime = GcClock::Now(); + bool CollectSmallObjects = false; + bool IsDeleteMode = false; + bool SkipCidDelete = false; +}; + +struct GcResult +{ + uint64_t 
Items = 0; + uint64_t ExpiredItems = 0; + uint64_t DeletedItems = 0; + uint64_t References = 0; + uint64_t PrunedReferences = 0; + uint64_t CompactedReferences = 0; + uint64_t RemovedDiskSpace = 0; + uint64_t RemovedMemory = 0; +}; + +struct GcCtx +{ + const GcSettings Settings; + std::atomic_uint64_t Items = 0; + std::atomic_uint64_t ExpiredItems = 0; + std::atomic_uint64_t DeletedItems = 0; + std::atomic_uint64_t References = 0; + std::atomic_uint64_t PrunedReferences = 0; + std::atomic_uint64_t CompactedReferences = 0; + std::atomic_uint64_t RemovedDiskSpace = 0; + std::atomic_uint64_t RemovedMemory = 0; +}; + +typedef tsl::robin_set<IoHash> HashSet; + +/** + * @brief An interface to remove the stored data on disk after a GcReferencePruner::RemoveUnreferencedData + * + * CompactReferenceStore is called after pruning (GcReferencePruner::RemoveUnreferencedData) and state locking is + * complete so implementor must take care to only remove data that has not been altered since the prune operation. + * + * Instance will be deleted after CompactReferenceStore has completed execution. + * + * The subclass constructor should be provided with information on what is intended to be removed. + */ +class GcReferenceStoreCompactor +{ +public: + virtual ~GcReferenceStoreCompactor() = default; + + // Remove data on disk based on results from GcReferencePruner::RemoveUnreferencedData + virtual void CompactReferenceStore(GcCtx& Ctx) = 0; +}; + +/** + * @brief An interface to check if a set of Cids are referenced + * + * Instance will be deleted after RemoveUsedReferencesFromSet has been called 0-n times. + * + * During construction of the GcReferenceChecker the world is not stopped and this is a good + * place to do caching to be able to execute LockState and RemoveUsedReferencesFromSet quickly. 
+ */ +class GcReferenceChecker +{ +public: + // Destructor should unlock what was locked in LockState + virtual ~GcReferenceChecker() = default; + + // Lock the state and make sure no references change, usually a read-lock is taken until the destruction + // of the instance. Called once before any calls to RemoveUsedReferencesFromSet + // The implementation should be as fast as possible as LockState is part of a stop the world (from changes) + // until all instances of GcReferenceChecker are deleted + virtual void LockState(GcCtx& Ctx) = 0; + + // Go through IoCids and see which ones are referenced. If it is, the reference must be removed from IoCids + // This function should use pre-cached information on what is referenced as we are in stop the world mode + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) = 0; +}; + +/** + * @brief Interface to handle GC of data that references Cid data + * + * TODO: Maybe we should split up being a referencer and something that holds cache values? + * + * GcCacheStore and GcReferencer? + * + * This interface is registered/unregistered to GcManager via AddGcReferencer() and RemoveGcReferencer() + */ +class GcReferencer +{ +protected: + virtual ~GcReferencer() = default; + +public: + // Remove expired data based on either GcCtx::Settings CacheExpireTime/ProjectExpireTime + // TODO: For disk layer we need to first update it with access times from the memory layer + // The implementer of GcReferencer (in our case a disk bucket) does not know about any + // potential memory cache layer :( + virtual void RemoveExpiredData(GcCtx& Ctx) = 0; + + // Create 0-n GcReferenceChecker for this GcReferencer. 
Caller will manage lifetime of + returned instances + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) = 0; +}; + +/** + * @brief Interface to prune - remove pointers to data but not the bulk data on disk - references from a GcReferenceStore + */ +class GcReferencePruner +{ +public: + virtual ~GcReferencePruner() = default; + + typedef std::function<std::vector<IoHash>(std::span<IoHash> References)> GetUnusedReferencesFunc; + + // Check a set of references to see if they are in use. + // Use the GetUnusedReferences input function to check if references are used and update any pointers + // so any query for references determined to be unreferenced will not be found. + // If any references are found to be unused, return a GcReferenceStoreCompactor instance which will + // clean up any stored bulk data mapping to the pruned references. + // Caller will manage lifetime of returned instance + // This function should execute as fast as possible, so try to prepare a list of references to check ahead of + // call to this function and make sure the removal of unreferenced items is as lightweight as possible. 
+ virtual GcReferenceStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, const GetUnusedReferencesFunc& GetUnusedReferences) = 0; +}; + +/** + * @brief An interface to prune referenced (Cid) data from a store + */ +class GcReferenceStore +{ +protected: + virtual ~GcReferenceStore() = default; + +public: + // Create a GcReferencePruner which can check a set of references (decided by implementor) if they are no longer in use + // Caller will manage lifetime of returned instance + virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx) = 0; +}; + +//////// End New GC WIP + /** Garbage Collection context object */ class GcContext @@ -141,6 +290,18 @@ public: GcManager(); ~GcManager(); + //////// Begin New GC WIP + + void AddGcReferencer(GcReferencer& Referencer); + void RemoveGcReferencer(GcReferencer& Referencer); + + void AddGcReferenceStore(GcReferenceStore& ReferenceStore); + void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore); + + GcResult CollectGarbage(const GcSettings& Settings); + + //////// End New GC WIP + void AddGcContributor(GcContributor* Contributor); void RemoveGcContributor(GcContributor* Contributor); @@ -163,6 +324,9 @@ private: std::vector<GcStorage*> m_GcStorage; CidStore* m_CidStore = nullptr; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + + std::vector<GcReferencer*> m_GcReferencers; + std::vector<GcReferenceStore*> m_GcReferenceStores; }; enum class GcSchedulerStatus : uint32_t @@ -172,6 +336,12 @@ kStopped }; +enum class GcVersion : uint32_t +{ + kV1, + kV2 +}; + struct GcSchedulerConfig { std::filesystem::path RootDirectory; @@ -185,6 +355,7 @@ uint64_t DiskSizeSoftLimit = 0; uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; std::chrono::seconds LightweightInterval{}; + GcVersion UseGCVersion = GcVersion::kV1; }; struct GcSchedulerState @@ -246,12 +417,13 @@ public: struct TriggerGcParams { - bool CollectSmallObjects = false; - std::chrono::seconds 
MaxCacheDuration = std::chrono::seconds::max(); - std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max(); - uint64_t DiskSizeSoftLimit = 0; - bool SkipCid = false; - bool SkipDelete = false; + bool CollectSmallObjects = false; + std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); + std::chrono::seconds MaxProjectStoreDuration = std::chrono::seconds::max(); + uint64_t DiskSizeSoftLimit = 0; + bool SkipCid = false; + bool SkipDelete = false; + std::optional<GcVersion> ForceGCVersion; }; bool TriggerGc(const TriggerGcParams& Params); @@ -270,7 +442,8 @@ private: const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, bool CollectSmallObjects, - bool SkipCid); + bool SkipCid, + GcVersion UseGCVersion); void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice); spdlog::logger& Log() { return m_Log; } virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } |