diff options
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/cachetracking.cpp | 375 | ||||
| -rw-r--r-- | zenserver/cache/cachetracking.h | 36 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 38 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 4 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1151 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 188 |
6 files changed, 1470 insertions, 322 deletions
diff --git a/zenserver/cache/cachetracking.cpp b/zenserver/cache/cachetracking.cpp new file mode 100644 index 000000000..d1c99a597 --- /dev/null +++ b/zenserver/cache/cachetracking.cpp @@ -0,0 +1,375 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "cachetracking.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/endian.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> + +#include <zencore/testing.h> +#include <zencore/testutils.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this +#include <fmt/format.h> +#include <rocksdb/db.h> +#include <tsl/robin_map.h> +#include <tsl/robin_set.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace fmt::literals; + +namespace rocksdb = ROCKSDB_NAMESPACE; + +static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{}; + +static uint64_t +GetCurrentCacheTimeStamp() +{ + auto Duration = std::chrono::system_clock::now() - Epoch; + uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count(); + + return Millis; +} + +struct CacheAccessSnapshot +{ +public: + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) + { + BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); + + Tracker->Track(HashKey); + } + + bool SerializeSnapshot(CbObjectWriter& Cbo) + { + bool Serialized = false; + RwLock::ExclusiveLockScope _(m_Lock); + + for (const auto& Kv : m_BucketMapping) + { + if (m_Buckets[Kv.second]->Size()) + { + Cbo.BeginArray(Kv.first); + m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); + Cbo.EndArray(); + Serialized = true; + } + } + + return Serialized; + } + +private: + struct BucketTracker + { + mutable RwLock Lock; + tsl::robin_set<IoHash> AccessedKeys; + + void Track(const IoHash& HashKey) + { + if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) + { + return; + } + + RwLock::ExclusiveLockScope _(Lock); + + AccessedKeys.insert(HashKey); + } + + void SerializeSnapshotAndClear(CbObjectWriter& Cbo) + { + RwLock::ExclusiveLockScope _(Lock); + + for (const IoHash& Hash : AccessedKeys) + { + Cbo.AddHash(Hash); + } + + AccessedKeys.clear(); + } + + size_t Size() const + { + RwLock::SharedLockScope _(Lock); + return AccessedKeys.size(); + } + }; + + BucketTracker* GetBucket(const std::string& BucketName) + { + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + _.ReleaseNow(); + + return AddNewBucket(BucketName); + } + else + { + return m_Buckets[It->second].get(); + } + } + + BucketTracker* AddNewBucket(const std::string& BucketName) + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size()); + m_Buckets.emplace_back(std::make_unique<BucketTracker>()); + m_BucketMapping[BucketName] = BucketIndex; + + return m_Buckets[BucketIndex].get(); + } + else + { + return m_Buckets[It->second].get(); + } + } + + RwLock m_Lock; + std::vector<std::unique_ptr<BucketTracker>> m_Buckets; + tsl::robin_map<std::string, uint32_t> m_BucketMapping; +}; + +struct ZenCacheTracker::Impl +{ + Impl(std::filesystem::path StateDirectory) + { + std::filesystem::path StatsDbPath{StateDirectory / ".zdb"}; + + std::string RocksdbPath = ToUtf8(StatsDbPath); + + ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath); + + rocksdb::DB* Db = nullptr; + rocksdb::DBOptions Options; + Options.create_if_missing = true; + + std::vector<std::string> ExistingColumnFamilies; + rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); + + std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors; + + if (Status.IsPathNotFound()) + { + ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); + } + else if (Status.ok()) + { + for (const std::string& Column : ExistingColumnFamilies) + { + rocksdb::ColumnFamilyDescriptor ColumnFamily; + ColumnFamily.name = Column; + ColumnDescriptors.push_back(ColumnFamily); + } + } + else + { + throw std::runtime_error("column family iteration failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str()); + } + + Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); + + if (!Status.ok()) + { + throw std::runtime_error("database open failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str()); + } + + m_RocksDb.reset(Db); + } + + ~Impl() + { + for (auto* Column : m_RocksDbColumnHandles) + { + delete Column; + } + + m_RocksDbColumnHandles.clear(); + } + + struct KeyStruct + { + uint64_t TimestampLittleEndian; + }; + + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } + + void SaveSnapshot() + { + CbObjectWriter Cbo; + + if (m_CurrentSnapshot.SerializeSnapshot(Cbo)) + { + IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); + + const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; + rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; + rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; + + rocksdb::WriteOptions Wo; + m_RocksDb->Put(Wo, KeySlice, ValueSlice); + } + } + + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) + { + rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); + + rocksdb::ReadOptions Ro; + Ro.snapshot = Snap.snapshot(); + + std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)}; + + const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; + rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; + + It->Seek(ZeroKeySlice); + + while (It->Valid()) + { + rocksdb::Slice KeySlice = It->key(); + rocksdb::Slice ValueSlice = It->value(); + + if (KeySlice.size() == sizeof(KeyStruct)) + { + IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); + + CbObject Value = LoadCompactBinaryObject(ValueBuffer); + + uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data())); + + Callback(Key, Value); + } + + It->Next(); + } + } + + std::unique_ptr<rocksdb::DB> m_RocksDb; + std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles; + CacheAccessSnapshot m_CurrentSnapshot; +}; + +ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory)) +{ +} + +ZenCacheTracker::~ZenCacheTracker() +{ + delete m_Impl; +} + +void +ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) +{ + m_Impl->TrackAccess(BucketSegment, HashKey); +} + +void +ZenCacheTracker::SaveSnapshot() +{ + m_Impl->SaveSnapshot(); +} + +void +ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) +{ + m_Impl->IterateSnapshots(std::move(Callback)); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("z$.tracker") +{ + using namespace fmt::literals; + using namespace std::literals; + + const uint64_t t0 = GetCurrentCacheTimeStamp(); + + ScopedTemporaryDirectory TempDir; + + ZenCacheTracker Zcs(TempDir.Path()); + + tsl::robin_set<IoHash> KeyHashes; + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + KeyHashes.insert(KeyHash); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + + for (int n = 0; n < 10; ++n) + { + for (int i = 0; i < 1000; ++i) + { + const int Index = i + n * 1000; + IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + } + + Zcs.SaveSnapshot(); + + const uint64_t t1 = GetCurrentCacheTimeStamp(); + + int SnapshotCount = 0; + + Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { + CHECK(TimeStamp >= t0); + CHECK(TimeStamp <= t1); + + for (auto& Field : Snapshot) + { + CHECK_EQ(Field.GetName(), "foo"sv); + + const CbArray& Array = Field.AsArray(); + + for (const auto& Element : Array) + { + CHECK(KeyHashes.contains(Element.GetValue().AsHash())); + } + } + + ++SnapshotCount; + }); + + CHECK_EQ(SnapshotCount, 11); +} + +#endif + +void +cachetracker_forcelink() +{ +} + +} // namespace zen diff --git a/zenserver/cache/cachetracking.h b/zenserver/cache/cachetracking.h new file mode 100644 index 000000000..06109ebb0 --- /dev/null +++ b/zenserver/cache/cachetracking.h @@ -0,0 +1,36 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> + +#include <stdint.h> +#include <filesystem> +#include <functional> + +namespace zen { + +class CbObject; + +/** + */ + +class ZenCacheTracker +{ +public: + ZenCacheTracker(std::filesystem::path StateDirectory); + ~ZenCacheTracker(); + + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey); + void SaveSnapshot(); + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback); + +private: + struct Impl; + + Impl* m_Impl = nullptr; +}; + +void cachetracker_forcelink(); + +} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 726bd7cdb..e74030e07 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1,7 +1,10 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include "structuredcache.h" + #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> @@ -15,15 +18,12 @@ //#include "cachekey.h" #include "monitoring/httpstats.h" -#include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" -#include <zencore/compactbinarypackage.h> - #include <algorithm> #include <atomic> #include <filesystem> @@ -59,7 +59,6 @@ struct AttachmentCount ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -68,7 +67,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCac , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { @@ -105,7 +103,6 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx) m_LastScrubTime = Ctx.ScrubTimestamp(); - m_CasStore.Scrub(Ctx); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } @@ -618,12 +615,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + InUpstreamCache = true; } else { @@ -691,9 +684,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, @@ -1201,7 +1192,15 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t TotalCount = HitCount + MissCount; + const CasStoreSize CasSize = m_CidStore.CasSize(); + const GcStorageSize CacheSize = m_CacheStore.StorageSize(); + Cbo.BeginObject("cache"); + Cbo.BeginObject("size"); + Cbo << "disk" << CacheSize.DiskSize; + Cbo << "memory" << CacheSize.MemorySize; + Cbo.EndObject(); + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "hits" << HitCount << "misses" << MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; @@ -1215,6 +1214,15 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) Cbo.EndObject(); } + Cbo.BeginObject("cas"); + Cbo.BeginObject("size"); + Cbo << "tiny" << CasSize.TinySize; + Cbo << "small" << CasSize.SmallSize; + Cbo << "large" << CasSize.LargeSize; + Cbo << "total" << CasSize.TotalSize; + Cbo.EndObject(); + Cbo.EndObject(); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 51073d05d..9ee7da99b 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -18,6 +18,8 @@ namespace zen { class CasStore; class CidStore; +class CbObjectView; +class ScrubContext; class UpstreamCache; class ZenCacheStore; enum class CachePolicy : uint32_t; @@ -54,7 +56,6 @@ class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InCasStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -101,7 +102,6 @@ private: ZenCacheStore& m_CacheStore; HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; - CasStore& m_CasStore; CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 8b9ce8ff9..030588659 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -2,31 +2,31 @@ #include "structuredcachestore.h" -#include <zencore/except.h> -#include <zencore/windows.h> +#include "cachetracking.h" -#include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> +#include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/testutils.h> #include <zencore/thread.h> -#include <zenstore/cas.h> +#include <zencore/timer.h> +#include <zencore/windows.h> +#include <zenstore/basicfile.h> #include <zenstore/caslog.h> #include <zenstore/cidstore.h> -#include <zenstore/gc.h> +#include <chrono> #include <concepts> -#include <filesystem> +#include <memory_resource> #include <ranges> -#include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/core.h> @@ -38,13 +38,42 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace fmt::literals; +using PathBuilder = WideStringBuilder<256>; +namespace fs = std::filesystem; + +static CbObject +LoadCompactBinaryObject(const fs::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +static void +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} -ZenCacheStore::ZenCacheStore(const std::filesystem::path& RootDir) : m_DiskLayer{RootDir} +ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc), GcContributor(Gc), m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); m_DiskLayer.DiscoverBuckets(); + +#if ZEN_USE_CACHE_TRACKER + m_AccessTracker.reset(new ZenCacheTracker(RootDir)); +#endif } ZenCacheStore::~ZenCacheStore() @@ -56,21 +85,29 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal { bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); +#if ZEN_USE_CACHE_TRACKER + auto _ = MakeGuard([&] { + if (!Ok) + return; + + m_AccessTracker->TrackAccess(InBucket, HashKey); + }); +#endif + if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); + + return true; } - if (!Ok) - { - Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - } + if (Ok) + { + ZEN_ASSERT(OutValue.Value.Size()); - if (Ok && (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)) + if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, OutValue); } @@ -88,6 +125,25 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa m_DiskLayer.Put(InBucket, HashKey, Value); +#if ZEN_USE_REF_TRACKING + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) + { + CbObject Object{SharedBuffer(Value.Value)}; + + uint8_t TempBuffer[8 * sizeof(IoHash)]; + std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; + std::pmr::polymorphic_allocator Allocator{&Linear}; + std::pmr::vector<IoHash> CidReferences{Allocator}; + + Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); + + m_Gc.OnNewCidReferences(CidReferences); + } + } +#endif + if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); @@ -131,10 +187,26 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } void -ZenCacheStore::GarbageCollect(GcContext& GcCtx) +ZenCacheStore::GatherReferences(GcContext& GcCtx) { - m_DiskLayer.GarbageCollect(GcCtx); - m_MemLayer.GarbageCollect(GcCtx); + access_tracking::AccessTimes AccessTimes; + m_MemLayer.GatherAccessTimes(AccessTimes); + + m_DiskLayer.UpdateAccessTimes(AccessTimes); + m_DiskLayer.GatherReferences(GcCtx); +} + +void +ZenCacheStore::CollectGarbage(GcContext& GcCtx) +{ + m_MemLayer.Reset(); + m_DiskLayer.CollectGarbage(GcCtx); +} + +GcStorageSize +ZenCacheStore::StorageSize() const +{ + return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; } ////////////////////////////////////////////////////////////////////////// @@ -220,14 +292,38 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) +{ + using namespace zen::access_tracking; + + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; + Kv.second.GatherAccessTimes(Bucket); + } +} + +void +ZenCacheMemoryLayer::Reset() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Buckets.clear(); +} + +uint64_t +ZenCacheMemoryLayer::TotalSize() const { + uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - Kv.second.GarbageCollect(GcCtx); + TotalSize += Kv.second.TotalSize(); } + + return TotalSize; } void @@ -252,33 +348,12 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) { - // Is it even meaningful to do this? The memory layer shouldn't - // contain anything which is not already in the disk layer - RwLock::SharedLockScope _(m_bucketLock); - - std::vector<IoHash> BadHashes; - - for (const auto& Kv : m_cacheMap) - { - const IoBuffer& Payload = Kv.second.Payload; - - switch (Payload.GetContentType()) - { - case ZenContentType::kCbObject: - { - CbObject Obj(SharedBuffer{Payload}); - - Obj.IterateAttachments([&](CbFieldView Field) { GcCtx.ContributeCids(std::array<IoHash, 1>{Field.AsAttachment()}); }); - } - break; - - case ZenContentType::kBinary: - break; - } - } + std::transform(m_cacheMap.begin(), m_cacheMap.end(), std::back_inserter(AccessTimes), [](const auto& Kv) { + return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = Kv.second.LastAccess}; + }); } bool @@ -294,75 +369,140 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV { BucketValue& Value = bucketIt.value(); OutValue.Value = Value.Payload; - Value.LastAccess = GetCurrentTimeStamp(); + Value.LastAccess = GcClock::TickCount(); return true; } } -uint64_t -ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp() -{ - return GetLofreqTimerValue(); -} - void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - RwLock::ExclusiveLockScope _(m_bucketLock); + { + RwLock::ExclusiveLockScope _(m_bucketLock); + m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GcClock::TickCount(), .Payload = Value.Value}); + } - m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value}); + m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// -inline DiskLocation::DiskLocation() = default; +#pragma pack(push) +#pragma pack(1) -inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) -: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) -, LowerSize(ValueSize & 0xFFFFffff) -, IndexDataSize(IndexSize) +struct DiskLocation { -} + inline DiskLocation() = default; -inline uint64_t -DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) -{ - return Offset | Flags; -} + inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } -inline uint64_t -DiskLocation::Offset() const -{ - return OffsetAndFlags & kOffsetMask; -} + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; -inline uint64_t -DiskLocation::Size() const -{ - return LowerSize; -} + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } -inline uint64_t -DiskLocation::IsFlagSet(uint64_t Flag) const -{ - return OffsetAndFlags & Flag; -} + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; -inline ZenContentType -DiskLocation::GetContentType() const -{ - ZenContentType ContentType = ZenContentType::kBinary; + if (IsFlagSet(DiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } - if (IsFlagSet(DiskLocation::kStructured)) - { - ContentType = ZenContentType::kCbObject; + return ContentType; } - return ContentType; -} +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; +}; -////////////////////////////////////////////////////////////////////////// +struct DiskIndexEntry +{ + IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 36); + +struct ZenCacheDiskLayer::CacheBucket +{ + CacheBucket(); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + static bool Delete(std::filesystem::path BucketDir); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Flush(); + void SaveManifest(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + + inline bool IsOk() const { return m_IsOk; } + inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } + +private: + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_IsOk = false; + uint64_t m_LargeObjectThreshold = 64 * 1024; + + // These files are used to manage storage of small objects for this bucket + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + + struct IndexEntry + { + DiskLocation Location; + GcClock::Tick LastAccess{}; + }; + + using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; + + RwLock m_IndexLock; + IndexMap m_Index; + uint64_t m_SobsCursor = 0; + std::atomic_uint64_t m_TotalSize{}; + + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + void DeleteStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, const fs::path& Path, std::error_code& Ec); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + void OpenLog(const fs::path& BucketDir, const bool IsNew); + + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + + RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } +}; ZenCacheDiskLayer::CacheBucket::CacheBucket() { @@ -388,86 +528,93 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { - CreateDirectories(BucketDir); - - m_BucketDir = BucketDir; - - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; - std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; + using namespace std::literals; - BasicFile ManifestFile; + CreateDirectories(BucketDir); - // Try opening existing manifest file first + std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; bool IsNew = false; - std::error_code Ec; - ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec); + CbObject Manifest = LoadCompactBinaryObject(ManifestPath); - if (!Ec) + if (Manifest) { - uint64_t FileSize = ManifestFile.FileSize(); - - if (FileSize == sizeof(Oid)) - { - ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - - m_IsOk = true; - } - - if (!m_IsOk) - { - ManifestFile.Close(); - } + m_BucketId = Manifest["BucketId"].AsObjectId(); + m_IsOk = m_BucketId != Oid::Zero; } + else if (AllowCreate) + { + m_BucketId.Generate(); - if (!m_IsOk) + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + Manifest = Writer.Save(); + SaveCompactBinaryObject(ManifestPath, Manifest); + IsNew = true; + } + else { - if (AllowCreate == false) - { - // Invalid bucket - return; - } + return; + } - // No manifest file found, this is a new bucket + OpenLog(BucketDir, IsNew); - ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); + for (CbFieldView Entry : Manifest["Timestamps"]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); - if (Ec) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath)); + It.value().LastAccess = Obj["LastAccess"sv].AsInt64(); } + } - m_BucketId.Generate(); - - ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0); + m_IsOk = true; +} - IsNew = true; - } +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) +{ + m_BucketDir = BucketDir; - // Initialize small object storage related files + uint64_t MaxFileOffset = 0; + uint64_t InvalidEntryCount = 0; + m_SobsCursor = 0; + m_TotalSize = 0; - m_SobsFile.Open(SobsPath, IsNew); + m_Index.clear(); - // Open and replay log + std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; + std::filesystem::path SlogPath{BucketDir / "zen.slog"}; + m_SobsFile.Open(SobsPath, IsNew); m_SlogFile.Open(SlogPath, IsNew); - uint64_t MaxFileOffset = 0; + m_SlogFile.Replay([&](const DiskIndexEntry& Entry) { + if (Entry.Key == IoHash::Zero) + { + ++InvalidEntryCount; + } + else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + } + else + { + m_Index[Entry.Key] = {.Location = Entry.Location, .LastAccess = GcClock::TickCount()}; + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + } + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); + }); - if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) + if (InvalidEntryCount) { - m_SlogFile.Replay([&](const DiskIndexEntry& Record) { - m_Index[Record.Key] = Record.Location; - - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size()); - }); - - m_WriteCursor = (MaxFileOffset + 15) & ~15; + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); } - m_IsOk = true; + m_SobsCursor = (MaxFileOffset + 15) & ~15; } void @@ -502,7 +649,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen bool ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue) { - WideStringBuilder<128> DataFilePath; + PathBuilder DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); @@ -518,6 +665,23 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, return false; } +void +ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, + const IoHash& HashKey, + const fs::path& Path, + std::error_code& Ec) +{ + ZEN_DEBUG("deleting standalone cache file '{}'", Path); + fs::remove(Path, Ec); + + if (!Ec) + { + m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); + m_Index.erase(HashKey); + m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); + } +} + bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -528,18 +692,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal RwLock::SharedLockScope _(m_IndexLock); - if (auto it = m_Index.find(HashKey); it != m_Index.end()) + if (auto It = m_Index.find(HashKey); It != m_Index.end()) { - const DiskLocation& Loc = it->second; + IndexEntry& Entry = It.value(); + Entry.LastAccess = GcClock::TickCount(); + { + } - if (GetInlineCacheValue(Loc, OutValue)) + if (GetInlineCacheValue(Entry.Location, OutValue)) { return true; } _.ReleaseNow(); - return GetStandaloneCacheValue(Loc, HashKey, OutValue); + return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); } return false; @@ -570,25 +737,28 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags); + DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); - m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16); + m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); - if (auto it = m_Index.find(HashKey); it == m_Index.end()) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object - m_Index.insert({HashKey, Loc}); + m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); } else { // TODO: should check if write is idempotent and bail out if it is? // this would requiring comparing contents on disk unless we add a // content hash to the index entry - it.value() = Loc; + IndexEntry& Entry = It.value(); + Entry.Location = Loc; + Entry.LastAccess = GcClock::TickCount(); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } } @@ -605,8 +775,35 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + RwLock::SharedLockScope _(m_IndexLock); + m_SobsFile.Flush(); m_SlogFile.Flush(); + + SaveManifest(); +} + +void +ZenCacheDiskLayer::CacheBucket::SaveManifest() +{ + using namespace std::literals; + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + + if (!m_Index.empty()) + { + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + const IndexEntry& Entry = Kv.second; + Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess; + } + Writer.EndArray(); + } + + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); } void @@ -620,7 +817,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; - const DiskLocation& Loc = Kv.second; + const DiskLocation& Loc = Kv.second.Location; ZenCacheValue Value; @@ -654,52 +851,314 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { // Log a tombstone and delete the in-memory index for the bad entry - m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {0, 0, 0, DiskLocation::kTombStone}}); + const auto It = m_Index.find(BadKey); + const DiskLocation& Location = It->second.Location; + m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); m_Index.erase(BadKey); } } } void -ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { - RwLock::SharedLockScope _(m_IndexLock); + const GcClock::TimePoint ExpireTime = + GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); + + const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + RwLock::ExclusiveLockScope _(m_IndexLock); + + std::vector<IoHash> ValidKeys; + std::vector<IoHash> ExpiredKeys; + std::vector<IoHash> Cids; + std::vector<IndexMap::value_type> Entries(m_Index.begin(), m_Index.end()); + + std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) { + return LHS.second.LastAccess < RHS.second.LastAccess; + }); + + const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) { + const IndexEntry& Entry = Kv.second; + return Entry.LastAccess < Ticks; + }); + + Cids.reserve(Entries.size()); - for (auto& Kv : m_Index) + for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv) { - const IoHash& HashKey = Kv.first; - const DiskLocation& Loc = Kv.second; + const IoHash& Key = Kv->first; + const DiskLocation& Loc = Kv->second.Location; - if (Loc.IsFlagSet(DiskLocation::kStructured) == false) + if (Loc.IsFlagSet(DiskLocation::kStructured)) { - continue; + ZenCacheValue CacheValue; + if (!GetInlineCacheValue(Loc, CacheValue)) + { + GetStandaloneCacheValue(Loc, Key, CacheValue); + } + + if (CacheValue.Value) + { + ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + + CbObject Obj(SharedBuffer{CacheValue.Value}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + GcCtx.ContributeCids(Cids); + } } + } - ZenCacheValue CacheValue; - std::vector<IoHash> Attachments; + ValidKeys.reserve(std::distance(ValidIt, Entries.end())); + ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); - auto GatherRefs = [&] { - ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); + std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; }); + std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; }); + + GcCtx.ContributeCids(Cids); + GcCtx.ContributeCacheKeys(std::move(ValidKeys), std::move(ExpiredKeys)); +} + +void +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +{ + Flush(); + + RwLock::ExclusiveLockScope _(m_IndexLock); + + const uint64_t OldCount = m_Index.size(); + const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); + + Stopwatch Timer; + const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { + const uint64_t NewCount = m_Index.size(); + const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); + ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", + m_BucketDir, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + OldCount - NewCount, + NiceBytes(OldTotalSize - NewTotalSize), + OldCount, + NiceBytes(OldTotalSize)); + SaveManifest(); + }); + + if (m_Index.empty()) + { + return; + } + + auto AddEntries = [this](std::span<const IoHash> Keys, std::vector<IndexMap::value_type>& OutEntries) { + for (const IoHash& Key : Keys) + { + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + OutEntries.push_back(*It); + } + } + }; + + std::vector<IndexMap::value_type> ValidEntries; + std::vector<IndexMap::value_type> ExpiredEntries; + + AddEntries(GcCtx.ValidCacheKeys(), ValidEntries); + AddEntries(GcCtx.ExpiredCacheKeys(), ExpiredEntries); + + // Remove all standalone file(s) + // NOTE: This can probably be made asynchronously + { + std::error_code Ec; + PathBuilder Path; - CbObject Obj(SharedBuffer{CacheValue.Value}); - Obj.IterateAttachments([&](CbFieldView Field) { Attachments.push_back(Field.AsAttachment()); }); - GcCtx.ContributeCids(Attachments); + for (const auto& Entry : ExpiredEntries) + { + const IoHash& Key = Entry.first; + const DiskLocation& Loc = Entry.second.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + Path.Reset(); + BuildPath(Path, Key); + + // NOTE: this will update index and log file + DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); + + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message()); + Ec.clear(); + } + } + } + } + + if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) + { + // Naive GC implementation of small objects. Needs enough free + // disk space to store intermediate sob container along side the + // old container + + const auto ResetSobStorage = [this, &ValidEntries]() { + m_SobsFile.Close(); + m_SlogFile.Close(); + + const bool IsNew = true; + m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew); + m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew); + + m_SobsCursor = 0; + m_TotalSize = 0; + m_Index.clear(); + + for (const auto& Entry : ValidEntries) + { + const IoHash& Key = Entry.first; + const DiskLocation& Loc = Entry.second.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + m_SlogFile.Append({.Key = Key, .Location = Loc}); + m_Index.insert({Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + } + } }; - if (GetInlineCacheValue(Loc, /* out */ CacheValue)) + uint64_t NewContainerSize{}; + for (const auto& Entry : ValidEntries) { - GatherRefs(); + const DiskLocation& Loc = Entry.second.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) + { + NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); + } } - else if (GetStandaloneCacheValue(Loc, HashKey, /* out */ CacheValue)) + + if (NewContainerSize == 0) { - GatherRefs(); + ResetSobStorage(); + return; } - else + + const uint64_t DiskSpaceMargin = (256 << 10); + + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); + if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + { + ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", + m_BucketDir, + NiceBytes(NewContainerSize), + NiceBytes(Space.Free)); + return; + } + + std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; + std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; + + // Copy non expired sob(s) to temporary sob container + + { + BasicFile TmpSobs; + TCasLogFile<DiskIndexEntry> TmpLog; + uint64_t TmpCursor{}; + std::vector<uint8_t> Chunk; + + TmpSobs.Open(TmpSobsPath, true); + TmpLog.Open(TmpSlogPath, true); + + for (const auto& Entry : ValidEntries) + { + const IoHash& Key = Entry.first; + const DiskLocation& Loc = Entry.second.Location; + + DiskLocation NewLoc; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + NewLoc = DiskLocation(0, Loc.Size(), 0, DiskLocation::kStandaloneFile); + } + else + { + Chunk.resize(Loc.Size()); + m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); + + NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0); + TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); + TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + } + + TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); + } + } + + // Swap state + try + { + fs::path SobsPath{m_BucketDir / "zen.sobs"}; + fs::path SlogPath{m_BucketDir / "zen.slog"}; + + m_SobsFile.Close(); + m_SlogFile.Close(); + + fs::remove(SobsPath); + fs::remove(SlogPath); + + fs::rename(TmpSobsPath, SobsPath); + fs::rename(TmpSlogPath, SlogPath); + + const bool IsNew = false; + OpenLog(m_BucketDir, IsNew); + } + catch (std::exception& Err) + { + ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + ResetSobStorage(); + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) +{ + using namespace access_tracking; + + for (const KeyAccessTime& KeyTime : AccessTimes) + { + if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) { - // Value not found + IndexEntry& Entry = It.value(); + Entry.LastAccess = KeyTime.LastAccess; } + } +} + +void +ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.CollectGarbage(GcCtx); + } +} - GcCtx.ContributeCids(Attachments); +void +ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) +{ + RwLock::SharedLockScope _(m_Lock); + + for (const auto& Kv : AccessTimes.Buckets) + { + if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) + { + CacheBucket& Bucket = It->second; + Bucket.UpdateAccessTimes(Kv.second); + } } } @@ -708,7 +1167,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c { RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - WideStringBuilder<128> DataFilePath; + PathBuilder DataFilePath; BuildPath(DataFilePath, HashKey); TemporaryFile DataFile; @@ -781,19 +1240,21 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); + IndexEntry Entry = IndexEntry{.Location = Loc, .LastAccess = GcClock::TickCount()}; - if (auto it = m_Index.find(HashKey); it == m_Index.end()) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object - m_Index.insert({HashKey, Loc}); + m_Index.insert({HashKey, Entry}); } else { // TODO: should check if write is idempotent and bail out if it is? - it.value() = Loc; + It.value() = Entry; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// @@ -924,7 +1385,7 @@ ZenCacheDiskLayer::DiscoverBuckets() { // New bucket needs to be created - std::string BucketName8 = WideToUtf8(BucketName); + const std::string BucketName8 = ToUtf8(BucketName); if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) { @@ -940,7 +1401,11 @@ ZenCacheDiskLayer::DiscoverBuckets() Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); - if (!Bucket.IsOk()) + if (Bucket.IsOk()) + { + ZEN_INFO("Discovered bucket '{}'", BucketName8); + } + else { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); @@ -1007,28 +1472,60 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.GatherReferences(GcCtx); + } +} + +uint64_t +ZenCacheDiskLayer::TotalSize() const { + uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - Kv.second.GarbageCollect(GcCtx); + TotalSize += Kv.second.TotalSize(); } + + return TotalSize; } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS +using namespace std::literals; +using namespace fmt::literals; + +namespace testutils { + + IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } + + IoBuffer CreateBinaryCacheValue(uint64_t Size) + { + std::vector<uint32_t> Data(size_t(Size / sizeof uint32_t)); + std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); + + IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); + Buf.SetContentType(ZenContentType::kBinary); + return Buf; + }; + +} // namespace testutils + TEST_CASE("z$.store") { - using namespace fmt::literals; - using namespace std::literals; - ScopedTemporaryDirectory TempDir; - ZenCacheStore Zcs(TempDir.Path() / "cache"); + CasGc Gc; + + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const int kIterationCount = 100; @@ -1062,6 +1559,280 @@ TEST_CASE("z$.store") } } +TEST_CASE("z$.size") +{ + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + SUBCASE("mem/disklayer") + { + const size_t Count = 16; + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + for (size_t Key = 0; Key < Count; ++Key) + { + const size_t Bucket = Key % 4; + Zcs.Put("test_bucket-{}"_format(Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); + } + + CacheSize = Zcs.StorageSize(); + CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); + CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.MemorySize); + } + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + + const GcStorageSize SerializedSize = Zcs.StorageSize(); + CHECK_EQ(SerializedSize.MemorySize, 0); + CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); + + for (size_t Bucket = 0; Bucket < 4; ++Bucket) + { + Zcs.DropBucket("test_bucket-{}"_format(Bucket)); + } + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } + + SUBCASE("disklayer") + { + const size_t Count = 16; + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + for (size_t Key = 0; Key < Count; ++Key) + { + const size_t Bucket = Key % 4; + Zcs.Put("test_bucket-{}"_format(Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); + } + + CacheSize = Zcs.StorageSize(); + CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); + CHECK_EQ(0, CacheSize.MemorySize); + } + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + + const GcStorageSize SerializedSize = Zcs.StorageSize(); + CHECK_EQ(SerializedSize.MemorySize, 0); + CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); + + for (size_t Bucket = 0; Bucket < 4; ++Bucket) + { + Zcs.DropBucket("test_bucket-{}"_format(Bucket)); + } + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } +} + +TEST_CASE("z$.gc") +{ + using namespace testutils; + + SUBCASE("gather references does NOT add references for expired cache entries") + { + ScopedTemporaryDirectory TempDir; + std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; + + const auto CollectAndFilter = [](CasGc& Gc, + GcClock::TimePoint Time, + GcClock::Duration MaxDuration, + std::span<const IoHash> Cids, + std::vector<IoHash>& OutKeep) { + GcContext GcCtx(Time); + GcCtx.MaxCacheDuration(MaxDuration); + Gc.CollectGarbage(GcCtx); + OutKeep.clear(); + GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); + }; + + { + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObjectWriter Record; + Record << "Key"sv + << "SomeRecord"sv; + + for (size_t Idx = 0; auto& Cid : Cids) + { + Record.AddBinaryAttachment("attachment-{}"_format(Idx++), Cid); + } + + IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + Zcs.Put(Bucket, Key, {.Value = Buffer}); + + std::vector<IoHash> Keep; + + // Collect garbage with 1 hour max cache duration + { + CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(Cids.size(), Keep.size()); + } + + // Move forward in time + { + CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(0, Keep.size()); + } + } + + // Expect timestamps to be serialized + //{ + // CasGc Gc; + // ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + // std::vector<IoHash> Keep; + + // // Collect garbage with 1 hour max cache duration + // { + // CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + // CHECK_EQ(3, Keep.size()); + // } + + // // Move forward in time + // { + // CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + // CHECK_EQ(0, Keep.size()); + // } + //} + } + + SUBCASE("gc removes standalone values") + { + ScopedTemporaryDirectory TempDir; + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "fortysixandtwo"sv; + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; + + for (const auto& Key : Keys) + { + IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); + Zcs.Put(Bucket, Key, {.Value = Value}); + } + + { + GcContext GcCtx; + GcCtx.MaxCacheDuration(std::chrono::hours(46)); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } + } + + // Move forward in time and collect again + { + GcContext GcCtx(CurrentTime + std::chrono::hours(46)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } + + SUBCASE("gc removes small objects") + { + ScopedTemporaryDirectory TempDir; + CasGc Gc; + ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "rightintwo"sv; + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; + + for (const auto& Key : Keys) + { + IoBuffer Value = testutils::CreateBinaryCacheValue(128); + Zcs.Put(Bucket, Key, {.Value = Value}); + } + + { + GcContext GcCtx; + GcCtx.MaxCacheDuration(std::chrono::hours(2)); + GcCtx.CollectSmallObjects(true); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } + } + + // Move forward in time and collect again + { + GcContext GcCtx(CurrentTime + std::chrono::hours(2)); + GcCtx.MaxCacheDuration(std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } +} + #endif void diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 0dfcbc5ca..b64b1353e 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -7,23 +7,26 @@ #include <zencore/iohash.h> #include <zencore/thread.h> #include <zencore/uid.h> -#include <zenstore/basicfile.h> #include <zenstore/cas.h> -#include <zenstore/caslog.h> +#include <zenstore/gc.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) +ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> #include <compare> #include <filesystem> #include <unordered_map> +#define ZEN_USE_CACHE_TRACKER 0 + namespace zen { -class WideStringBuilderBase; class CasStore; +class CasGc; +class WideStringBuilderBase; +class ZenCacheTracker; /****************************************************************************** @@ -42,6 +45,20 @@ class CasStore; ******************************************************************************/ +namespace access_tracking { + + struct KeyAccessTime + { + IoHash Key; + GcClock::Tick LastAccess{}; + }; + + struct AccessTimes + { + std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets; + }; +}; // namespace access_tracking + struct ZenCacheValue { IoBuffer Value; @@ -52,6 +69,9 @@ struct ZenCacheValue Intended for small values which are frequently accessed + This should have a better memory management policy to maintain reasonable + footprint. + */ class ZenCacheMemoryLayer { @@ -59,11 +79,13 @@ public: ZenCacheMemoryLayer(); ~ZenCacheMemoryLayer(); - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool DropBucket(std::string_view Bucket); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Bucket); + void Scrub(ScrubContext& Ctx); + void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); + void Reset(); + uint64_t TotalSize() const; struct Configuration { @@ -79,63 +101,29 @@ private: { struct BucketValue { - uint64_t LastAccess = 0; - IoBuffer Payload; + GcClock::Tick LastAccess{}; + IoBuffer Payload; }; RwLock m_bucketLock; tsl::robin_map<IoHash, BucketValue> m_cacheMap; + std::atomic_uint64_t m_TotalSize{}; - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); - - private: - uint64_t GetCurrentTimeStamp(); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Scrub(ScrubContext& Ctx); + void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + inline uint64_t TotalSize() const { return m_TotalSize; } }; - RwLock m_Lock; + mutable RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; Configuration m_Configuration; -}; -#pragma pack(push) -#pragma pack(1) - -struct DiskLocation -{ - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; - - DiskLocation(); - DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags); - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags); - uint64_t Offset() const; - uint64_t Size() const; - uint64_t IsFlagSet(uint64_t Flag) const; - ZenContentType GetContentType() const; - -private: - uint64_t OffsetAndFlags = 0; - uint32_t LowerSize = 0; - uint32_t IndexDataSize = 0; -}; - -struct DiskIndexEntry -{ - IoHash Key; - DiskLocation Location; + ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; + ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; }; -#pragma pack(pop) - -static_assert(sizeof(DiskIndexEntry) == 36); - class ZenCacheDiskLayer { public: @@ -147,86 +135,56 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); - void DiscoverBuckets(); + void DiscoverBuckets(); + uint64_t TotalSize() const; private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket - { - CacheBucket(); - ~CacheBucket(); - - void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - static bool Delete(std::filesystem::path BucketDir); - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); - - inline bool IsOk() const { return m_IsOk; } - - private: - std::filesystem::path m_BucketDir; - Oid m_BucketId; - bool m_IsOk = false; - uint64_t m_LargeObjectThreshold = 64 * 1024; - - // These files are used to manage storage of small objects for this bucket - - BasicFile m_SobsFile; - TCasLogFile<DiskIndexEntry> m_SlogFile; - - RwLock m_IndexLock; - tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; - - void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - - // These locks are here to avoid contention on file creation, therefore it's sufficient - // that we take the same lock for the same hash - // - // These locks are small and should really be spaced out so they don't share cache lines, - // but we don't currently access them at particularly high frequency so it should not be - // an issue in practice - - RwLock m_ShardedLocks[256]; - inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } - }; + struct CacheBucket; std::filesystem::path m_RootDir; - RwLock m_Lock; + mutable RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive + + ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; + ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; }; -class ZenCacheStore +class ZenCacheStore final : public GcStorage, public GcContributor { public: - explicit ZenCacheStore(const std::filesystem::path& RootDir); + ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool DropBucket(std::string_view Bucket); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; } + virtual void GatherReferences(GcContext& GcCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; private: std::filesystem::path m_RootDir; ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; - uint64_t m_DiskLayerSizeThreshold = 4 * 1024; + uint64_t m_DiskLayerSizeThreshold = 1 * 1024; uint64_t m_LastScrubTime = 0; + +#if ZEN_USE_CACHE_TRACKER + std::unique_ptr<ZenCacheTracker> m_AccessTracker; +#endif + + ZenCacheStore(const ZenCacheStore&) = delete; + ZenCacheStore& operator=(const ZenCacheStore&) = delete; }; void z$_forcelink(); |