aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachetracking.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/cachetracking.cpp')
-rw-r--r--src/zenserver/cache/cachetracking.cpp376
1 files changed, 376 insertions, 0 deletions
diff --git a/src/zenserver/cache/cachetracking.cpp b/src/zenserver/cache/cachetracking.cpp
new file mode 100644
index 000000000..9119e3122
--- /dev/null
+++ b/src/zenserver/cache/cachetracking.cpp
@@ -0,0 +1,376 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "cachetracking.h"
+
+#if ZEN_USE_CACHE_TRACKER
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinaryvalue.h>
+# include <zencore/endian.h>
+# include <zencore/filesystem.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/string.h>
+
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
+# include <fmt/format.h>
+# include <rocksdb/db.h>
+# include <tsl/robin_map.h>
+# include <tsl/robin_set.h>
+# include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+namespace rocksdb = ROCKSDB_NAMESPACE;
+
+static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{};
+
+static uint64_t
+GetCurrentCacheTimeStamp()
+{
+ auto Duration = std::chrono::system_clock::now() - Epoch;
+ uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count();
+
+ return Millis;
+}
+
+struct CacheAccessSnapshot
+{
+public:
+ void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
+ {
+ BucketTracker* Tracker = GetBucket(std::string(BucketSegment));
+
+ Tracker->Track(HashKey);
+ }
+
+ bool SerializeSnapshot(CbObjectWriter& Cbo)
+ {
+ bool Serialized = false;
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const auto& Kv : m_BucketMapping)
+ {
+ if (m_Buckets[Kv.second]->Size())
+ {
+ Cbo.BeginArray(Kv.first);
+ m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo);
+ Cbo.EndArray();
+ Serialized = true;
+ }
+ }
+
+ return Serialized;
+ }
+
+private:
+ struct BucketTracker
+ {
+ mutable RwLock Lock;
+ tsl::robin_set<IoHash> AccessedKeys;
+
+ void Track(const IoHash& HashKey)
+ {
+ if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey))
+ {
+ return;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+
+ AccessedKeys.insert(HashKey);
+ }
+
+ void SerializeSnapshotAndClear(CbObjectWriter& Cbo)
+ {
+ RwLock::ExclusiveLockScope _(Lock);
+
+ for (const IoHash& Hash : AccessedKeys)
+ {
+ Cbo.AddHash(Hash);
+ }
+
+ AccessedKeys.clear();
+ }
+
+ size_t Size() const
+ {
+ RwLock::SharedLockScope _(Lock);
+ return AccessedKeys.size();
+ }
+ };
+
+ BucketTracker* GetBucket(const std::string& BucketName)
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
+ {
+ _.ReleaseNow();
+
+ return AddNewBucket(BucketName);
+ }
+ else
+ {
+ return m_Buckets[It->second].get();
+ }
+ }
+
+ BucketTracker* AddNewBucket(const std::string& BucketName)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
+ {
+ const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size());
+ m_Buckets.emplace_back(std::make_unique<BucketTracker>());
+ m_BucketMapping[BucketName] = BucketIndex;
+
+ return m_Buckets[BucketIndex].get();
+ }
+ else
+ {
+ return m_Buckets[It->second].get();
+ }
+ }
+
+ RwLock m_Lock;
+ std::vector<std::unique_ptr<BucketTracker>> m_Buckets;
+ tsl::robin_map<std::string, uint32_t> m_BucketMapping;
+};
+
+struct ZenCacheTracker::Impl
+{
+ Impl(std::filesystem::path StateDirectory)
+ {
+ std::filesystem::path StatsDbPath{StateDirectory / ".zdb"};
+
+ std::string RocksdbPath = StatsDbPath.string();
+
+ ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath);
+
+ rocksdb::DB* Db = nullptr;
+ rocksdb::DBOptions Options;
+ Options.create_if_missing = true;
+
+ std::vector<std::string> ExistingColumnFamilies;
+ rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies);
+
+ std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors;
+
+ if (Status.IsPathNotFound())
+ {
+ ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}});
+ }
+ else if (Status.ok())
+ {
+ for (const std::string& Column : ExistingColumnFamilies)
+ {
+ rocksdb::ColumnFamilyDescriptor ColumnFamily;
+ ColumnFamily.name = Column;
+ ColumnDescriptors.push_back(ColumnFamily);
+ }
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str());
+ }
+
+ Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db);
+
+ if (!Status.ok())
+ {
+ throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str());
+ }
+
+ m_RocksDb.reset(Db);
+ }
+
+ ~Impl()
+ {
+ for (auto* Column : m_RocksDbColumnHandles)
+ {
+ delete Column;
+ }
+
+ m_RocksDbColumnHandles.clear();
+ }
+
+ struct KeyStruct
+ {
+ uint64_t TimestampLittleEndian;
+ };
+
+ void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); }
+
+ void SaveSnapshot()
+ {
+ CbObjectWriter Cbo;
+
+ if (m_CurrentSnapshot.SerializeSnapshot(Cbo))
+ {
+ IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer();
+
+ const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())};
+ rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key};
+ rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()};
+
+ rocksdb::WriteOptions Wo;
+ m_RocksDb->Put(Wo, KeySlice, ValueSlice);
+ }
+ }
+
+ void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
+ {
+ rocksdb::ManagedSnapshot Snap(m_RocksDb.get());
+
+ rocksdb::ReadOptions Ro;
+ Ro.snapshot = Snap.snapshot();
+
+ std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)};
+
+ const KeyStruct ZeroKey{.TimestampLittleEndian = 0};
+ rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey};
+
+ It->Seek(ZeroKeySlice);
+
+ while (It->Valid())
+ {
+ rocksdb::Slice KeySlice = It->key();
+ rocksdb::Slice ValueSlice = It->value();
+
+ if (KeySlice.size() == sizeof(KeyStruct))
+ {
+ IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size());
+
+ CbObject Value = LoadCompactBinaryObject(ValueBuffer);
+
+ uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data()));
+
+ Callback(Key, Value);
+ }
+
+ It->Next();
+ }
+ }
+
+ std::unique_ptr<rocksdb::DB> m_RocksDb;
+ std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles;
+ CacheAccessSnapshot m_CurrentSnapshot;
+};
+
+ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory))
+{
+}
+
+ZenCacheTracker::~ZenCacheTracker()
+{
+ delete m_Impl;
+}
+
+void
+ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
+{
+ m_Impl->TrackAccess(BucketSegment, HashKey);
+}
+
+void
+ZenCacheTracker::SaveSnapshot()
+{
+ m_Impl->SaveSnapshot();
+}
+
+void
+ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
+{
+ m_Impl->IterateSnapshots(std::move(Callback));
+}
+
+# if ZEN_WITH_TESTS
+
+TEST_CASE("z$.tracker")
+{
+ using namespace std::literals;
+
+ const uint64_t t0 = GetCurrentCacheTimeStamp();
+
+ ScopedTemporaryDirectory TempDir;
+
+ ZenCacheTracker Zcs(TempDir.Path());
+
+ tsl::robin_set<IoHash> KeyHashes;
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
+
+ KeyHashes.insert(KeyHash);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ Zcs.SaveSnapshot();
+
+ for (int n = 0; n < 10; ++n)
+ {
+ for (int i = 0; i < 1000; ++i)
+ {
+ const int Index = i + n * 1000;
+ IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ Zcs.SaveSnapshot();
+ }
+
+ Zcs.SaveSnapshot();
+
+ const uint64_t t1 = GetCurrentCacheTimeStamp();
+
+ int SnapshotCount = 0;
+
+ Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) {
+ CHECK(TimeStamp >= t0);
+ CHECK(TimeStamp <= t1);
+
+ for (auto& Field : Snapshot)
+ {
+ CHECK_EQ(Field.GetName(), "foo"sv);
+
+ const CbArray& Array = Field.AsArray();
+
+ for (const auto& Element : Array)
+ {
+ CHECK(KeyHashes.contains(Element.GetValue().AsHash()));
+ }
+ }
+
+ ++SnapshotCount;
+ });
+
+ CHECK_EQ(SnapshotCount, 11);
+}
+
+# endif
+
+void
+cachetracker_forcelink()
+{
+}
+
+} // namespace zen
+
+#endif // ZEN_USE_CACHE_TRACKER