diff options
Diffstat (limited to 'src/zenserver/cache/cachetracking.cpp')
| -rw-r--r-- | src/zenserver/cache/cachetracking.cpp | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/src/zenserver/cache/cachetracking.cpp b/src/zenserver/cache/cachetracking.cpp new file mode 100644 index 000000000..9119e3122 --- /dev/null +++ b/src/zenserver/cache/cachetracking.cpp @@ -0,0 +1,376 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "cachetracking.h" + +#if ZEN_USE_CACHE_TRACKER + +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinaryvalue.h> +# include <zencore/endian.h> +# include <zencore/filesystem.h> +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zencore/string.h> + +# include <zencore/testing.h> +# include <zencore/testutils.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this +# include <fmt/format.h> +# include <rocksdb/db.h> +# include <tsl/robin_map.h> +# include <tsl/robin_set.h> +# include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +namespace rocksdb = ROCKSDB_NAMESPACE; + +static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{}; + +static uint64_t +GetCurrentCacheTimeStamp() +{ + auto Duration = std::chrono::system_clock::now() - Epoch; + uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count(); + + return Millis; +} + +struct CacheAccessSnapshot +{ +public: + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) + { + BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); + + Tracker->Track(HashKey); + } + + bool SerializeSnapshot(CbObjectWriter& Cbo) + { + bool Serialized = false; + RwLock::ExclusiveLockScope _(m_Lock); + + for (const auto& Kv : m_BucketMapping) + { + if (m_Buckets[Kv.second]->Size()) + { + Cbo.BeginArray(Kv.first); + m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); + Cbo.EndArray(); + Serialized = true; + } + } + + return Serialized; + } + +private: + struct BucketTracker + { + mutable RwLock Lock; + tsl::robin_set<IoHash> AccessedKeys; + + void Track(const IoHash& HashKey) + { + if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) + { + return; + } + + RwLock::ExclusiveLockScope _(Lock); + + AccessedKeys.insert(HashKey); + } + + void SerializeSnapshotAndClear(CbObjectWriter& Cbo) + { + RwLock::ExclusiveLockScope _(Lock); + + for (const IoHash& Hash : AccessedKeys) + { + Cbo.AddHash(Hash); + } + + AccessedKeys.clear(); + } + + size_t Size() const + { + RwLock::SharedLockScope _(Lock); + return AccessedKeys.size(); + } + }; + + BucketTracker* GetBucket(const std::string& BucketName) + { + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + _.ReleaseNow(); + + return AddNewBucket(BucketName); + } + else + { + return m_Buckets[It->second].get(); + } + } + + BucketTracker* AddNewBucket(const std::string& BucketName) + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size()); + m_Buckets.emplace_back(std::make_unique<BucketTracker>()); + m_BucketMapping[BucketName] = BucketIndex; + + return m_Buckets[BucketIndex].get(); + } + else + { + return m_Buckets[It->second].get(); + } + } + + RwLock m_Lock; + std::vector<std::unique_ptr<BucketTracker>> m_Buckets; + tsl::robin_map<std::string, uint32_t> m_BucketMapping; +}; + +struct ZenCacheTracker::Impl +{ + Impl(std::filesystem::path StateDirectory) + { + std::filesystem::path StatsDbPath{StateDirectory / ".zdb"}; + + std::string RocksdbPath = StatsDbPath.string(); + + ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath); + + rocksdb::DB* Db = nullptr; + rocksdb::DBOptions Options; + Options.create_if_missing = true; + + std::vector<std::string> ExistingColumnFamilies; + rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); + + std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors; + + if (Status.IsPathNotFound()) + { + ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); + } + else if (Status.ok()) + { + for (const std::string& Column : ExistingColumnFamilies) + { + rocksdb::ColumnFamilyDescriptor ColumnFamily; + ColumnFamily.name = Column; + ColumnDescriptors.push_back(ColumnFamily); + } + } + else + { + throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); + } + + Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); + + if (!Status.ok()) + { + throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); + } + + m_RocksDb.reset(Db); + } + + ~Impl() + { + for (auto* Column : m_RocksDbColumnHandles) + { + delete Column; + } + + m_RocksDbColumnHandles.clear(); + } + + struct KeyStruct + { + uint64_t TimestampLittleEndian; + }; + + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } + + void SaveSnapshot() + { + CbObjectWriter Cbo; + + if (m_CurrentSnapshot.SerializeSnapshot(Cbo)) + { + IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); + + const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; + rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; + rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; + + rocksdb::WriteOptions Wo; + m_RocksDb->Put(Wo, KeySlice, ValueSlice); + } + } + + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) + { + rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); + + rocksdb::ReadOptions Ro; + Ro.snapshot = Snap.snapshot(); + + std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)}; + + const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; + rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; + + It->Seek(ZeroKeySlice); + + while (It->Valid()) + { + rocksdb::Slice KeySlice = It->key(); + rocksdb::Slice ValueSlice = It->value(); + + if (KeySlice.size() == sizeof(KeyStruct)) + { + IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); + + CbObject Value = LoadCompactBinaryObject(ValueBuffer); + + uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data())); + + Callback(Key, Value); + } + + It->Next(); + } + } + + std::unique_ptr<rocksdb::DB> m_RocksDb; + std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles; + CacheAccessSnapshot m_CurrentSnapshot; +}; + +ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory)) +{ +} + +ZenCacheTracker::~ZenCacheTracker() +{ + delete m_Impl; +} + +void +ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) +{ + m_Impl->TrackAccess(BucketSegment, HashKey); +} + +void +ZenCacheTracker::SaveSnapshot() +{ + m_Impl->SaveSnapshot(); +} + +void +ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) +{ + m_Impl->IterateSnapshots(std::move(Callback)); +} + +# if ZEN_WITH_TESTS + +TEST_CASE("z$.tracker") +{ + using namespace std::literals; + + const uint64_t t0 = GetCurrentCacheTimeStamp(); + + ScopedTemporaryDirectory TempDir; + + ZenCacheTracker Zcs(TempDir.Path()); + + tsl::robin_set<IoHash> KeyHashes; + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + KeyHashes.insert(KeyHash); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + + for (int n = 0; n < 10; ++n) + { + for (int i = 0; i < 1000; ++i) + { + const int Index = i + n * 1000; + IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + } + + Zcs.SaveSnapshot(); + + const uint64_t t1 = GetCurrentCacheTimeStamp(); + + int SnapshotCount = 0; + + Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { + CHECK(TimeStamp >= t0); + CHECK(TimeStamp <= t1); + + for (auto& Field : Snapshot) + { + CHECK_EQ(Field.GetName(), "foo"sv); + + const CbArray& Array = Field.AsArray(); + + for (const auto& Element : Array) + { + CHECK(KeyHashes.contains(Element.GetValue().AsHash())); + } + } + + ++SnapshotCount; + }); + + CHECK_EQ(SnapshotCount, 11); +} + +# endif + +void +cachetracker_forcelink() +{ +} + +} // namespace zen + +#endif // ZEN_USE_CACHE_TRACKER |