// Copyright Epic Games, Inc. All Rights Reserved. #include "cachetracking.h" #if ZEN_USE_CACHE_TRACKER # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace rocksdb = ROCKSDB_NAMESPACE; static constinit auto Epoch = std::chrono::time_point{}; static uint64_t GetCurrentCacheTimeStamp() { auto Duration = std::chrono::system_clock::now() - Epoch; uint64_t Millis = std::chrono::duration_cast(Duration).count(); return Millis; } struct CacheAccessSnapshot { public: void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); Tracker->Track(HashKey); } bool SerializeSnapshot(CbObjectWriter& Cbo) { bool Serialized = false; RwLock::ExclusiveLockScope _(m_Lock); for (const auto& Kv : m_BucketMapping) { if (m_Buckets[Kv.second]->Size()) { Cbo.BeginArray(Kv.first); m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); Cbo.EndArray(); Serialized = true; } } return Serialized; } private: struct BucketTracker { mutable RwLock Lock; tsl::robin_set AccessedKeys; void Track(const IoHash& HashKey) { if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) { return; } RwLock::ExclusiveLockScope _(Lock); AccessedKeys.insert(HashKey); } void SerializeSnapshotAndClear(CbObjectWriter& Cbo) { RwLock::ExclusiveLockScope _(Lock); for (const IoHash& Hash : AccessedKeys) { Cbo.AddHash(Hash); } AccessedKeys.clear(); } size_t Size() const { RwLock::SharedLockScope _(Lock); return AccessedKeys.size(); } }; BucketTracker* GetBucket(const std::string& BucketName) { RwLock::SharedLockScope _(m_Lock); if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) { _.ReleaseNow(); return AddNewBucket(BucketName); } else { return m_Buckets[It->second].get(); } } BucketTracker* AddNewBucket(const std::string& BucketName) { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) { const uint32_t BucketIndex = gsl::narrow(m_Buckets.size()); m_Buckets.emplace_back(std::make_unique()); m_BucketMapping[BucketName] = BucketIndex; return m_Buckets[BucketIndex].get(); } else { return m_Buckets[It->second].get(); } } RwLock m_Lock; std::vector> m_Buckets; tsl::robin_map m_BucketMapping; }; struct ZenCacheTracker::Impl { Impl(std::filesystem::path StateDirectory) { std::filesystem::path StatsDbPath{StateDirectory / ".zdb"}; std::string RocksdbPath = StatsDbPath.string(); ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath); rocksdb::DB* Db = nullptr; rocksdb::DBOptions Options; Options.create_if_missing = true; std::vector ExistingColumnFamilies; rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); std::vector ColumnDescriptors; if (Status.IsPathNotFound()) { ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); } else if (Status.ok()) { for (const std::string& Column : ExistingColumnFamilies) { rocksdb::ColumnFamilyDescriptor ColumnFamily; ColumnFamily.name = Column; ColumnDescriptors.push_back(ColumnFamily); } } else { throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); } Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); if (!Status.ok()) { throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); } m_RocksDb.reset(Db); } ~Impl() { for (auto* Column : m_RocksDbColumnHandles) { delete Column; } m_RocksDbColumnHandles.clear(); } struct KeyStruct { uint64_t TimestampLittleEndian; }; void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } void SaveSnapshot() { CbObjectWriter Cbo; if (m_CurrentSnapshot.SerializeSnapshot(Cbo)) { IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; rocksdb::WriteOptions Wo; m_RocksDb->Put(Wo, KeySlice, ValueSlice); } } void IterateSnapshots(std::function&& Callback) { rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); rocksdb::ReadOptions Ro; Ro.snapshot = Snap.snapshot(); std::unique_ptr It{m_RocksDb->NewIterator(Ro)}; const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; It->Seek(ZeroKeySlice); while (It->Valid()) { rocksdb::Slice KeySlice = It->key(); rocksdb::Slice ValueSlice = It->value(); if (KeySlice.size() == sizeof(KeyStruct)) { IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); CbObject Value = LoadCompactBinaryObject(ValueBuffer); uint64_t Key = FromNetworkOrder(*reinterpret_cast(KeySlice.data())); Callback(Key, Value); } It->Next(); } } std::unique_ptr m_RocksDb; std::vector m_RocksDbColumnHandles; CacheAccessSnapshot m_CurrentSnapshot; }; ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory)) { } ZenCacheTracker::~ZenCacheTracker() { delete m_Impl; } void ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_Impl->TrackAccess(BucketSegment, HashKey); } void ZenCacheTracker::SaveSnapshot() { m_Impl->SaveSnapshot(); } void ZenCacheTracker::IterateSnapshots(std::function&& Callback) { m_Impl->IterateSnapshots(std::move(Callback)); } # if ZEN_WITH_TESTS TEST_CASE("z$.tracker") { using namespace std::literals; const uint64_t t0 = GetCurrentCacheTimeStamp(); ScopedTemporaryDirectory TempDir; ZenCacheTracker Zcs(TempDir.Path()); tsl::robin_set KeyHashes; for (int i = 0; i < 10000; ++i) { IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); KeyHashes.insert(KeyHash); Zcs.TrackAccess("foo"sv, KeyHash); } for (int i = 0; i < 10000; ++i) { IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); Zcs.TrackAccess("foo"sv, KeyHash); } Zcs.SaveSnapshot(); for (int n = 0; n < 10; ++n) { for (int i = 0; i < 1000; ++i) { const int Index = i + n * 1000; IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); Zcs.TrackAccess("foo"sv, KeyHash); } Zcs.SaveSnapshot(); } Zcs.SaveSnapshot(); const uint64_t t1 = GetCurrentCacheTimeStamp(); int SnapshotCount = 0; Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { CHECK(TimeStamp >= t0); CHECK(TimeStamp <= t1); for (auto& Field : Snapshot) { CHECK_EQ(Field.GetName(), "foo"sv); const CbArray& Array = Field.AsArray(); for (const auto& Element : Array) { CHECK(KeyHashes.contains(Element.GetValue().AsHash())); } } ++SnapshotCount; }); CHECK_EQ(SnapshotCount, 11); } # endif void cachetracker_forcelink() { } } // namespace zen #endif // ZEN_USE_CACHE_TRACKER