diff options
| author | Stefan Boberg <[email protected]> | 2021-11-17 14:57:24 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-11-17 14:57:24 +0100 |
| commit | 7db8b001e097c58e005aab820cdcf9ec5e662a32 (patch) | |
| tree | 4e44fa25324c9f2cced3acbc707e8a824168f36e /zenserver/cache/cachetracking.cpp | |
| parent | zen: eliminated unused deploy command (diff) | |
| download | zen-7db8b001e097c58e005aab820cdcf9ec5e662a32.tar.xz zen-7db8b001e097c58e005aab820cdcf9ec5e662a32.zip | |
gc: implemented timestamped snapshot persistence
Diffstat (limited to 'zenserver/cache/cachetracking.cpp')
| -rw-r--r-- | zenserver/cache/cachetracking.cpp | 182 |
1 files changed, 140 insertions, 42 deletions
diff --git a/zenserver/cache/cachetracking.cpp b/zenserver/cache/cachetracking.cpp index 93aeb4aba..6a9b2f403 100644 --- a/zenserver/cache/cachetracking.cpp +++ b/zenserver/cache/cachetracking.cpp @@ -2,8 +2,12 @@ #include "cachetracking.h" +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/endian.h> #include <zencore/filesystem.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> @@ -26,13 +30,13 @@ namespace rocksdb = ROCKSDB_NAMESPACE; static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{}; -uint32_t -GetCurrentTimeStamp() +static uint64_t +GetCurrentCacheTimeStamp() { - auto Duration = std::chrono::system_clock::now() - Epoch; - auto Minutes = std::chrono::duration_cast<std::chrono::minutes>(Duration).count(); + auto Duration = std::chrono::system_clock::now() - Epoch; + uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count(); - return Minutes; + return Millis; } struct CacheAccessSnapshot @@ -40,11 +44,23 @@ struct CacheAccessSnapshot public: void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { - BucketTracker* Tracker = MapBucketToIndex(std::string(BucketSegment)); + BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); Tracker->Track(HashKey); } + void SerializeSnapshot(CbObjectWriter& Cbo) + { + RwLock::ExclusiveLockScope _(m_Lock); + + for (const auto& Kv : m_BucketMapping) + { + Cbo.BeginArray(Kv.first); + m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); + Cbo.EndArray(); + } + } + private: struct BucketTracker { @@ -53,14 +69,33 @@ private: void Track(const IoHash& HashKey) { + if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) + { + return; + } + RwLock::ExclusiveLockScope _(Lock); + AccessedKeys.insert(HashKey); } + + void SerializeSnapshotAndClear(CbObjectWriter& Cbo) + { + RwLock::ExclusiveLockScope _(Lock); + + for (const IoHash& Hash : AccessedKeys) + { + Cbo.AddHash(Hash); + } + + AccessedKeys.clear(); + } }; - BucketTracker* MapBucketToIndex(const std::string& BucketName) + BucketTracker* GetBucket(const std::string& BucketName) { RwLock::SharedLockScope _(m_Lock); + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) { _.ReleaseNow(); @@ -155,54 +190,57 @@ struct ZenCacheTracker::Impl struct KeyStruct { - uint16_t BucketId; - IoHash HashKey; + uint64_t TimestampLittleEndian; }; - struct ValueStruct - { - uint32_t CreateTime = 0; - uint32_t AccessTime = 0; - uint32_t AccessCount = 0; - uint32_t LastGcCount = 0; - }; + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } - void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) + void SaveSnapshot() { - const uint32_t Ts = GetCurrentTimeStamp(); - rocksdb::WriteOptions Wo; - rocksdb::ReadOptions Ro; + CbObjectWriter Cbo; + m_CurrentSnapshot.SerializeSnapshot(Cbo); + IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); + + const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; + rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; + rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; - ZEN_UNUSED(BucketSegment); + rocksdb::WriteOptions Wo; + m_RocksDb->Put(Wo, KeySlice, ValueSlice); + } - ValueStruct Value; + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) + { + rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); - rocksdb::Slice ValueSlice{(char*)&Value, sizeof Value}; + rocksdb::ReadOptions Ro; + Ro.snapshot = Snap.snapshot(); - KeyStruct Key; - Key.BucketId = 0; - Key.HashKey = HashKey; + std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)}; - rocksdb::Slice KeySlice{(char*)&Key, sizeof Key}; + const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; + rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; - std::string ValueString; - rocksdb::Status Status = m_RocksDb->Get(Ro, KeySlice, &ValueString); + It->Seek(ZeroKeySlice); - if (Status.ok() && ValueString.size() == sizeof(ValueStruct)) + while (It->Valid()) { - memcpy(&Value, ValueString.data(), ValueString.size()); + rocksdb::Slice KeySlice = It->key(); + rocksdb::Slice ValueSlice = It->value(); - ++Value.AccessCount; - } - else - { - Value.CreateTime = Ts; - Value.AccessCount = 1; - } + if (KeySlice.size() == sizeof(KeyStruct)) + { + IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); - Value.AccessTime = Ts; + CbObject Value = LoadCompactBinaryObject(ValueBuffer); - m_RocksDb->Put(Wo, KeySlice, ValueSlice); + uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data())); + + Callback(Key, Value); + } + + It->Next(); + } } std::unique_ptr<rocksdb::DB> m_RocksDb; @@ -225,6 +263,18 @@ ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashK m_Impl->TrackAccess(BucketSegment, HashKey); } +void +ZenCacheTracker::SaveSnapshot() +{ + m_Impl->SaveSnapshot(); +} + +void +ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) +{ + m_Impl->IterateSnapshots(std::move(Callback)); +} + #if ZEN_WITH_TESTS TEST_CASE("z$.tracker") @@ -232,23 +282,71 @@ TEST_CASE("z$.tracker") using namespace fmt::literals; using namespace std::literals; + const uint64_t t0 = GetCurrentCacheTimeStamp(); + ScopedTemporaryDirectory TempDir; ZenCacheTracker Zcs(TempDir.Path()); + tsl::robin_set<IoHash> KeyHashes; + for (int i = 0; i < 10000; ++i) { IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); - Zcs.TrackAccess("foo", KeyHash); + KeyHashes.insert(KeyHash); + + Zcs.TrackAccess("foo"sv, KeyHash); } for (int i = 0; i < 10000; ++i) { IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); - Zcs.TrackAccess("foo", KeyHash); + Zcs.TrackAccess("foo"sv, KeyHash); } + + Zcs.SaveSnapshot(); + + for (int n = 0; n < 10; ++n) + { + for (int i = 0; i < 1000; ++i) + { + const int Index = i + n * 1000; + IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + } + + Zcs.SaveSnapshot(); + + const uint64_t t1 = GetCurrentCacheTimeStamp(); + + int SnapshotCount = 0; + + Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { + CHECK(TimeStamp >= t0); + CHECK(TimeStamp <= t1); + + for (auto& Field : Snapshot) + { + CHECK_EQ(Field.GetName(), "foo"sv); + + const CbArray& Array = Field.AsArray(); + + for (const auto& Element : Array) + { + CHECK(KeyHashes.contains(Element.GetValue().AsHash())); + } + } + + ++SnapshotCount; + }); + + CHECK_EQ(SnapshotCount, 11); } #endif |