aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/cachetracking.cpp376
-rw-r--r--zenserver/cache/cachetracking.h41
-rw-r--r--zenserver/cache/structuredcache.cpp3159
-rw-r--r--zenserver/cache/structuredcache.h187
-rw-r--r--zenserver/cache/structuredcachestore.cpp3648
-rw-r--r--zenserver/cache/structuredcachestore.h535
6 files changed, 0 insertions, 7946 deletions
diff --git a/zenserver/cache/cachetracking.cpp b/zenserver/cache/cachetracking.cpp
deleted file mode 100644
index 9119e3122..000000000
--- a/zenserver/cache/cachetracking.cpp
+++ /dev/null
@@ -1,376 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cachetracking.h"
-
-#if ZEN_USE_CACHE_TRACKER
-
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinaryvalue.h>
-# include <zencore/endian.h>
-# include <zencore/filesystem.h>
-# include <zencore/logging.h>
-# include <zencore/scopeguard.h>
-# include <zencore/string.h>
-
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
-# include <fmt/format.h>
-# include <rocksdb/db.h>
-# include <tsl/robin_map.h>
-# include <tsl/robin_set.h>
-# include <gsl/gsl-lite.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-namespace rocksdb = ROCKSDB_NAMESPACE;
-
-static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{};
-
-static uint64_t
-GetCurrentCacheTimeStamp()
-{
- auto Duration = std::chrono::system_clock::now() - Epoch;
- uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count();
-
- return Millis;
-}
-
-struct CacheAccessSnapshot
-{
-public:
- void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
- {
- BucketTracker* Tracker = GetBucket(std::string(BucketSegment));
-
- Tracker->Track(HashKey);
- }
-
- bool SerializeSnapshot(CbObjectWriter& Cbo)
- {
- bool Serialized = false;
- RwLock::ExclusiveLockScope _(m_Lock);
-
- for (const auto& Kv : m_BucketMapping)
- {
- if (m_Buckets[Kv.second]->Size())
- {
- Cbo.BeginArray(Kv.first);
- m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo);
- Cbo.EndArray();
- Serialized = true;
- }
- }
-
- return Serialized;
- }
-
-private:
- struct BucketTracker
- {
- mutable RwLock Lock;
- tsl::robin_set<IoHash> AccessedKeys;
-
- void Track(const IoHash& HashKey)
- {
- if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey))
- {
- return;
- }
-
- RwLock::ExclusiveLockScope _(Lock);
-
- AccessedKeys.insert(HashKey);
- }
-
- void SerializeSnapshotAndClear(CbObjectWriter& Cbo)
- {
- RwLock::ExclusiveLockScope _(Lock);
-
- for (const IoHash& Hash : AccessedKeys)
- {
- Cbo.AddHash(Hash);
- }
-
- AccessedKeys.clear();
- }
-
- size_t Size() const
- {
- RwLock::SharedLockScope _(Lock);
- return AccessedKeys.size();
- }
- };
-
- BucketTracker* GetBucket(const std::string& BucketName)
- {
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
- {
- _.ReleaseNow();
-
- return AddNewBucket(BucketName);
- }
- else
- {
- return m_Buckets[It->second].get();
- }
- }
-
- BucketTracker* AddNewBucket(const std::string& BucketName)
- {
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
- {
- const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size());
- m_Buckets.emplace_back(std::make_unique<BucketTracker>());
- m_BucketMapping[BucketName] = BucketIndex;
-
- return m_Buckets[BucketIndex].get();
- }
- else
- {
- return m_Buckets[It->second].get();
- }
- }
-
- RwLock m_Lock;
- std::vector<std::unique_ptr<BucketTracker>> m_Buckets;
- tsl::robin_map<std::string, uint32_t> m_BucketMapping;
-};
-
-struct ZenCacheTracker::Impl
-{
- Impl(std::filesystem::path StateDirectory)
- {
- std::filesystem::path StatsDbPath{StateDirectory / ".zdb"};
-
- std::string RocksdbPath = StatsDbPath.string();
-
- ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath);
-
- rocksdb::DB* Db = nullptr;
- rocksdb::DBOptions Options;
- Options.create_if_missing = true;
-
- std::vector<std::string> ExistingColumnFamilies;
- rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies);
-
- std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors;
-
- if (Status.IsPathNotFound())
- {
- ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}});
- }
- else if (Status.ok())
- {
- for (const std::string& Column : ExistingColumnFamilies)
- {
- rocksdb::ColumnFamilyDescriptor ColumnFamily;
- ColumnFamily.name = Column;
- ColumnDescriptors.push_back(ColumnFamily);
- }
- }
- else
- {
- throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str());
- }
-
- Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db);
-
- if (!Status.ok())
- {
- throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str());
- }
-
- m_RocksDb.reset(Db);
- }
-
- ~Impl()
- {
- for (auto* Column : m_RocksDbColumnHandles)
- {
- delete Column;
- }
-
- m_RocksDbColumnHandles.clear();
- }
-
- struct KeyStruct
- {
- uint64_t TimestampLittleEndian;
- };
-
- void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); }
-
- void SaveSnapshot()
- {
- CbObjectWriter Cbo;
-
- if (m_CurrentSnapshot.SerializeSnapshot(Cbo))
- {
- IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer();
-
- const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())};
- rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key};
- rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()};
-
- rocksdb::WriteOptions Wo;
- m_RocksDb->Put(Wo, KeySlice, ValueSlice);
- }
- }
-
- void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
- {
- rocksdb::ManagedSnapshot Snap(m_RocksDb.get());
-
- rocksdb::ReadOptions Ro;
- Ro.snapshot = Snap.snapshot();
-
- std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)};
-
- const KeyStruct ZeroKey{.TimestampLittleEndian = 0};
- rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey};
-
- It->Seek(ZeroKeySlice);
-
- while (It->Valid())
- {
- rocksdb::Slice KeySlice = It->key();
- rocksdb::Slice ValueSlice = It->value();
-
- if (KeySlice.size() == sizeof(KeyStruct))
- {
- IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size());
-
- CbObject Value = LoadCompactBinaryObject(ValueBuffer);
-
- uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data()));
-
- Callback(Key, Value);
- }
-
- It->Next();
- }
- }
-
- std::unique_ptr<rocksdb::DB> m_RocksDb;
- std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles;
- CacheAccessSnapshot m_CurrentSnapshot;
-};
-
-ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory))
-{
-}
-
-ZenCacheTracker::~ZenCacheTracker()
-{
- delete m_Impl;
-}
-
-void
-ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
-{
- m_Impl->TrackAccess(BucketSegment, HashKey);
-}
-
-void
-ZenCacheTracker::SaveSnapshot()
-{
- m_Impl->SaveSnapshot();
-}
-
-void
-ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
-{
- m_Impl->IterateSnapshots(std::move(Callback));
-}
-
-# if ZEN_WITH_TESTS
-
-TEST_CASE("z$.tracker")
-{
- using namespace std::literals;
-
- const uint64_t t0 = GetCurrentCacheTimeStamp();
-
- ScopedTemporaryDirectory TempDir;
-
- ZenCacheTracker Zcs(TempDir.Path());
-
- tsl::robin_set<IoHash> KeyHashes;
-
- for (int i = 0; i < 10000; ++i)
- {
- IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
-
- KeyHashes.insert(KeyHash);
-
- Zcs.TrackAccess("foo"sv, KeyHash);
- }
-
- for (int i = 0; i < 10000; ++i)
- {
- IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
-
- Zcs.TrackAccess("foo"sv, KeyHash);
- }
-
- Zcs.SaveSnapshot();
-
- for (int n = 0; n < 10; ++n)
- {
- for (int i = 0; i < 1000; ++i)
- {
- const int Index = i + n * 1000;
- IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index);
-
- Zcs.TrackAccess("foo"sv, KeyHash);
- }
-
- Zcs.SaveSnapshot();
- }
-
- Zcs.SaveSnapshot();
-
- const uint64_t t1 = GetCurrentCacheTimeStamp();
-
- int SnapshotCount = 0;
-
- Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) {
- CHECK(TimeStamp >= t0);
- CHECK(TimeStamp <= t1);
-
- for (auto& Field : Snapshot)
- {
- CHECK_EQ(Field.GetName(), "foo"sv);
-
- const CbArray& Array = Field.AsArray();
-
- for (const auto& Element : Array)
- {
- CHECK(KeyHashes.contains(Element.GetValue().AsHash()));
- }
- }
-
- ++SnapshotCount;
- });
-
- CHECK_EQ(SnapshotCount, 11);
-}
-
-# endif
-
-void
-cachetracker_forcelink()
-{
-}
-
-} // namespace zen
-
-#endif // ZEN_USE_CACHE_TRACKER
diff --git a/zenserver/cache/cachetracking.h b/zenserver/cache/cachetracking.h
deleted file mode 100644
index fdfe1a4c7..000000000
--- a/zenserver/cache/cachetracking.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/iohash.h>
-
-#include <stdint.h>
-#include <filesystem>
-#include <functional>
-
-namespace zen {
-
-#define ZEN_USE_CACHE_TRACKER 0
-#if ZEN_USE_CACHE_TRACKER
-
-class CbObject;
-
-/**
- */
-
-class ZenCacheTracker
-{
-public:
- ZenCacheTracker(std::filesystem::path StateDirectory);
- ~ZenCacheTracker();
-
- void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey);
- void SaveSnapshot();
- void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback);
-
-private:
- struct Impl;
-
- Impl* m_Impl = nullptr;
-};
-
-void cachetracker_forcelink();
-
-#endif // ZEN_USE_CACHE_TRACKER
-
-} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
deleted file mode 100644
index 90e905bf6..000000000
--- a/zenserver/cache/structuredcache.cpp
+++ /dev/null
@@ -1,3159 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "structuredcache.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/enumflags.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpserver.h>
-#include <zenhttp/httpshared.h>
-#include <zenutil/cache/cache.h>
-#include <zenutil/cache/rpcrecording.h>
-
-#include "monitoring/httpstats.h"
-#include "structuredcachestore.h"
-#include "upstream/jupiter.h"
-#include "upstream/upstreamcache.h"
-#include "upstream/zen.h"
-#include "zenstore/cidstore.h"
-#include "zenstore/scrubcontext.h"
-
-#include <algorithm>
-#include <atomic>
-#include <filesystem>
-#include <queue>
-#include <thread>
-
-#include <cpr/cpr.h>
-#include <gsl/gsl-lite.hpp>
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-#endif
-
-namespace zen {
-
-using namespace std::literals;
-
-//////////////////////////////////////////////////////////////////////////
-
-CachePolicy
-ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
-{
- std::string_view PolicyText = QueryParams.GetValue("Policy"sv);
- return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
-}
-
-CacheRecordPolicy
-LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
-{
- OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
- return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
-}
-
-struct AttachmentCount
-{
- uint32_t New = 0;
- uint32_t Valid = 0;
- uint32_t Invalid = 0;
- uint32_t Total = 0;
-};
-
-struct PutRequestData
-{
- std::string Namespace;
- CacheKey Key;
- CbObjectView RecordObject;
- CacheRecordPolicy Policy;
-};
-
-namespace {
- static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
- static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
- static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
- static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
- static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
-
- struct HttpRequestData
- {
- std::optional<std::string> Namespace;
- std::optional<std::string> Bucket;
- std::optional<IoHash> HashKey;
- std::optional<IoHash> ValueContentId;
- };
-
- constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
- constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
-
- std::optional<std::string> GetValidNamespaceName(std::string_view Name)
- {
- if (Name.empty())
- {
- ZEN_WARN("Namespace is invalid, empty namespace is not allowed");
- return {};
- }
-
- if (Name.length() > 64)
- {
- ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name);
- return {};
- }
-
- if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet))
- {
- ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name);
- return {};
- }
-
- return ToLower(Name);
- }
-
- std::optional<std::string> GetValidBucketName(std::string_view Name)
- {
- if (Name.empty())
- {
- ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed");
- return {};
- }
-
- if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet))
- {
- ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name);
- return {};
- }
-
- return ToLower(Name);
- }
-
- std::optional<IoHash> GetValidIoHash(std::string_view Hash)
- {
- if (Hash.length() != IoHash::StringLength)
- {
- return {};
- }
-
- IoHash KeyHash;
- if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash))
- {
- return {};
- }
- return KeyHash;
- }
-
- bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data)
- {
- std::vector<std::string_view> Tokens;
- uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) {
- Tokens.push_back(Token);
- return true;
- });
-
- switch (TokenCount)
- {
- case 0:
- return true;
- case 1:
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- return Data.Namespace.has_value();
- case 2:
- {
- std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
- if (PossibleHashKey.has_value())
- {
- // Legacy bucket/key request
- Data.Bucket = GetValidBucketName(Tokens[0]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = PossibleHashKey;
- Data.Namespace = ZenCacheStore::DefaultNamespace;
- return true;
- }
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- return true;
- }
- case 3:
- {
- std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
- if (PossibleHashKey.has_value())
- {
- // Legacy bucket/key/valueid request
- Data.Bucket = GetValidBucketName(Tokens[0]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = PossibleHashKey;
- Data.ValueContentId = GetValidIoHash(Tokens[2]);
- if (!Data.ValueContentId.has_value())
- {
- return false;
- }
- Data.Namespace = ZenCacheStore::DefaultNamespace;
- return true;
- }
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = GetValidIoHash(Tokens[2]);
- if (!Data.HashKey)
- {
- return false;
- }
- return true;
- }
- case 4:
- {
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
-
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
-
- Data.HashKey = GetValidIoHash(Tokens[2]);
- if (!Data.HashKey.has_value())
- {
- return false;
- }
-
- Data.ValueContentId = GetValidIoHash(Tokens[3]);
- if (!Data.ValueContentId.has_value())
- {
- return false;
- }
- return true;
- }
- default:
- return false;
- }
- }
-
- std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
- {
- CbFieldView NamespaceField = Params["Namespace"sv];
- if (!NamespaceField)
- {
- return std::string(ZenCacheStore::DefaultNamespace);
- }
-
- if (NamespaceField.HasError())
- {
- return {};
- }
- if (!NamespaceField.IsString())
- {
- return {};
- }
- return GetValidNamespaceName(NamespaceField.AsString());
- }
-
- bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
- {
- CbFieldView BucketField = KeyView["Bucket"sv];
- if (BucketField.HasError())
- {
- return false;
- }
- if (!BucketField.IsString())
- {
- return false;
- }
- std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
- if (!Bucket.has_value())
- {
- return false;
- }
- CbFieldView HashField = KeyView["Hash"sv];
- if (HashField.HasError())
- {
- return false;
- }
- if (!HashField.IsHash())
- {
- return false;
- }
- IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
- return true;
- }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache)
-: m_Log(logging::Get("cache"))
-, m_CacheStore(InCacheStore)
-, m_StatsService(StatsService)
-, m_StatusService(StatusService)
-, m_CidStore(InCidStore)
-, m_UpstreamCache(UpstreamCache)
-{
- m_StatsService.RegisterHandler("z$", *this);
- m_StatusService.RegisterHandler("z$", *this);
-}
-
-HttpStructuredCacheService::~HttpStructuredCacheService()
-{
- ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
-
- m_StatsService.UnregisterHandler("z$", *this);
- m_StatusService.UnregisterHandler("z$", *this);
-}
-
-const char*
-HttpStructuredCacheService::BaseUri() const
-{
- return "/z$/";
-}
-
-void
-HttpStructuredCacheService::Flush()
-{
- m_CacheStore.Flush();
-}
-
-void
-HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CidStore.Scrub(Ctx);
- m_CacheStore.Scrub(Ctx);
-}
-
-void
-HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
-{
- std::string_view Key = Request.RelativeUri();
- std::vector<std::string> Tokens;
- uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) {
- Tokens.push_back(std::string(Token));
- return true;
- });
- std::string FilterNamespace;
- std::string FilterBucket;
- std::string FilterValue;
- switch (TokenCount)
- {
- case 1:
- break;
- case 2:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 3:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 4:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterValue = Tokens[3];
- if (FilterValue.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- default:
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch());
- CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue);
-
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- if (AttachmentDetails)
- {
- CSVWriter << "Namespace, Bucket, Key, Cid, Size";
- }
- else if (Details)
- {
- CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize";
- }
- else
- {
- CSVWriter << "Namespace, Bucket, Key";
- }
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- if (AttachmentDetails)
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
- << ", " << gsl::narrow<uint64_t>(Payload.GetSize());
- }
- }
- else if (Details)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << ","
- << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", "
- << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count())
- << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size());
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
- }
- else
- {
- CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString();
- }
- }
- }
- }
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("namespaces");
- {
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Namespace);
- Cbo.BeginArray("buckets");
- {
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Bucket);
- Cbo.BeginArray("values");
- {
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- Cbo.BeginObject();
- {
- Cbo.AddHash("key", ValueIt.first);
- if (Details)
- {
- Cbo.AddInteger("size", ValueIt.second.Size);
- if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 &&
- ValueIt.second.RawSize != ValueIt.second.Size)
- {
- Cbo.AddInteger("rawsize", ValueIt.second.RawSize);
- Cbo.AddHash("rawhash", ValueIt.second.RawHash);
- }
- Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType));
- Cbo.AddInteger("age", NowSeconds.count() - LastAccessedSeconds.count());
- if (ValueIt.second.Attachments.size() > 0)
- {
- if (AttachmentDetails)
- {
- Cbo.BeginArray("attachments");
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- Cbo.BeginObject();
- Cbo.AddHash("cid", Hash);
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- else
- {
- Cbo.AddInteger("attachmentcount",
- gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()));
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
- }
- }
- }
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
-}
-
-void
-HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
-{
- metrics::OperationTiming::Scope $(m_HttpRequests);
-
- std::string_view Key = Request.RelativeUri();
- if (Key == HttpZCacheRPCPrefix)
- {
- return HandleRpcRequest(Request);
- }
-
- if (Key == HttpZCacheUtilStartRecording)
- {
- m_RequestRecorder.reset();
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key == HttpZCacheUtilStopRecording)
- {
- m_RequestRecorder.reset();
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key == HttpZCacheUtilReplayRecording)
- {
- m_RequestRecorder.reset();
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- uint32_t ThreadCount = std::thread::hardware_concurrency();
- if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
- {
- if (auto Value = ParseInt<uint64_t>(Param))
- {
- ThreadCount = gsl::narrow<uint32_t>(Value.value());
- }
- }
- std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
- ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount);
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key.starts_with(HttpZCacheDetailsPrefix))
- {
- HandleDetailsRequest(Request);
- return;
- }
-
- HttpRequestData RequestData;
- if (!HttpRequestParseRelativeUri(Key, RequestData))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- if (RequestData.ValueContentId.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- ZEN_ASSERT(RequestData.HashKey.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = RequestData.ValueContentId.value()};
- return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.HashKey.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = IoHash::Zero};
- return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.Bucket.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value());
- }
-
- if (RequestData.Namespace.has_value())
- {
- return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value());
- }
- return HandleCacheRequest(Request);
-}
-
-void
-HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info.Config.BasePath.u8string();
- ResponseWriter.AddString("BasePath"sv, BasePathString.ToView());
- ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces);
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
- ResponseWriter.BeginArray("Namespaces");
- for (const std::string& NamespaceName : Info.NamespaceNames)
- {
- ResponseWriter.AddString(NamespaceName);
- }
- ResponseWriter.EndArray();
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize);
- ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize);
- }
-
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info->Config.RootDir.u8string();
- ResponseWriter.AddString("RootDir"sv, BasePathString.ToView());
- ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold);
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
-
- ResponseWriter.BeginArray("Buckets"sv);
- for (const std::string& BucketName : Info->BucketNames)
- {
- ResponseWriter.AddString(BucketName);
- }
- ResponseWriter.EndArray();
-
- ResponseWriter.BeginObject("StorageSize"sv);
- {
- ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop namespace
- {
- if (m_CacheStore.DropNamespace(NamespaceName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
- std::string_view NamespaceName,
- std::string_view BucketName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop bucket
- {
- if (m_CacheStore.DropBucket(NamespaceName, BucketName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
- }
- break;
-
- case HttpVerb::kPut:
- HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
- break;
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- const ZenContentType AcceptType = Request.AcceptContentType();
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
-
- bool Success = false;
- ZenCacheValue ClientResultValue;
- if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
-
- Stopwatch Timer;
-
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
- {
- Success = true;
- ZenContentType ContentType = ClientResultValue.Value.GetContentType();
-
- if (AcceptType == ZenContentType::kCbPackage)
- {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbPackage Package;
- uint32_t MissingCount = 0;
-
- CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
- if (SkipData)
- {
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
- {
- MissingCount++;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
- }
- else
- {
- MissingCount++;
- }
- }
- });
-
- Success = MissingCount == 0 || PartialRecord;
-
- if (Success)
- {
- Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
-
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
- }
- }
- else
- {
- Success = false;
- }
- }
- else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
- AcceptType != ZenContentType::kBinary)
- {
- Success = false;
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
- return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
- }
- }
- else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- // Issue upstream query asynchronously in order to keep requests flowing without
- // hogging I/O servicing threads with blocking work
-
- uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs();
-
- Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) {
- Stopwatch Timer;
- bool Success = false;
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
- const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal);
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- ZenCacheValue ClientResultValue;
-
- metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
-
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
- UpstreamResult.Status.Success)
- {
- Success = true;
-
- ClientResultValue.Value = UpstreamResult.Value;
- ClientResultValue.Value.SetContentType(AcceptType);
-
- if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
- {
- if (AcceptType == ZenContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
- if (ValidationResult != CbValidateError::None)
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
-
- // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
- }
-
- if (Success && StoreLocal)
- {
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue);
- }
- }
- else if (AcceptType == ZenContentType::kCbPackage)
- {
- CbPackage Package;
- if (Package.TryLoad(ClientResultValue.Value))
- {
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments(
- [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) {
- IoHash Hash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- if (StoreLocal)
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- }
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- Hash,
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
- }
- }
- else if (QueryLocal)
- {
- if (SkipData)
- {
- if (m_CidStore.ContainsChunk(Hash))
- {
- Count.Valid++;
- }
- }
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- if (Compressed)
- {
- Package.AddAttachment(CbAttachment(Compressed, Hash));
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'",
- Hash,
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
- }
- }
- }
- Count.Total++;
- });
-
- if ((Count.Valid == Count.Total) || PartialRecord)
- {
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
-
- if (StoreLocal)
- {
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
- }
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
-
- BinaryWriter MemStream;
- if (SkipData)
- {
- // Save a package containing only the object.
- CbPackage(Package.GetObject()).Save(MemStream);
- }
- else
- {
- Package.Save(MemStream);
- }
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
-
- if (SkipData && AcceptType == ZenContentType::kBinary)
- {
- AsyncRequest.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
- // metadata.
- AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
- }
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
- m_CacheStats.MissCount++;
- AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
- }
- });
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- const HttpContentType ContentType = Request.RequestContentType();
-
- Body.SetContentType(ContentType);
-
- Stopwatch Timer;
-
- if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
- {
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = Body.GetSize();
- if (ContentType == HttpContentType::kCompressedBinary)
- {
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Payload is not a valid compressed binary"sv);
- }
- }
- else
- {
- RawHash = IoHash::HashBuffer(SharedBuffer(Body));
- }
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash});
-
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
- }
-
- Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
-
- CbObjectView CacheRecord(Body.Data());
- std::vector<IoHash> ValidAttachments;
- int32_t TotalCount = 0;
-
- CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) {
- const IoHash Hash = AttachmentHash.AsHash();
- if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- }
- TotalCount++;
- });
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- TotalCount,
- ValidAttachments.size(),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
-
- CachePolicy Policy = PolicyFromUrl;
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbPackage)
- {
- CbPackage Package;
-
- if (!Package.TryLoad(Body))
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
- }
- CachePolicy Policy = PolicyFromUrl;
-
- CbObject CacheRecord = Package.GetObject();
-
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) {
- const IoHash Hash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- Hash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.GetSize()),
- ToString(ContentType),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv);
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- HandleGetCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- case HttpVerb::kPut:
- HandlePutCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- Stopwatch Timer;
-
- IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy Policy = PolicyFromUrl;
- {
- const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
-
- if (QueryUpstream)
- {
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
- UpstreamResult.Status.Success)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
- {
- if (RawHash == Ref.ValueContentId)
- {
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
- Source = UpstreamResult.Source;
- }
- else
- {
- ZEN_WARN("got missmatching upstream cache value");
- }
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache value");
- }
- }
- }
- }
-
- if (!Value)
- {
- ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- ToString(Request.AcceptContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Value.Size()),
- ToString(Value.GetContentType()),
- Source ? Source->Url : "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (Source)
- {
- m_CacheStats.UpstreamHitCount++;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
- {
- Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
- }
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
- ZEN_UNUSED(PolicyFromUrl);
-
- Stopwatch Timer;
-
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- Body.SetContentType(Request.RequestContentType());
-
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
- }
-
- if (RawHash != Ref.ValueContentId)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "ValueContentId does not match attachment hash"sv);
- }
-
- CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
-
- ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Body.Size()),
- ToString(Body.GetContentType()),
- Result.New ? "NEW" : "OLD",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
-
- Request.WriteResponse(ResponseCode);
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId)
-{
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == ZenContentType::kCbObject)
- {
- ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- OutAcceptMagic = Object["Accept"sv].AsUInt32();
- OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
- OutTargetProcessId = Object["Pid"sv].AsInt32(0);
-
- const std::string_view Method = Object["Method"sv].AsString();
-
- if (Method == "PutCacheRecords"sv)
- {
- return HandleRpcPutCacheRecords(Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- return HandleRpcGetCacheRecords(Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- return HandleRpcPutCacheValues(Package);
- }
- else if (Method == "GetCacheValues"sv)
- {
- return HandleRpcGetCacheValues(Object);
- }
- else if (Method == "GetCacheChunks"sv)
- {
- return HandleRpcGetCacheChunks(Object);
- }
- return CbPackage{};
-}
-
-void
-HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount)
-{
- WorkerThreadPool WorkerPool(ThreadCount);
- uint64_t RequestCount = Replayer.GetRequestCount();
- Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- Latch JobLatch(RequestCount);
- ZEN_INFO("Replaying {} requests", RequestCount);
- for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
- {
- WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
- IoBuffer Body;
- std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
- if (Body)
- {
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetPid = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid);
- if (AcceptMagic == kCbPkgMagic)
- {
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
- }
- }
- JobLatch.CountDown();
- });
- }
- while (!JobLatch.Wait(10000))
- {
- ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - JobLatch.Remaining(),
- RequestCount,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
-}
-
-void
-HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kPost:
- {
- const HttpContentType ContentType = Request.RequestContentType();
- const HttpContentType AcceptType = Request.AcceptContentType();
-
- if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
- AcceptType != HttpContentType::kCbPackage)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- Request.WriteResponseAsync(
- [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId);
- if (RpcResult.IsNull())
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
- {
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
-
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- });
- }
- break;
- default:
- Request.WriteResponse(HttpResponseCode::BadRequest);
- break;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
- CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
- CachePolicy DefaultPolicy;
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::vector<bool> Results;
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
- CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
- CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
-
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
-
- if (Result == PutResult::Invalid)
- {
- return CbPackage{};
- }
- Results.push_back(Result == PutResult::Success);
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-HttpStructuredCacheService::PutResult
-HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
-{
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
-
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- Stopwatch Timer;
-
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return PutResult::Invalid;
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- TransferredSize += Chunk.GetCompressedSize();
- }
-
- ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- NiceBytes(TransferredSize),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
- }
- return PutResult::Success;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
-
- struct ValueRequestData
- {
- Oid ValueId;
- IoHash ContentId;
- CompressedBuffer Payload;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool ReadFromUpstream = false;
- };
- struct RecordRequestData
- {
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
- };
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- Requests.reserve(RequestsArray.Num());
-
- auto ParseValues = [](RecordRequestData& Request) {
- CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
- Request.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Request.Values.push_back({ValueId, RawHash});
- Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
- }
- }
- };
-
- for (CbFieldView RequestField : RequestsArray)
- {
- Stopwatch Timer;
- RecordRequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- CacheKey& Key = Request.Upstream.Key;
- if (!GetRpcRequestCacheKey(KeyObject, Key))
- {
- return CbPackage{};
- }
-
- Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
-
- ZenCacheValue CacheValue;
- bool NeedUpstreamAttachment = false;
- bool FoundLocalInvalid = false;
- ZenCacheValue RecordCacheValue;
-
- if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
- {
- Request.RecordCacheValue = std::move(RecordCacheValue.Value);
- if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
- {
- FoundLocalInvalid = true;
- }
- else
- {
- Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
- ParseValues(Request);
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
- {
- // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
- // didn't ask for it and thus the record is complete in its absence.
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Value.Exists = true;
- }
- else
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- Request.Complete = false;
- }
- }
- else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- if (m_CidStore.ContainsChunk(Value.ContentId))
- {
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- }
- }
- }
- if (!Request.Complete)
- {
- bool NeedUpstreamRecord =
- !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
- if (NeedUpstreamRecord || NeedUpstreamAttachment)
- {
- UpstreamIndexes.push_back(Requests.size() - 1);
- }
- }
- Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
- }
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- if (!UpstreamIndexes.empty())
- {
- std::vector<CacheKeyRequest*> UpstreamRequests;
- UpstreamRequests.reserve(UpstreamIndexes.size());
- for (size_t Index : UpstreamIndexes)
- {
- RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
-
- if (Request.Values.size())
- {
- // We will be returning the local object and know all the value Ids that exist in it
- // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
- CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
- CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
- UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
- Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
- }
- Request.Upstream.Policy = Builder.Build();
- }
- else
- {
- // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
- // and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
- }
- }
-
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
-
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
- Stopwatch Timer;
- auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
- if (!Request.RecordObject)
- {
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
- Request.RecordObject = ObjectBuffer;
- if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
- }
- ParseValues(Request);
- Request.Source = Params.Source;
- }
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- if (Value.Exists)
- {
- continue;
- }
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Request.Complete = false;
- continue;
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
- {
- Request.Source = Params.Source;
- Value.Exists = true;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Value.Payload = Compressed;
- }
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
- Value.ContentId,
- *Namespace,
- Key.Bucket,
- Key.Hash);
- }
- }
- if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Request.Complete = false;
- }
- // Request.Complete does not need to be set to false for upstream SkipData attachments.
- // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
- // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
- // any missing SkipData attachments.
- }
- Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- };
-
- m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
- }
-
- CbPackage ResponsePackage;
- CbObjectWriter ResponseObject;
-
- ResponseObject.BeginArray("Result"sv);
- for (RecordRequestData& Request : Requests)
- {
- const CacheKey& Key = Request.Upstream.Key;
- if (Request.Complete ||
- (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
- {
- ResponseObject << Request.RecordObject;
- for (ValueRequestData& Value : Request.Values)
- {
- if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
- {
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
- }
- }
-
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.RecordCacheValue.Size()),
- Request.Complete ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
- }
- else
- {
- ResponseObject.AddNull();
-
- if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- Request.RecordObject ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- }
- ResponseObject.EndArray();
- ResponsePackage.SetObject(ResponseObject.Save());
- return ResponsePackage;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest)
-{
- CbObjectView BatchObject = BatchRequest.GetObject();
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- std::vector<bool> Results;
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- Stopwatch Timer;
-
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
-
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
- {
- RawSize = Chunk.DecodeRawSize();
- }
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash});
- TransferredSize = Chunk.GetCompressedSize();
- }
- Succeeded = true;
- }
- else
- {
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
- }
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
- {
- Succeeded = true;
- }
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
-
- if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
- }
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
-
- return RpcResponse;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
-{
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- struct RequestData
- {
- CacheKey Key;
- CachePolicy Policy;
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0;
- CompressedBuffer Result;
- };
- std::vector<RequestData> Requests;
-
- std::vector<size_t> RemoteRequestIndexes;
-
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- Stopwatch Timer;
-
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
-
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
-
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- if (Request.Result)
- {
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- RemoteRequestIndexes.push_back(Requests.size() - 1);
- }
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
- }
- else
- {
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- }
- }
-
- if (!RemoteRequestIndexes.empty())
- {
- std::vector<CacheValueRequest> RequestedRecordsData;
- std::vector<CacheValueRequest*> CacheValueRequests;
- RequestedRecordsData.reserve(RemoteRequestIndexes.size());
- CacheValueRequests.reserve(RemoteRequestIndexes.size());
- for (size_t Index : RemoteRequestIndexes)
- {
- RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
- CacheValueRequests.push_back(&RequestedRecordsData.back());
- }
- Stopwatch Timer;
- m_UpstreamCache.GetCacheValues(
- *Namespace,
- CacheValueRequests,
- [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
- CacheValueRequest& ChunkRequest = Params.Request;
- if (Params.RawHash != IoHash::Zero)
- {
- size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
- size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
- RequestData& Request = Requests[RequestIndex];
- Request.RawHash = Params.RawHash;
- Request.RawSize = Params.RawSize;
- const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
- const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal);
- const bool IsHit = SkipData || HasData;
- if (IsHit)
- {
- if (HasData && !SkipData)
- {
- Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- }
-
- if (HasData && StoreData)
- {
- m_CacheStore.Put(*Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash});
- }
-
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- return;
- }
- }
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- });
- }
-
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (const RequestData& Request : Requests)
- {
- ResponseObject.BeginObject();
- {
- const CompressedBuffer& Result = Request.Result;
- if (Result)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
- }
- else
- {
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- else if (Request.RawHash != IoHash::Zero)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- ResponseObject.EndObject();
- }
- ResponseObject.EndArray();
-
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-namespace cache::detail {
-
- struct RecordValue
- {
- Oid ValueId;
- IoHash ContentId;
- uint64_t RawSize;
- };
- struct RecordBody
- {
- IoBuffer CacheValue;
- std::vector<RecordValue> Values;
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool HasRequest = false;
- bool ValuesRead = false;
- };
- struct ChunkRequest
- {
- CacheChunkRequest* Key = nullptr;
- RecordBody* Record = nullptr;
- CompressedBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t RawSize = 0;
- uint64_t RequestedSize = 0;
- uint64_t RequestedOffset = 0;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool RawSizeKnown = false;
- bool IsRecordRequest = false;
- uint64_t ElapsedTimeUs = 0;
- };
-
-} // namespace cache::detail
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
-{
- using namespace cache::detail;
-
- std::string Namespace;
- std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
- std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
- std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
- std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
- std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
- std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
- std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
-
- // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
- {
- return CbPackage{};
- }
-
- // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
- // have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
-
- // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
-
- // Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
-
- // Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Namespace, Requests);
-}
-
-bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest)
-{
- using namespace cache::detail;
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
-
- std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
- if (!NamespaceText)
- {
- ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
- return false;
- }
- Namespace = *NamespaceText;
-
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
-
- // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
- // we will need to change the pointers to indexes to handle reallocations.
- RecordKeys.reserve(NumRequests);
- Records.reserve(NumRequests);
- RequestKeys.reserve(NumRequests);
- Requests.reserve(NumRequests);
- RecordRequests.reserve(NumRequests);
- ValueRequests.reserve(NumRequests);
-
- CacheKeyRequest* PreviousRecordKey = nullptr;
- RecordBody* PreviousRecord = nullptr;
-
- for (CbFieldView RequestView : ChunkRequestsArray)
- {
- CbObjectView RequestObject = RequestView.AsObjectView();
- CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
- ChunkRequest& Request = Requests.emplace_back();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- Request.Key = &RequestKey;
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
- {
- ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
- return false;
- }
-
- RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
- RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
- RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- Request.RequestedSize = RequestKey.RawSize;
- Request.RequestedOffset = RequestKey.RawOffset;
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Request.IsRecordRequest = (bool)RequestKey.ValueId;
-
- if (!Request.IsRecordRequest)
- {
- ValueRequests.push_back(&Request);
- }
- else
- {
- RecordRequests.push_back(&Request);
- CacheKeyRequest* RecordKey = nullptr;
- RecordBody* Record = nullptr;
-
- if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
- {
- RecordKey = &RecordKeys.emplace_back();
- PreviousRecordKey = RecordKey;
- Record = &Records.emplace_back();
- PreviousRecord = Record;
- RecordKey->Key = RequestKey.Key;
- }
- else if (RequestKey.Key == PreviousRecordKey->Key)
- {
- RecordKey = PreviousRecordKey;
- Record = PreviousRecord;
- }
- else
- {
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- RequestKey.Key.Bucket,
- RequestKey.Key.Hash,
- PreviousRecordKey->Key.Bucket,
- PreviousRecordKey->Key.Hash);
- return false;
- }
- Request.Record = Record;
- if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
- {
- Record->DownstreamPolicy =
- Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
- Record->HasRequest = true;
- }
- }
- }
- if (Requests.empty())
- {
- return false;
- }
- return true;
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- using namespace cache::detail;
-
- std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
- {
- Stopwatch Timer;
- CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
- RecordBody& Record = Records[RecordIndex];
- if (Record.HasRequest)
- {
- Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
-
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
- {
- Record.Exists = true;
- Record.CacheValue = std::move(CacheValue.Value);
- }
- }
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
- UpstreamRecordRequests.push_back(&RecordKey);
- }
- RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- }
-
- if (!UpstreamRecordRequests.empty())
- {
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
- }
- };
- m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
- }
-
- std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
- for (ChunkRequest* Request : RecordRequests)
- {
- Stopwatch Timer;
- if (Request->Key->ChunkId == IoHash::Zero)
- {
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- RecordBody& Record = *Request->Record;
- if (!Record.ValuesRead)
- {
- Record.ValuesRead = true;
- if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- Record.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
- }
- }
- }
-
- for (const RecordValue& Value : Record.Values)
- {
- if (Value.ValueId == Request->Key->ValueId)
- {
- Request->Key->ChunkId = Value.ContentId;
- Request->RawSize = Value.RawSize;
- Request->RawSizeKnown = true;
- break;
- }
- }
- }
-
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Request->Key->ChunkId != IoHash::Zero)
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
- {
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
- {
- Request->Exists = true;
- }
- }
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
- {
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
- if (Request->Value)
- {
- Request->Exists = true;
- Request->RawSizeKnown = false;
- }
- }
- else
- {
- IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
- {
- Request->Exists = true;
- Request->RawSizeKnown = true;
- }
- }
- }
- }
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
- OutUpstreamChunks.push_back(Request->Key);
- }
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- using namespace cache::detail;
-
- for (ChunkRequest* Request : ValueRequests)
- {
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
- {
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- }
- }
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
- {
- // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
- Request->Key->RawOffset = 0;
- Request->Key->RawSize = UINT64_MAX;
- }
- OutUpstreamChunks.push_back(Request->Key);
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- using namespace cache::detail;
-
- if (!UpstreamChunks.empty())
- {
- const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) {
- if (Params.RawHash == Params.RawHash.Zero)
- {
- return;
- }
-
- CacheChunkRequest& Key = Params.Request;
- size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
- ChunkRequest& Request = Requests[RequestIndex];
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- if (!Compressed)
- {
- return;
- }
-
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
- {
- if (Request.IsRecordRequest)
- {
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
- }
- else
- {
- m_CacheStore.Put(Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash});
- }
- }
- if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- Request.Value = std::move(Compressed);
- }
- }
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.RawSize = Params.RawSize;
- Request.RawSizeKnown = true;
- Request.Source = Params.Source;
-
- m_CacheStats.UpstreamHitCount++;
- };
-
- m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
- }
-}
-
-CbPackage
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests)
-{
- using namespace cache::detail;
-
- CbPackage RpcResponse;
- CbObjectWriter Writer;
-
- Writer.BeginArray("Result"sv);
- for (ChunkRequest& Request : Requests)
- {
- Writer.BeginObject();
- {
- if (Request.Exists)
- {
- Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
- if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
- }
- else
- {
- Writer.AddInteger("RawSize"sv, Request.RawSize);
- }
-
- ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceBytes(Request.RawSize),
- Request.IsRecordRequest ? "Record"sv : "Value"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- }
- else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
- {
- ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- Writer.EndObject();
- }
- Writer.EndArray();
-
- RpcResponse.SetObject(Writer.Save());
- return RpcResponse;
-}
-
-void
-HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
-
- EmitSnapshot("requests", m_HttpRequests, Cbo);
- EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
-
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t TotalCount = HitCount + MissCount;
-
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const GcStorageSize CacheSize = m_CacheStore.StorageSize();
-
- Cbo.BeginObject("cache");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << CacheSize.DiskSize;
- Cbo << "memory" << CacheSize.MemorySize;
- }
- Cbo.EndObject();
-
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- Cbo << "hits" << HitCount << "misses" << MissCount;
- Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
- Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- }
- Cbo.EndObject();
-
- Cbo.BeginObject("upstream");
- {
- m_UpstreamCache.GetStatus(Cbo);
- }
- Cbo.EndObject();
-
- Cbo.BeginObject("cid");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
-
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-void
-HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("z$service.parse.relative.Uri")
-{
- HttpRequestData RootRequest;
- CHECK(HttpRequestParseRelativeUri("", RootRequest));
- CHECK(!RootRequest.Namespace.has_value());
- CHECK(!RootRequest.Bucket.has_value());
- CHECK(!RootRequest.HashKey.has_value());
- CHECK(!RootRequest.ValueContentId.has_value());
-
- RootRequest = {};
- CHECK(HttpRequestParseRelativeUri("/", RootRequest));
- CHECK(!RootRequest.Namespace.has_value());
- CHECK(!RootRequest.Bucket.has_value());
- CHECK(!RootRequest.HashKey.has_value());
- CHECK(!RootRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyBucketRequestBecomesNamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest));
- CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv);
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value());
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value());
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyHashKeyRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest));
- CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(LegacyHashKeyRequest.Bucket == "test"sv);
- CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(!LegacyHashKeyRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyValueContentIdRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
- LegacyValueContentIdRequest));
- CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(LegacyValueContentIdRequest.Bucket == "test"sv);
- CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
-
- HttpRequestData V2DefaultNamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest));
- CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc");
- CHECK(!V2DefaultNamespaceRequest.Bucket.has_value());
- CHECK(!V2DefaultNamespaceRequest.HashKey.has_value());
- CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData V2NamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest));
- CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv);
- CHECK(!V2NamespaceRequest.Bucket.has_value());
- CHECK(!V2NamespaceRequest.HashKey.has_value());
- CHECK(!V2NamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData V2BucketRequestWithDefaultNamespace;
- CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace));
- CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc");
- CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv);
- CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value());
- CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value());
-
- HttpRequestData V2BucketRequestWithNamespace;
- CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace));
- CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv);
- CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv);
- CHECK(!V2BucketRequestWithNamespace.HashKey.has_value());
- CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value());
-
- HttpRequestData V2HashKeyRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest));
- CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(V2HashKeyRequest.Bucket == "test");
- CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(!V2HashKeyRequest.ValueContentId.has_value());
-
- HttpRequestData V2ValueContentIdRequest;
- CHECK(
- HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
- V2ValueContentIdRequest));
- CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv);
- CHECK(V2ValueContentIdRequest.Bucket == "test"sv);
- CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
-
- HttpRequestData Invalid;
- CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid));
- CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789",
- Invalid));
-}
-
-#endif
-
-void
-z$service_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
deleted file mode 100644
index 4e7b98ac9..000000000
--- a/zenserver/cache/structuredcache.h
+++ /dev/null
@@ -1,187 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/stats.h>
-#include <zenhttp/httpserver.h>
-
-#include "monitoring/httpstats.h"
-#include "monitoring/httpstatus.h"
-
-#include <memory>
-#include <vector>
-
-namespace spdlog {
-class logger;
-}
-
-namespace zen {
-
-struct CacheChunkRequest;
-struct CacheKeyRequest;
-class CidStore;
-class CbObjectView;
-struct PutRequestData;
-class ScrubContext;
-class UpstreamCache;
-class ZenCacheStore;
-enum class CachePolicy : uint32_t;
-enum class RpcAcceptOptions : uint16_t;
-
-namespace cache {
- class IRpcRequestReplayer;
- class IRpcRequestRecorder;
- namespace detail {
- struct RecordBody;
- struct ChunkRequest;
- } // namespace detail
-} // namespace cache
-
-/**
- * Structured cache service. Imposes constraints on keys, supports blobs and
- * structured values
- *
- * Keys are structured as:
- *
- * {BucketId}/{KeyHash}
- *
- * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character
- * hexadecimal sequence. The hash value may be derived in any number of ways, it's
- * up to the application to pick an approach.
- *
- * Values may be structured or unstructured. Structured values are encoded using Unreal
- * Engine's compact binary encoding (see CbObject)
- *
- * Additionally, attachments may be addressed as:
- *
- * {BucketId}/{KeyHash}/{ValueHash}
- *
- * Where the two initial components are the same as for the main endpoint
- *
- * The storage strategy is as follows:
- *
- * - Structured values are stored in a dedicated backing store per bucket
- * - Unstructured values and attachments are stored in the CAS pool
- *
- */
-
-class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
-{
-public:
- HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache);
- ~HttpStructuredCacheService();
-
- virtual const char* BaseUri() const override;
- virtual void HandleRequest(HttpServerRequest& Request) override;
-
- void Flush();
- void Scrub(ScrubContext& Ctx);
-
-private:
- struct CacheRef
- {
- std::string Namespace;
- std::string BucketSegment;
- IoHash HashKey;
- IoHash ValueContentId;
- };
-
- struct CacheStats
- {
- std::atomic_uint64_t HitCount{};
- std::atomic_uint64_t UpstreamHitCount{};
- std::atomic_uint64_t MissCount{};
- };
- enum class PutResult
- {
- Success,
- Fail,
- Invalid,
- };
-
- void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleRpcRequest(HttpServerRequest& Request);
- void HandleDetailsRequest(HttpServerRequest& Request);
-
- CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
- CbPackage HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId);
-
- void HandleCacheRequest(HttpServerRequest& Request);
- void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
- void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
- virtual void HandleStatsRequest(HttpServerRequest& Request) override;
- virtual void HandleStatusRequest(HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
-
- /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest);
- /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests);
- /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
-
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- ZenCacheStore& m_CacheStore;
- HttpStatsService& m_StatsService;
- HttpStatusService& m_StatusService;
- CidStore& m_CidStore;
- UpstreamCache& m_UpstreamCache;
- uint64_t m_LastScrubTime = 0;
- metrics::OperationTiming m_HttpRequests;
- metrics::OperationTiming m_UpstreamGetRequestTiming;
- CacheStats m_CacheStats;
-
- void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
-
- std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
-};
-
-/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
- * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
-inline bool
-IsCompressedBinary(ZenContentType Type)
-{
- return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
-}
-
-void z$service_forcelink();
-
-} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
deleted file mode 100644
index 26e970073..000000000
--- a/zenserver/cache/structuredcachestore.cpp
+++ /dev/null
@@ -1,3648 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "structuredcachestore.h"
-
-#include <zencore/except.h>
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zenstore/cidstore.h>
-#include <zenstore/scrubcontext.h>
-
-#include <xxhash.h>
-
-#include <limits>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <fmt/core.h>
-#include <gsl/gsl-lite.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
-# include <random>
-#endif
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen {
-
-namespace {
-
-#pragma pack(push)
-#pragma pack(1)
-
- struct CacheBucketIndexHeader
- {
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t Version2 = 2;
- static constexpr uint32_t CurrentVersion = Version2;
-
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint64_t EntryCount = 0;
- uint64_t LogPosition = 0;
- uint32_t PayloadAlignment = 0;
- uint32_t Checksum = 0;
-
- static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
- {
- return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
- }
- };
-
- static_assert(sizeof(CacheBucketIndexHeader) == 32);
-
-#pragma pack(pop)
-
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
-
- std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
- return BucketDir / (BucketName + IndexExtension);
- }
-
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
- return BucketDir / (BucketName + ".tmp");
- }
-
- std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
- return BucketDir / (BucketName + LogExtension);
- }
-
- bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Location.GetFlags() &
- ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
- {
- return true;
- }
- if (Entry.Location.Reserved != 0)
- {
- OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
- return false;
- }
- uint64_t Size = Entry.Location.Size();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
- bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
- {
- int DropIndex = 0;
- do
- {
- if (!std::filesystem::exists(Dir))
- {
- return false;
- }
-
- std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
- std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
- if (std::filesystem::exists(DroppedBucketPath))
- {
- DropIndex++;
- continue;
- }
-
- std::error_code Ec;
- std::filesystem::rename(Dir, DroppedBucketPath, Ec);
- if (!Ec)
- {
- DeleteDirectories(DroppedBucketPath);
- return true;
- }
- // TODO: Do we need to bail at some point?
- zen::Sleep(100);
- } while (true);
- }
-
-} // namespace
-
-namespace fs = std::filesystem;
-
-static CbObject
-LoadCompactBinaryObject(const fs::path& Path)
-{
- FileContents Result = ReadFile(Path);
-
- if (!Result.ErrorCode)
- {
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
- {
- return LoadCompactBinaryObject(Buffer);
- }
- }
-
- return CbObject();
-}
-
-static void
-SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
-{
- WriteFile(Path, Object.GetBuffer().AsIoBuffer());
-}
-
-ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
-: GcStorage(Gc)
-, GcContributor(Gc)
-, m_RootDir(RootDir)
-, m_DiskLayer(RootDir)
-{
- ZEN_INFO("initializing structured cache at '{}'", RootDir);
- CreateDirectories(RootDir);
-
- m_DiskLayer.DiscoverBuckets();
-
-#if ZEN_USE_CACHE_TRACKER
- m_AccessTracker.reset(new ZenCacheTracker(RootDir));
-#endif
-}
-
-ZenCacheNamespace::~ZenCacheNamespace()
-{
-}
-
-bool
-ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- ZEN_TRACE_CPU("Z$::Get");
-
- bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
-
-#if ZEN_USE_CACHE_TRACKER
- auto _ = MakeGuard([&] {
- if (!Ok)
- return;
-
- m_AccessTracker->TrackAccess(InBucket, HashKey);
- });
-#endif
-
- if (Ok)
- {
- ZEN_ASSERT(OutValue.Value.Size());
-
- return true;
- }
-
- Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
-
- if (Ok)
- {
- ZEN_ASSERT(OutValue.Value.Size());
-
- if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)
- {
- m_MemLayer.Put(InBucket, HashKey, OutValue);
- }
- }
-
- return Ok;
-}
-
-void
-ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- ZEN_TRACE_CPU("Z$::Put");
-
- // Store value and index
-
- ZEN_ASSERT(Value.Value.Size());
-
- m_DiskLayer.Put(InBucket, HashKey, Value);
-
-#if ZEN_USE_REF_TRACKING
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None)
- {
- CbObject Object{SharedBuffer(Value.Value)};
-
- uint8_t TempBuffer[8 * sizeof(IoHash)];
- std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer};
- std::pmr::polymorphic_allocator Allocator{&Linear};
- std::pmr::vector<IoHash> CidReferences{Allocator};
-
- Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); });
-
- m_Gc.OnNewCidReferences(CidReferences);
- }
- }
-#endif
-
- if (Value.Value.Size() <= m_DiskLayerSizeThreshold)
- {
- m_MemLayer.Put(InBucket, HashKey, Value);
- }
-}
-
-bool
-ZenCacheNamespace::DropBucket(std::string_view Bucket)
-{
- ZEN_INFO("dropping bucket '{}'", Bucket);
-
- // TODO: should ensure this is done atomically across all layers
-
- const bool MemDropped = m_MemLayer.DropBucket(Bucket);
- const bool DiskDropped = m_DiskLayer.DropBucket(Bucket);
- const bool AnyDropped = MemDropped || DiskDropped;
-
- ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found");
-
- return AnyDropped;
-}
-
-bool
-ZenCacheNamespace::Drop()
-{
- m_MemLayer.Drop();
- return m_DiskLayer.Drop();
-}
-
-void
-ZenCacheNamespace::Flush()
-{
- m_DiskLayer.Flush();
-}
-
-void
-ZenCacheNamespace::Scrub(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_DiskLayer.Scrub(Ctx);
- m_MemLayer.Scrub(Ctx);
-}
-
-void
-ZenCacheNamespace::GatherReferences(GcContext& GcCtx)
-{
- Stopwatch Timer;
- const auto Guard =
- MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- access_tracking::AccessTimes AccessTimes;
- m_MemLayer.GatherAccessTimes(AccessTimes);
-
- m_DiskLayer.UpdateAccessTimes(AccessTimes);
- m_DiskLayer.GatherReferences(GcCtx);
-}
-
-void
-ZenCacheNamespace::CollectGarbage(GcContext& GcCtx)
-{
- m_MemLayer.Reset();
- m_DiskLayer.CollectGarbage(GcCtx);
-}
-
-GcStorageSize
-ZenCacheNamespace::StorageSize() const
-{
- return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()};
-}
-
-ZenCacheNamespace::Info
-ZenCacheNamespace::GetInfo() const
-{
- ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold},
- .DiskLayerInfo = m_DiskLayer.GetInfo(),
- .MemoryLayerInfo = m_MemLayer.GetInfo()};
- std::unordered_set<std::string> BucketNames;
- for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames)
- {
- BucketNames.insert(BucketName);
- }
- for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames)
- {
- BucketNames.insert(BucketName);
- }
- Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end());
- return Info;
-}
-
-std::optional<ZenCacheNamespace::BucketInfo>
-ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const
-{
- std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket);
- if (!DiskBucketInfo.has_value())
- {
- return {};
- }
- ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo,
- .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})};
- return Info;
-}
-
-CacheValueDetails::NamespaceDetails
-ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
-{
- return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheMemoryLayer::ZenCacheMemoryLayer()
-{
-}
-
-ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
-{
-}
-
-bool
-ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It == m_Buckets.end())
- {
- return false;
- }
-
- CacheBucket* Bucket = It->second.get();
-
- _.ReleaseNow();
-
- // There's a race here. Since the lock is released early to allow
- // inserts, the bucket delete path could end up deleting the
- // underlying data structure
-
- return Bucket->Get(HashKey, OutValue);
-}
-
-void
-ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- }
-
- if (Bucket == nullptr)
- {
- // New bucket
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
- Bucket = InsertResult.first->second.get();
- }
- }
-
- // Note that since the underlying IoBuffer is retained, the content type is also
-
- Bucket->Put(HashKey, Value);
-}
-
-bool
-ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It);
- Bucket.Drop();
- return true;
- }
- return false;
-}
-
-void
-ZenCacheMemoryLayer::Drop()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- std::vector<std::unique_ptr<CacheBucket>> Buckets;
- Buckets.reserve(m_Buckets.size());
- while (!m_Buckets.empty())
- {
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- Bucket.Drop();
- }
-}
-
-void
-ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- Kv.second->Scrub(Ctx);
- }
-}
-
-void
-ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
-{
- using namespace zen::access_tracking;
-
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
- Kv.second->GatherAccessTimes(Bucket);
- }
-}
-
-void
-ZenCacheMemoryLayer::Reset()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Buckets.clear();
-}
-
-uint64_t
-ZenCacheMemoryLayer::TotalSize() const
-{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- TotalSize += Kv.second->TotalSize();
- }
-
- return TotalSize;
-}
-
-ZenCacheMemoryLayer::Info
-ZenCacheMemoryLayer::GetInfo() const
-{
- ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()};
-
- RwLock::SharedLockScope _(m_Lock);
- Info.BucketNames.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Info.BucketNames.push_back(Kv.first);
- Info.EntryCount += Kv.second->EntryCount();
- }
- return Info;
-}
-
-std::optional<ZenCacheMemoryLayer::BucketInfo>
-ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
- {
- return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
- }
- return {};
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_BucketLock);
-
- std::vector<IoHash> BadHashes;
-
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
- }
- if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (Hash != RawHash)
- {
- return false;
- }
- }
- return true;
- };
-
- for (auto& Kv : m_CacheMap)
- {
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload))
- {
- BadHashes.push_back(Kv.first);
- }
- }
-
- if (!BadHashes.empty())
- {
- Ctx.ReportBadCidChunks(BadHashes);
- }
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- RwLock::SharedLockScope _(m_BucketLock);
- std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) {
- return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]};
- });
-}
-
-bool
-ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- RwLock::SharedLockScope _(m_BucketLock);
-
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
-
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash};
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
-
- return true;
- }
-
- return false;
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- size_t PayloadSize = Value.Value.GetSize();
- {
- GcClock::Tick AccessTime = GcClock::TickCount();
- RwLock::ExclusiveLockScope _(m_BucketLock);
- if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
- {
- // No more space in our memory cache!
- return;
- }
- if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
- {
- uint32_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed);
- BucketPayload& Payload = m_Payloads[EntryIndex];
- Payload.Payload = Value.Value;
- Payload.RawHash = Value.RawHash;
- Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize);
- m_AccessTimes[EntryIndex] = AccessTime;
- }
- else
- {
- uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
- m_Payloads.emplace_back(
- BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(AccessTime);
- m_CacheMap.insert_or_assign(HashKey, EntryIndex);
- }
- ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size());
- ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
- }
-
- m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed);
-}
-
-void
-ZenCacheMemoryLayer::CacheBucket::Drop()
-{
- RwLock::ExclusiveLockScope _(m_BucketLock);
- m_CacheMap.clear();
- m_AccessTimes.clear();
- m_Payloads.clear();
- m_TotalSize.store(0);
-}
-
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::EntryCount() const
-{
- RwLock::SharedLockScope _(m_BucketLock);
- return static_cast<uint64_t>(m_CacheMap.size());
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
-{
-}
-
-ZenCacheDiskLayer::CacheBucket::~CacheBucket()
-{
-}
-
-bool
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
-{
- using namespace std::literals;
-
- m_BlocksBasePath = BucketDir / "blocks";
- m_BucketDir = BucketDir;
-
- CreateDirectories(m_BucketDir);
-
- std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
-
- bool IsNew = false;
-
- CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
-
- if (Manifest)
- {
- m_BucketId = Manifest["BucketId"sv].AsObjectId();
- if (m_BucketId == Oid::Zero)
- {
- return false;
- }
- }
- else if (AllowCreate)
- {
- m_BucketId.Generate();
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Manifest = Writer.Save();
- SaveCompactBinaryObject(ManifestPath, Manifest);
- IsNew = true;
- }
- else
- {
- return false;
- }
-
- OpenLog(IsNew);
-
- if (!IsNew)
- {
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- for (CbFieldView Entry : Manifest["Timestamps"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
- }
- }
- for (CbFieldView Entry : Manifest["RawInfo"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash();
- m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64();
- }
- }
- }
-
- return true;
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
-{
- uint64_t LogCount = m_SlogFile.GetLogCount();
- if (m_LogFlushPosition == LogCount)
- {
- return;
- }
-
- ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_BucketDir / m_BucketName,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- namespace fs = std::filesystem;
-
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
- fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(STmpIndexPath);
- }
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- try
- {
- // Write the current state of the location map to a new index state
- std::vector<DiskIndexEntry> Entries;
-
- {
- Entries.resize(m_Index.size());
-
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- }
- }
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
-
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
-
- ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
-
- // Restore any previous snapshot
-
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(IndexPath);
- fs::rename(STmpIndexPath, IndexPath);
- }
- }
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(STmpIndexPath);
- }
-}
-
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
-{
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
- (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
- {
- switch (Header.Version)
- {
- case CacheBucketIndexHeader::Version2:
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- if (Header.EntryCount > ExpectedEntryCount)
- {
- break;
- }
- size_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- m_PayloadAlignment = Header.PayloadAlignment;
-
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(),
- Header.EntryCount * sizeof(DiskIndexEntry),
- sizeof(CacheBucketIndexHeader));
-
- m_Payloads.reserve(Header.EntryCount);
- m_AccessTimes.reserve(Header.EntryCount);
- m_Index.reserve(Header.EntryCount);
-
- std::string InvalidEntryReason;
- for (const DiskIndexEntry& Entry : Entries)
- {
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(Entry.Key, EntryIndex);
- EntryCount++;
- }
- OutVersion = CacheBucketIndexHeader::Version2;
- return Header.LogPosition;
- }
- break;
- default:
- break;
- }
- }
- }
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
- }
- return 0;
-}
-
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
-{
- if (std::filesystem::is_regular_file(LogPath))
- {
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- m_Index.reserve(LogEntryCount);
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const DiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Location.Flags & DiskLocation::kTombStone)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(Record.Key, EntryIndex);
- },
- SkipEntryCount);
- if (InvalidEntryCount)
- {
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
- }
- return LogEntryCount;
- }
- }
- return 0;
-};
-
-void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
-{
- m_TotalStandaloneSize = 0;
-
- m_Index.clear();
- m_Payloads.clear();
- m_AccessTimes.clear();
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
-
- if (IsNew)
- {
- fs::remove(LogPath);
- fs::remove(IndexPath);
- fs::remove_all(m_BlocksBasePath);
- }
-
- uint64_t LogEntryCount = 0;
- {
- uint32_t IndexVersion = 0;
- m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
- if (IndexVersion == 0 && std::filesystem::is_regular_file(IndexPath))
- {
- ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- fs::remove(IndexPath);
- }
-
- if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
- {
- LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
- }
- else
- {
- ZEN_WARN("removing invalid cas log at '{}'", LogPath);
- fs::remove(LogPath);
- }
- }
-
- CreateDirectories(m_BucketDir);
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_Index.size());
- for (const auto& Entry : m_Index)
- {
- size_t EntryIndex = Entry.second;
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- const DiskLocation& Location = Payload.Location;
-
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- continue;
- }
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
- KnownLocations.push_back(BlockLocation);
- }
-
- m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNew || LogEntryCount > 0)
- {
- MakeIndexSnapshot();
- }
- // TODO: should validate integrity of container files here
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
-{
- char HexString[sizeof(HashKey.Hash) * 2];
- ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
-
- Path.Append(m_BucketDir);
- Path.AppendSeparator();
- Path.Append(L"blob");
- Path.AppendSeparator();
- Path.AppendAsciiRange(HexString, HexString + 3);
- Path.AppendSeparator();
- Path.AppendAsciiRange(HexString + 3, HexString + 5);
- Path.AppendSeparator();
- Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
-}
-
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
-{
- BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
-
- IoBuffer Value = m_BlockStore.TryGetChunk(Location);
- if (Value)
- {
- Value.SetContentType(Loc.GetContentType());
- }
-
- return Value;
-}
-
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
-{
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
-
- RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
- if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
- {
- Data.SetContentType(Loc.GetContentType());
-
- return Data;
- }
-
- return {};
-}
-
-bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- RwLock::SharedLockScope _(m_IndexLock);
- auto It = m_Index.find(HashKey);
- if (It == m_Index.end())
- {
- return false;
- }
- size_t EntryIndex = It.value();
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- DiskLocation Location = Payload.Location;
- OutValue.RawSize = Payload.RawSize;
- OutValue.RawHash = Payload.RawHash;
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- // We don't need to hold the index lock when we read a standalone file
- _.ReleaseNow();
- OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
- }
- else
- {
- OutValue.Value = GetInlineCacheValue(Location);
- }
- _.ReleaseNow();
-
- if (!Location.IsFlagSet(DiskLocation::kStructured))
- {
- if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
- {
- if (Location.IsFlagSet(DiskLocation::kCompressed))
- {
- (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
- }
- else
- {
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
- }
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
- {
- BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
- WritePayload.RawHash = OutValue.RawHash;
- WritePayload.RawSize = OutValue.RawSize;
-
- m_LogFlushPosition = 0; // Force resave of index on exit
- }
- }
- }
-
- return (bool)OutValue.Value;
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- if (Value.Value.Size() >= m_LargeObjectThreshold)
- {
- return PutStandaloneCacheValue(HashKey, Value);
- }
- PutInlineCacheValue(HashKey, Value);
-}
-
-bool
-ZenCacheDiskLayer::CacheBucket::Drop()
-{
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
- ShardLocks.reserve(256);
- for (RwLock& Lock : m_ShardedLocks)
- {
- ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock));
- }
- m_BlockStore.Close();
- m_SlogFile.Close();
-
- bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
-
- m_Index.clear();
- m_Payloads.clear();
- m_AccessTimes.clear();
- return Deleted;
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::Flush()
-{
- m_BlockStore.Flush();
-
- RwLock::SharedLockScope _(m_IndexLock);
- m_SlogFile.Flush();
- MakeIndexSnapshot();
- SaveManifest();
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::SaveManifest()
-{
- using namespace std::literals;
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
-
- if (!m_Index.empty())
- {
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- GcClock::Tick AccessTime = m_AccessTimes[Kv.second];
-
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "LastAccess"sv << AccessTime;
- Writer.EndObject();
- }
- Writer.EndArray();
-
- Writer.BeginArray("RawInfo"sv);
- {
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (Payload.RawHash != IoHash::Zero)
- {
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "RawHash"sv << Payload.RawHash;
- Writer << "RawSize"sv << Payload.RawSize;
- Writer.EndObject();
- }
- }
- }
- Writer.EndArray();
- }
-
- SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
-{
- std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
-
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
- }
- if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (RawHash != Hash)
- {
- return false;
- }
- }
- return true;
- };
-
- RwLock::SharedLockScope _(m_IndexLock);
-
- const size_t BlockChunkInitialCount = m_Index.size() / 4;
- ChunkLocations.reserve(BlockChunkInitialCount);
- ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
-
- for (auto& Kv : m_Index)
- {
- const IoHash& HashKey = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- const DiskLocation& Loc = Payload.Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- ++ChunkCount;
- ChunkBytes += Loc.Size();
- if (Loc.GetContentType() == ZenContentType::kBinary)
- {
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
-
- RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
- std::error_code Ec;
- uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
- if (Ec)
- {
- BadKeys.push_back(HashKey);
- }
- if (size != Loc.Size())
- {
- BadKeys.push_back(HashKey);
- }
- continue;
- }
- IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
- if (!Buffer)
- {
- BadKeys.push_back(HashKey);
- continue;
- }
- if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer))
- {
- BadKeys.push_back(HashKey);
- continue;
- }
- }
- else
- {
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
- ChunkIndexToChunkHash.push_back(HashKey);
- continue;
- }
- }
-
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
-
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
-
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
-
- _.ReleaseNow();
-
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
-
- if (!BadKeys.empty())
- {
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
-
- if (Ctx.RunRecovery())
- {
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
-
- {
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const IoHash& BadKey : BadKeys)
- {
- // Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const BucketPayload& Payload = m_Payloads[It->second];
- DiskLocation Location = Payload.Location;
- Location.Flags |= DiskLocation::kTombStone;
- LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
- m_Index.erase(BadKey);
- }
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- ExtendablePathBuilder<256> Path;
- BuildPath(Path, Entry.Key);
- fs::path FilePath = Path.ToPath();
- RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
- if (fs::is_regular_file(FilePath))
- {
- ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
- std::error_code Ec;
- fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
- }
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- }
- }
- m_SlogFile.Append(LogEntries);
-
- // Clean up m_AccessTimes and m_Payloads vectors
- {
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- IndexMap Index;
-
- {
- RwLock::ExclusiveLockScope __(m_IndexLock);
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t EntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[EntryIndex]);
- AccessTimes.push_back(m_AccessTimes[EntryIndex]);
- Index.insert({It.first, EntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- }
- }
- }
- }
-
- // Let whomever it concerns know about the bad chunks. This could
- // be used to invalidate higher level data structures more efficiently
- // than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadKeys);
-
- ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
-
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
- m_BucketDir / m_BucketName,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs));
- });
-
- const GcClock::TimePoint ExpireTime = GcCtx.ExpireTime();
-
- const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
-
- IndexMap Index;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- {
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- Index = m_Index;
- AccessTimes = m_AccessTimes;
- Payloads = m_Payloads;
- }
-
- std::vector<IoHash> ExpiredKeys;
- ExpiredKeys.reserve(1024);
-
- std::vector<IoHash> Cids;
- Cids.reserve(1024);
-
- for (const auto& Entry : Index)
- {
- const IoHash& Key = Entry.first;
- GcClock::Tick AccessTime = AccessTimes[Entry.second];
- if (AccessTime < ExpireTicks)
- {
- ExpiredKeys.push_back(Key);
- continue;
- }
-
- const DiskLocation& Loc = Payloads[Entry.second].Location;
-
- if (Loc.IsFlagSet(DiskLocation::kStructured))
- {
- if (Cids.size() > 1024)
- {
- GcCtx.AddRetainedCids(Cids);
- Cids.clear();
- }
-
- IoBuffer Buffer;
- {
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- // We don't need to hold the index lock when we read a standalone file
- __.ReleaseNow();
- if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer)
- {
- continue;
- }
- }
- else if (Buffer = GetInlineCacheValue(Loc); !Buffer)
- {
- continue;
- }
- }
-
- ZEN_ASSERT(Buffer);
- ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObject Obj(SharedBuffer{Buffer});
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- }
- }
-
- GcCtx.AddRetainedCids(Cids);
- GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
-
- ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName);
-
- Stopwatch TotalTimer;
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = TotalSize();
-
- std::unordered_set<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG(
- "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
- "{} "
- "of {} "
- "entires ({}).",
- m_BucketDir / m_BucketName,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- RwLock::SharedLockScope _(m_IndexLock);
- SaveManifest();
- });
-
- m_SlogFile.Flush();
-
- std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
- std::vector<IoHash> DeleteCacheKeys;
- DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
- GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
- if (Keep)
- {
- return;
- }
- DeleteCacheKeys.push_back(ChunkHash);
- });
- if (DeleteCacheKeys.empty())
- {
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
- return;
- }
-
- auto __ = MakeGuard([&]() {
- if (!DeletedChunks.empty())
- {
- // Clean up m_AccessTimes and m_Payloads vectors
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- IndexMap Index;
-
- {
- RwLock::ExclusiveLockScope _(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- size_t EntryCount = m_Index.size();
- Payloads.reserve(EntryCount);
- AccessTimes.reserve(EntryCount);
- Index.reserve(EntryCount);
- for (auto It : m_Index)
- {
- size_t EntryIndex = Payloads.size();
- Payloads.push_back(m_Payloads[EntryIndex]);
- AccessTimes.push_back(m_AccessTimes[EntryIndex]);
- Index.insert({It.first, EntryIndex});
- }
- m_Index.swap(Index);
- m_Payloads.swap(Payloads);
- m_AccessTimes.swap(AccessTimes);
- }
- GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
- }
- });
-
- std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
- IndexMap Index;
- BlockStore::ReclaimSnapshotState BlockStoreState;
- {
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_Index.empty())
- {
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
- return;
- }
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-
- SaveManifest();
- Index = m_Index;
-
- for (const IoHash& Key : DeleteCacheKeys)
- {
- if (auto It = Index.find(Key); It != Index.end())
- {
- const BucketPayload& Payload = m_Payloads[It->second];
- DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location};
- if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
- {
- Entry.Location.Flags |= DiskLocation::kTombStone;
- ExpiredStandaloneEntries.push_back(Entry);
- }
- }
- }
- if (GcCtx.IsDeletionMode())
- {
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- m_Index.erase(Entry.Key);
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- DeletedChunks.insert(Entry.Key);
- }
- m_SlogFile.Append(ExpiredStandaloneEntries);
- }
- }
-
- if (GcCtx.IsDeletionMode())
- {
- std::error_code Ec;
- ExtendablePathBuilder<256> Path;
-
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- const IoHash& Key = Entry.Key;
- const DiskLocation& Loc = Entry.Location;
-
- Path.Reset();
- BuildPath(Path, Key);
- fs::path FilePath = Path.ToPath();
-
- {
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- // Someone added it back, let the file on disk be
- ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
- continue;
- }
- __.ReleaseNow();
-
- RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
- if (fs::is_regular_file(FilePath))
- {
- ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
- fs::remove(FilePath, Ec);
- }
- }
-
- if (Ec)
- {
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
- Ec.clear();
- DiskLocation RestoreLocation = Loc;
- RestoreLocation.Flags &= ~DiskLocation::kTombStone;
-
- RwLock::ExclusiveLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- continue;
- }
- m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert({Key, EntryIndex});
- m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
- DeletedChunks.erase(Key);
- continue;
- }
- DeletedSize += Entry.Location.Size();
- }
- }
-
- TotalChunkCount = Index.size();
-
- std::vector<IoHash> TotalChunkHashes;
- TotalChunkHashes.reserve(TotalChunkCount);
- for (const auto& Entry : Index)
- {
- const DiskLocation& Location = m_Payloads[Entry.second].Location;
-
- if (Location.Flags & DiskLocation::kStandaloneFile)
- {
- continue;
- }
- TotalChunkHashes.push_back(Entry.first);
- }
-
- if (TotalChunkHashes.empty())
- {
- return;
- }
- TotalChunkCount = TotalChunkHashes.size();
-
- std::vector<BlockStoreLocation> ChunkLocations;
- BlockStore::ChunkIndexArray KeepChunkIndexes;
- std::vector<IoHash> ChunkIndexToChunkHash;
- ChunkLocations.reserve(TotalChunkCount);
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
-
- GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = Index.find(ChunkHash);
- const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location;
- BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- if (Keep)
- {
- KeepChunkIndexes.push_back(ChunkIndex);
- }
- });
-
- size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
-
- const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
- if (!PerformDelete)
- {
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
- uint64_t CurrentTotalSize = TotalSize();
- ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
- m_BucketDir / m_BucketName,
- DeleteCount,
- TotalChunkCount,
- NiceBytes(CurrentTotalSize));
- return;
- }
-
- m_BlockStore.ReclaimSpace(
- BlockStoreState,
- ChunkLocations,
- KeepChunkIndexes,
- m_PayloadAlignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
- const DiskLocation& OldDiskLocation = OldPayload.Location;
- LogEntries.push_back(
- {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
- }
- for (const size_t ChunkIndex : RemovedChunks)
- {
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]];
- const DiskLocation& OldDiskLocation = OldPayload.Location;
- LogEntries.push_back({.Key = ChunkHash,
- .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
- m_PayloadAlignment,
- OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
- DeletedChunks.insert(ChunkHash);
- }
-
- m_SlogFile.Append(LogEntries);
- m_SlogFile.Flush();
- {
- RwLock::ExclusiveLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
- {
- m_Index.erase(Entry.Key);
- continue;
- }
- m_Payloads[m_Index[Entry.Key]].Location = Entry.Location;
- }
- }
- },
- [&]() { return GcCtx.CollectSmallObjects(); });
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
- using namespace access_tracking;
-
- for (const KeyAccessTime& KeyTime : AccessTimes)
- {
- if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = KeyTime.LastAccess;
- }
- }
-}
-
-uint64_t
-ZenCacheDiskLayer::CacheBucket::EntryCount() const
-{
- RwLock::SharedLockScope _(m_IndexLock);
- return static_cast<uint64_t>(m_Index.size());
-}
-
-CacheValueDetails::ValueDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const
-{
- std::vector<IoHash> Attachments;
- const BucketPayload& Payload = m_Payloads[Index];
- if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
- {
- IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key)
- : GetInlineCacheValue(Payload.Location);
- CbObject Obj(SharedBuffer{Value});
- Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
- }
- return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
- .RawSize = Payload.RawSize,
- .RawHash = Payload.RawHash,
- .LastAccess = m_AccessTimes[Index],
- .Attachments = std::move(Attachments),
- .ContentType = Payload.Location.GetContentType()};
-}
-
-CacheValueDetails::BucketDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const
-{
- CacheValueDetails::BucketDetails Details;
- RwLock::SharedLockScope _(m_IndexLock);
- if (ValueFilter.empty())
- {
- Details.Values.reserve(m_Index.size());
- for (const auto& It : m_Index)
- {
- Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second));
- }
- }
- else
- {
- IoHash Key = IoHash::FromHexString(ValueFilter);
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second));
- }
- }
- return Details;
-}
-
-void
-ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- CacheBucket& Bucket = *Kv.second;
- Bucket.CollectGarbage(GcCtx);
- }
-}
-
-void
-ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (const auto& Kv : AccessTimes.Buckets)
- {
- if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- Bucket.UpdateAccessTimes(Kv.second);
- }
- }
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- uint64_t NewFileSize = Value.Value.Size();
-
- TemporaryFile DataFile;
-
- std::error_code Ec;
- DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
- }
-
- bool CleanUpTempFile = false;
- auto __ = MakeGuard([&] {
- if (CleanUpTempFile)
- {
- std::error_code Ec;
- std::filesystem::remove(DataFile.GetPath(), Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
- DataFile.GetPath(),
- m_BucketDir,
- Ec.message());
- }
- }
- });
-
- DataFile.WriteAll(Value.Value, Ec);
- if (Ec)
- {
- throw std::system_error(Ec,
- fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
- NiceBytes(NewFileSize),
- DataFile.GetPath().string(),
- m_BucketDir));
- }
-
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
- std::filesystem::path FsPath{DataFilePath.ToPath()};
-
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
- std::filesystem::remove(FsPath, Ec);
- if (Ec)
- {
- if (Ec.value() != ENOENT)
- {
- ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
- Sleep(100);
- Ec.clear();
- std::filesystem::remove(FsPath, Ec);
- if (Ec && Ec.value() != ENOENT)
- {
- throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
- }
- }
- }
-
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (Ec)
- {
- CreateDirectories(FsPath.parent_path());
- Ec.clear();
-
- // Try again
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.",
- FsPath,
- DataFile.GetPath(),
- m_BucketDir,
- Ec.message());
- Sleep(100);
- Ec.clear();
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
- if (Ec)
- {
- throw std::system_error(
- Ec,
- fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir));
- }
- }
- }
-
- // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
- // will be disabled as the file handle has already been closed
- CleanUpTempFile = false;
-
- uint8_t EntryFlags = DiskLocation::kStandaloneFile;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- DiskLocation Loc(NewFileSize, EntryFlags);
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
-{
- uint8_t EntryFlags = 0;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- }
- else
- {
- size_t EntryIndex = m_Payloads.size();
- m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(HashKey, EntryIndex);
- }
- });
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir)
-{
-}
-
-ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
-
-bool
-ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(BucketName);
-
- if (It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- }
-
- if (Bucket == nullptr)
- {
- // Bucket needs to be opened/created
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
- Bucket = InsertResult.first->second.get();
-
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
-
- if (!Bucket->OpenOrCreate(BucketPath))
- {
- m_Buckets.erase(InsertResult.first);
- return false;
- }
- }
- }
-
- ZEN_ASSERT(Bucket != nullptr);
- return Bucket->Get(HashKey, OutValue);
-}
-
-void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(BucketName);
-
- if (It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- }
-
- if (Bucket == nullptr)
- {
- // New bucket needs to be created
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
- Bucket = InsertResult.first->second.get();
-
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
-
- try
- {
- if (!Bucket->OpenOrCreate(BucketPath))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
- m_Buckets.erase(InsertResult.first);
- return;
- }
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- }
- }
-
- ZEN_ASSERT(Bucket != nullptr);
-
- Bucket->Put(HashKey, Value);
-}
-
-void
-ZenCacheDiskLayer::DiscoverBuckets()
-{
- DirectoryContent DirContent;
- GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
-
- // Initialize buckets
-
- RwLock::ExclusiveLockScope _(m_Lock);
-
- for (const std::filesystem::path& BucketPath : DirContent.Directories)
- {
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- // New bucket needs to be created
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
- {
- continue;
- }
-
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
- CacheBucket& Bucket = *InsertResult.first->second;
-
- try
- {
- if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-
- m_Buckets.erase(InsertResult.first);
- continue;
- }
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- ZEN_INFO("Discovered bucket '{}'", BucketName);
- }
-}
-
-bool
-ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- auto It = m_Buckets.find(std::string(InBucket));
-
- if (It != m_Buckets.end())
- {
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It);
-
- return Bucket.Drop();
- }
-
- // Make sure we remove the folder even if we don't know about the bucket
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= std::string(InBucket);
- return MoveAndDeleteDirectory(BucketPath);
-}
-
-bool
-ZenCacheDiskLayer::Drop()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- std::vector<std::unique_ptr<CacheBucket>> Buckets;
- Buckets.reserve(m_Buckets.size());
- while (!m_Buckets.empty())
- {
- const auto& It = m_Buckets.begin();
- CacheBucket& Bucket = *It->second;
- m_DroppedBuckets.push_back(std::move(It->second));
- m_Buckets.erase(It->first);
- if (!Bucket.Drop())
- {
- return false;
- }
- }
- return MoveAndDeleteDirectory(m_RootDir);
-}
-
-void
-ZenCacheDiskLayer::Flush()
-{
- std::vector<CacheBucket*> Buckets;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- CacheBucket* Bucket = Kv.second.get();
- Buckets.push_back(Bucket);
- }
- }
-
- for (auto& Bucket : Buckets)
- {
- Bucket->Flush();
- }
-}
-
-void
-ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- CacheBucket& Bucket = *Kv.second;
- Bucket.Scrub(Ctx);
- }
-}
-
-void
-ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- CacheBucket& Bucket = *Kv.second;
- Bucket.GatherReferences(GcCtx);
- }
-}
-
-uint64_t
-ZenCacheDiskLayer::TotalSize() const
-{
- uint64_t TotalSize{};
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_Buckets)
- {
- TotalSize += Kv.second->TotalSize();
- }
-
- return TotalSize;
-}
-
-ZenCacheDiskLayer::Info
-ZenCacheDiskLayer::GetInfo() const
-{
- ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()};
-
- RwLock::SharedLockScope _(m_Lock);
- Info.BucketNames.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Info.BucketNames.push_back(Kv.first);
- Info.EntryCount += Kv.second->EntryCount();
- }
- return Info;
-}
-
-std::optional<ZenCacheDiskLayer::BucketInfo>
-ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end())
- {
- return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()};
- }
- return {};
-}
-
-CacheValueDetails::NamespaceDetails
-ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
-{
- RwLock::SharedLockScope _(m_Lock);
- CacheValueDetails::NamespaceDetails Details;
- if (BucketFilter.empty())
- {
- Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
- for (auto& Kv : m_Buckets)
- {
- Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter);
- }
- }
- else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
- {
- Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter);
- }
- return Details;
-}
-
-//////////////////////////// ZenCacheStore
-
-static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
-
-ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration)
-{
- CreateDirectories(m_Configuration.BasePath);
-
- DirectoryContent DirContent;
- GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
-
- std::vector<std::string> Namespaces;
- for (const std::filesystem::path& DirPath : DirContent.Directories)
- {
- std::string DirName = PathToUtf8(DirPath.filename());
- if (DirName.starts_with(NamespaceDiskPrefix))
- {
- Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
- continue;
- }
- }
-
- ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
-
- if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
- {
- // default (unspecified) and ue4-ddc namespace points to the same namespace instance
-
- std::filesystem::path DefaultNamespaceFolder =
- m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
- CreateDirectories(DefaultNamespaceFolder);
- Namespaces.push_back(std::string(UE4DDCNamespaceName));
- }
-
- for (const std::string& NamespaceName : Namespaces)
- {
- m_Namespaces[NamespaceName] =
- std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
- }
-}
-
-ZenCacheStore::~ZenCacheStore()
-{
- m_Namespaces.clear();
-}
-
-bool
-ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
- {
- return Store->Get(Bucket, HashKey, OutValue);
- }
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
-
- return false;
-}
-
-void
-ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
- if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
- {
- return Store->Put(Bucket, HashKey, Value);
- }
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString());
-}
-
-bool
-ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
-{
- if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
- {
- return Store->DropBucket(Bucket);
- }
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
- return false;
-}
-
-bool
-ZenCacheStore::DropNamespace(std::string_view InNamespace)
-{
- RwLock::SharedLockScope _(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
- {
- ZenCacheNamespace& Namespace = *It->second;
- m_DroppedNamespaces.push_back(std::move(It->second));
- m_Namespaces.erase(It);
- return Namespace.Drop();
- }
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
- return false;
-}
-
-void
-ZenCacheStore::Flush()
-{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); });
-}
-
-void
-ZenCacheStore::Scrub(ScrubContext& Ctx)
-{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); });
-}
-
-CacheValueDetails
-ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter,
- const std::string_view BucketFilter,
- const std::string_view ValueFilter) const
-{
- CacheValueDetails Details;
- if (NamespaceFilter.empty())
- {
- IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) {
- Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter);
- });
- }
- else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr)
- {
- Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter);
- }
- return Details;
-}
-
-ZenCacheNamespace*
-ZenCacheStore::GetNamespace(std::string_view Namespace)
-{
- RwLock::SharedLockScope _(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- if (Namespace == DefaultNamespace)
- {
- if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- }
- _.ReleaseNow();
-
- if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
- {
- return nullptr;
- }
-
- RwLock::ExclusiveLockScope __(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
-
- auto NewNamespace = m_Namespaces.insert_or_assign(
- std::string(Namespace),
- std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace)));
- return NewNamespace.first->second.get();
-}
-
-const ZenCacheNamespace*
-ZenCacheStore::FindNamespace(std::string_view Namespace) const
-{
- RwLock::SharedLockScope _(m_NamespacesLock);
- if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- if (Namespace == DefaultNamespace)
- {
- if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end())
- {
- return It->second.get();
- }
- }
- return nullptr;
-}
-
-void
-ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
-{
- std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces;
- {
- RwLock::SharedLockScope _(m_NamespacesLock);
- Namespaces.reserve(m_Namespaces.size());
- for (const auto& Entry : m_Namespaces)
- {
- if (Entry.first == DefaultNamespace)
- {
- continue;
- }
- Namespaces.push_back({Entry.first, *Entry.second});
- }
- }
- for (auto& Entry : Namespaces)
- {
- Callback(Entry.first, Entry.second);
- }
-}
-
-GcStorageSize
-ZenCacheStore::StorageSize() const
-{
- GcStorageSize Size;
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) {
- GcStorageSize StoreSize = Store.StorageSize();
- Size.MemorySize += StoreSize.MemorySize;
- Size.DiskSize += StoreSize.DiskSize;
- });
- return Size;
-}
-
-ZenCacheStore::Info
-ZenCacheStore::GetInfo() const
-{
- ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()};
-
- IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) {
- Info.NamespaceNames.push_back(std::string(NamespaceName));
- ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo();
- Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount;
- Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount;
- });
-
- return Info;
-}
-
-std::optional<ZenCacheNamespace::Info>
-ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName)
-{
- if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
- {
- return Namespace->GetInfo();
- }
- return {};
-}
-
-std::optional<ZenCacheNamespace::BucketInfo>
-ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName)
-{
- if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace)
- {
- return Namespace->GetBucketInfo(BucketName);
- }
- return {};
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-using namespace std::literals;
-
-namespace testutils {
- IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); }
-
- IoBuffer CreateBinaryCacheValue(uint64_t Size)
- {
- static std::random_device rd;
- static std::mt19937 g(rd());
-
- std::vector<uint8_t> Values;
- Values.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
- {
- Values[Idx] = static_cast<uint8_t>(Idx);
- }
- std::shuffle(Values.begin(), Values.end(), g);
-
- IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size());
- Buf.SetContentType(ZenContentType::kBinary);
- return Buf;
- };
-
-} // namespace testutils
-
-TEST_CASE("z$.store")
-{
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
-
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- const int kIterationCount = 100;
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
-
- CbObjectWriter Cbo;
- Cbo << "hey" << i;
- CbObject Obj = Cbo.Save();
-
- ZenCacheValue Value;
- Value.Value = Obj.GetBuffer().AsIoBuffer();
- Value.Value.SetContentType(ZenContentType::kCbObject);
-
- Zcs.Put("test_bucket"sv, Key, Value);
- }
-
- for (int i = 0; i < kIterationCount; ++i)
- {
- const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
-
- ZenCacheValue Value;
- Zcs.Get("test_bucket"sv, Key, /* out */ Value);
-
- REQUIRE(Value.Value);
- CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject);
- CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None);
- CbObject Obj = LoadCompactBinaryObject(Value.Value);
- CHECK_EQ(Obj["hey"].AsInt32(), i);
- }
-}
-
-TEST_CASE("z$.size")
-{
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- SUBCASE("mem/disklayer")
- {
- const size_t Count = 16;
- ScopedTemporaryDirectory TempDir;
-
- GcStorageSize CacheSize;
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- for (size_t Key = 0; Key < Count; ++Key)
- {
- const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer});
- }
-
- CacheSize = Zcs.StorageSize();
- CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
- CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
- }
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- const GcStorageSize SerializedSize = Zcs.StorageSize();
- CHECK_EQ(SerializedSize.MemorySize, 0);
- CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
-
- for (size_t Bucket = 0; Bucket < 4; ++Bucket)
- {
- Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
- }
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-
- SUBCASE("disklayer")
- {
- const size_t Count = 16;
- ScopedTemporaryDirectory TempDir;
-
- GcStorageSize CacheSize;
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- for (size_t Key = 0; Key < Count; ++Key)
- {
- const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer});
- }
-
- CacheSize = Zcs.StorageSize();
- CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
- CHECK_EQ(0, CacheSize.MemorySize);
- }
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- const GcStorageSize SerializedSize = Zcs.StorageSize();
- CHECK_EQ(SerializedSize.MemorySize, 0);
- CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
-
- for (size_t Bucket = 0; Bucket < 4; ++Bucket)
- {
- Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket));
- }
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-}
-
-TEST_CASE("z$.gc")
-{
- using namespace testutils;
-
- SUBCASE("gather references does NOT add references for expired cache entries")
- {
- ScopedTemporaryDirectory TempDir;
- std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- const auto CollectAndFilter = [](GcManager& Gc,
- GcClock::TimePoint Time,
- GcClock::Duration MaxDuration,
- std::span<const IoHash> Cids,
- std::vector<IoHash>& OutKeep) {
- GcContext GcCtx(Time - MaxDuration);
- Gc.CollectGarbage(GcCtx);
- OutKeep.clear();
- GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); });
- };
-
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
- const auto Bucket = "teardrinker"sv;
-
- // Create a cache record
- const IoHash Key = CreateKey(42);
- CbObjectWriter Record;
- Record << "Key"sv
- << "SomeRecord"sv;
-
- for (size_t Idx = 0; auto& Cid : Cids)
- {
- Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid);
- }
-
- IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- Zcs.Put(Bucket, Key, {.Value = Buffer});
-
- std::vector<IoHash> Keep;
-
- // Collect garbage with 1 hour max cache duration
- {
- CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(Cids.size(), Keep.size());
- }
-
- // Move forward in time
- {
- CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(0, Keep.size());
- }
- }
-
- // Expect timestamps to be serialized
- {
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
- std::vector<IoHash> Keep;
-
- // Collect garbage with 1 hour max cache duration
- {
- CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(3, Keep.size());
- }
-
- // Move forward in time
- {
- CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep);
- CHECK_EQ(0, Keep.size());
- }
- }
- }
-
- SUBCASE("gc removes standalone values")
- {
- ScopedTemporaryDirectory TempDir;
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
- const auto Bucket = "fortysixandtwo"sv;
- const GcClock::TimePoint CurrentTime = GcClock::Now();
-
- std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- for (const auto& Key : Keys)
- {
- IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10);
- Zcs.Put(Bucket, Key, {.Value = Value});
- }
-
- {
- GcContext GcCtx(CurrentTime - std::chrono::hours(46));
-
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(Exists);
- }
- }
-
- // Move forward in time and collect again
- {
- GcContext GcCtx(CurrentTime + std::chrono::minutes(2));
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(!Exists);
- }
-
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-
- SUBCASE("gc removes small objects")
- {
- ScopedTemporaryDirectory TempDir;
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
- const auto Bucket = "rightintwo"sv;
-
- std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- for (const auto& Key : Keys)
- {
- IoBuffer Value = testutils::CreateBinaryCacheValue(128);
- Zcs.Put(Bucket, Key, {.Value = Value});
- }
-
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(2));
- GcCtx.CollectSmallObjects(true);
-
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(Exists);
- }
- }
-
- // Move forward in time and collect again
- {
- GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
-
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
-
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(!Exists);
- }
-
- CHECK_EQ(0, Zcs.StorageSize().DiskSize);
- }
- }
-}
-
-TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
-{
- // for (uint32_t i = 0; i < 100; ++i)
- {
- ScopedTemporaryDirectory TempDir;
-
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 8192;
-
- struct Chunk
- {
- std::string Bucket;
- IoBuffer Buffer;
- };
- std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks;
- Chunks.reserve(kChunkCount);
-
- const std::string Bucket1 = "rightinone";
- const std::string Bucket2 = "rightintwo";
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- while (true)
- {
- IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
- break;
- }
- while (true)
- {
- IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- if (Chunks.contains(Hash))
- {
- continue;
- }
- Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
- break;
- }
- }
-
- CreateDirectories(TempDir.Path());
-
- WorkerThreadPool ThreadPool(4);
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
-
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
-
- const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
- CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
-
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- std::string Bucket = Chunk.second.Bucket;
- IoHash ChunkHash = Chunk.first;
- ZenCacheValue CacheValue;
-
- CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue));
- IoHash Hash = IoHash::HashBuffer(CacheValue.Value);
- CHECK(ChunkHash == Hash);
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < Chunks.size())
- {
- Sleep(1);
- }
- }
- std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes;
- GcChunkHashes.reserve(Chunks.size());
- for (const auto& Chunk : Chunks)
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- {
- std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks;
-
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
- {
- {
- IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk};
- }
- {
- IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk};
- }
- }
-
- std::atomic<size_t> WorkCompleted = 0;
- std::atomic_uint32_t AddedChunkCount = 0;
- for (const auto& Chunk : NewChunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer});
- AddedChunkCount.fetch_add(1);
- WorkCompleted.fetch_add(1);
- });
- }
-
- for (const auto& Chunk : Chunks)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- }
- WorkCompleted.fetch_add(1);
- });
- }
- while (AddedChunkCount.load() < NewChunks.size())
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- }
- std::vector<IoHash> KeepHashes;
- KeepHashes.reserve(GcChunkHashes.size());
- for (const auto& Entry : GcChunkHashes)
- {
- KeepHashes.push_back(Entry.first);
- }
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Zcs.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
-
- while (WorkCompleted < NewChunks.size() + Chunks.size())
- {
- Sleep(1);
- }
-
- {
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- ZenCacheValue CacheValue;
- if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue))
- {
- GcChunkHashes[Chunk.first] = Chunk.second.Bucket;
- }
- }
- std::vector<IoHash> KeepHashes;
- KeepHashes.reserve(GcChunkHashes.size());
- for (const auto& Entry : GcChunkHashes)
- {
- KeepHashes.push_back(Entry.first);
- }
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- GcCtx.CollectSmallObjects(true);
- GcCtx.AddRetainedCids(KeepHashes);
- Zcs.CollectGarbage(GcCtx);
- const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
- }
- }
- {
- std::atomic<size_t> WorkCompleted = 0;
- for (const auto& Chunk : GcChunkHashes)
- {
- ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() {
- ZenCacheValue CacheValue;
- CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue));
- CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value));
- WorkCompleted.fetch_add(1);
- });
- }
- while (WorkCompleted < GcChunkHashes.size())
- {
- Sleep(1);
- }
- }
- }
-}
-
-TEST_CASE("z$.namespaces")
-{
- using namespace testutils;
-
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- IoHash Key1;
- IoHash Key2;
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
- const auto Bucket = "teardrinker"sv;
- const auto CustomNamespace = "mynamespace"sv;
-
- // Create a cache record
- Key1 = CreateKey(42);
- CbObject CacheValue = CreateCacheValue(4096);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue);
-
- ZenCacheValue GetValue;
- CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
-
- // This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(CustomNamespace, Bucket, Key1, PutValue);
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
- }
-
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
- const auto Bucket = "teardrinker"sv;
- const auto CustomNamespace = "mynamespace"sv;
-
- Key2 = CreateKey(43);
- CbObject CacheValue2 = CreateCacheValue(4096);
-
- IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
- Buffer2.SetContentType(ZenContentType::kCbObject);
- ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
-
- ZenCacheValue GetValue;
- CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
- CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
- CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue));
- }
-}
-
-TEST_CASE("z$.drop.bucket")
-{
- using namespace testutils;
-
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- IoHash Key1;
- IoHash Key2;
-
- auto PutValue =
- [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Namespace, Bucket, Key, PutValue);
- return Key;
- };
- auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
- ZenCacheValue GetValue;
- Zcs.Get(Namespace, Bucket, Key, GetValue);
- return GetValue;
- };
- WorkerThreadPool Workers(1);
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
- const auto Bucket = "teardrinker"sv;
- const auto Namespace = "mynamespace"sv;
-
- Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
- Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
-
- ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
- CHECK(Value1.Value);
-
- std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- WorkComplete = true;
- });
- // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
- // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
- CHECK(Zcs.DropBucket(Namespace, Bucket));
- while (!WorkComplete)
- {
- zen::Sleep(1);
- }
-
- // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty
- Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
- CHECK(!Value1.Value);
- ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
- CHECK(!Value2.Value);
- }
-}
-
-TEST_CASE("z$.drop.namespace")
-{
- using namespace testutils;
-
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- ScopedTemporaryDirectory TempDir;
- CreateDirectories(TempDir.Path());
-
- auto PutValue =
- [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
- // Create a cache record
- IoHash Key = CreateKey(KeyIndex);
- CbObject CacheValue = CreateCacheValue(Size);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Namespace, Bucket, Key, PutValue);
- return Key;
- };
- auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
- ZenCacheValue GetValue;
- Zcs.Get(Namespace, Bucket, Key, GetValue);
- return GetValue;
- };
- WorkerThreadPool Workers(1);
- {
- GcManager Gc;
- ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
- const auto Bucket1 = "teardrinker1"sv;
- const auto Bucket2 = "teardrinker2"sv;
- const auto Namespace1 = "mynamespace1"sv;
- const auto Namespace2 = "mynamespace2"sv;
-
- IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
- IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
- IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
- IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
-
- ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
- CHECK(Value1.Value);
- ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
- CHECK(Value2.Value);
- ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
- CHECK(Value3.Value);
- ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
- CHECK(Value4.Value);
-
- std::atomic_bool WorkComplete = false;
- Workers.ScheduleWork([&]() {
- zen::Sleep(100);
- Value1.Value = IoBuffer{};
- Value2.Value = IoBuffer{};
- Value3.Value = IoBuffer{};
- Value4.Value = IoBuffer{};
- WorkComplete = true;
- });
- // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
- // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
- CHECK(Zcs.DropNamespace(Namespace1));
- while (!WorkComplete)
- {
- zen::Sleep(1);
- }
-
- // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty
- Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
- CHECK(!Value1.Value);
- Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
- CHECK(!Value2.Value);
- Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
- CHECK(Value3.Value);
- Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
- CHECK(Value4.Value);
- }
-}
-
-TEST_CASE("z$.blocked.disklayer.put")
-{
- ScopedTemporaryDirectory TempDir;
-
- GcStorageSize CacheSize;
-
- const auto CreateCacheValue = [](size_t Size) -> CbObject {
- std::vector<uint8_t> Buf;
- Buf.resize(Size, Size & 0xff);
-
- CbObjectWriter Writer;
- Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
- return Writer.Save();
- };
-
- GcManager Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
-
- CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
-
- IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
- Buffer.SetContentType(ZenContentType::kCbObject);
-
- size_t Key = Buffer.Size();
- IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
-
- ZenCacheValue BufferGet;
- CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
-
- CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
- IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
- Buffer2.SetContentType(ZenContentType::kCbObject);
-
- // We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
-
- MemoryView OldView = BufferGet.Value.GetView();
-
- ZenCacheValue BufferGet2;
- CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
- MemoryView NewView = BufferGet2.Value.GetView();
-
- // Make sure file openend for read before we wrote it still have old data
- CHECK(OldView.GetSize() == Buffer.GetSize());
- CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
-
- // Make sure we get the new data when reading after we write new data
- CHECK(NewView.GetSize() == Buffer2.GetSize());
- CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
-}
-
-TEST_CASE("z$.scrub")
-{
- ScopedTemporaryDirectory TempDir;
-
- using namespace testutils;
-
- struct CacheRecord
- {
- IoBuffer Record;
- std::vector<CompressedBuffer> Attachments;
- };
-
- auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
- CacheRecord Result;
- if (Structured)
- {
- Result.Attachments.resize(AttachmentSizes.size());
- CbObjectWriter Record;
- Record.BeginObject("Key"sv);
- {
- Record << "Bucket"sv << Bucket;
- Record << "Hash"sv << Key;
- }
- Record.EndObject();
- for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
- {
- IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
- CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
- Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash());
- Result.Attachments[Index] = CompressedAttachmentData;
- }
- Result.Record = Record.Save().GetBuffer().AsIoBuffer();
- Result.Record.SetContentType(ZenContentType::kCbObject);
- }
- else
- {
- std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
- size_t TotalSize = RecordData.length() + 1;
- for (size_t AttachmentSize : AttachmentSizes)
- {
- TotalSize += AttachmentSize;
- }
- Result.Record = IoBuffer(TotalSize);
- char* DataPtr = (char*)Result.Record.MutableData();
- memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
- DataPtr += RecordData.length() + 1;
- for (size_t AttachmentSize : AttachmentSizes)
- {
- IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize);
- memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
- DataPtr += AttachmentData.GetSize();
- }
- }
- return Result;
- };
-
- GcManager Gc;
- CidStore CidStore(Gc);
- ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- auto CreateRecords =
- [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
- for (const IoHash& Cid : Cids)
- {
- CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
- Zcs.Put("mybucket", Cid, {.Value = Record.Record});
- for (const CompressedBuffer& Attachment : Record.Attachments)
- {
- CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), Attachment.DecodeRawHash());
- }
- }
- };
-
- std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
-
- std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
- CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
-
- std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
- CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
-
- ScrubContext ScrubCtx;
- Zcs.Scrub(ScrubCtx);
- CidStore.Scrub(ScrubCtx);
- CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
- CHECK(ScrubCtx.BadCids().GetSize() == 0);
-}
-
-#endif
-
-void
-z$_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
deleted file mode 100644
index 3fb4f035d..000000000
--- a/zenserver/cache/structuredcachestore.h
+++ /dev/null
@@ -1,535 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/compactbinary.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-#include <zenstore/blockstore.h>
-#include <zenstore/caslog.h>
-#include <zenstore/gc.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <atomic>
-#include <compare>
-#include <filesystem>
-#include <unordered_map>
-
-#define ZEN_USE_CACHE_TRACKER 0
-
-namespace zen {
-
-class PathBuilderBase;
-class GcManager;
-class ZenCacheTracker;
-class ScrubContext;
-
-/******************************************************************************
-
- /$$$$$$$$ /$$$$$$ /$$
- |_____ $$ /$$__ $$ | $$
- /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$
- /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$
- /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$
- /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/
- /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$
- |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/
-
- Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash
- is 40 (hex) chars in size. Values may be opaque blobs or structured objects
- which can in turn contain references to other objects (or blobs).
-
-******************************************************************************/
-
-namespace access_tracking {
-
- struct KeyAccessTime
- {
- IoHash Key;
- GcClock::Tick LastAccess{};
- };
-
- struct AccessTimes
- {
- std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets;
- };
-}; // namespace access_tracking
-
-struct ZenCacheValue
-{
- IoBuffer Value;
- uint64_t RawSize = 0;
- IoHash RawHash = IoHash::Zero;
-};
-
-struct CacheValueDetails
-{
- struct ValueDetails
- {
- uint64_t Size;
- uint64_t RawSize;
- IoHash RawHash;
- GcClock::Tick LastAccess{};
- std::vector<IoHash> Attachments;
- ZenContentType ContentType;
- };
-
- struct BucketDetails
- {
- std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values;
- };
-
- struct NamespaceDetails
- {
- std::unordered_map<std::string, BucketDetails> Buckets;
- };
-
- std::unordered_map<std::string, NamespaceDetails> Namespaces;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct DiskLocation
-{
- inline DiskLocation() = default;
-
- inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; }
-
- inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile)
- {
- this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment);
- }
-
- inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const
- {
- ZEN_ASSERT(!(Flags & kStandaloneFile));
- return Location.BlockLocation.Get(PayloadAlignment);
- }
-
- inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); }
- inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; }
- inline uint8_t GetFlags() const { return Flags; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
-
- if (IsFlagSet(kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
-
- if (IsFlagSet(kCompressed))
- {
- ContentType = ZenContentType::kCompressedBinary;
- }
-
- return ContentType;
- }
-
- union
- {
- BlockStoreDiskLocation BlockLocation; // 10 bytes
- uint64_t StandaloneSize = 0; // 8 bytes
- } Location;
-
- static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file
- static const uint8_t kStructured = 0x40u; // Serialized as compact binary
- static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value
- static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format
-
- uint8_t Flags = 0;
- uint8_t Reserved = 0;
-};
-
-struct DiskIndexEntry
-{
- IoHash Key; // 20 bytes
- DiskLocation Location; // 12 bytes
-};
-
-#pragma pack(pop)
-
-static_assert(sizeof(DiskIndexEntry) == 32);
-
-// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch
-struct AccessTime
-{
- explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {}
- AccessTime& operator=(GcClock::Tick Tick) noexcept
- {
- SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed);
- return *this;
- }
- operator GcClock::Tick() const noexcept
- {
- return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed)))
- .count();
- }
-
- AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
- AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {}
- AccessTime& operator=(AccessTime&& Rhs) noexcept
- {
- SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
- return *this;
- }
- AccessTime& operator=(const AccessTime& Rhs) noexcept
- {
- SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed);
- return *this;
- }
-
-private:
- static uint32_t ToSeconds(GcClock::Tick Tick)
- {
- return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count());
- }
- std::atomic_uint32_t SecondsSinceEpoch;
-};
-
-/** In-memory cache storage
-
- Intended for small values which are frequently accessed
-
- This should have a better memory management policy to maintain reasonable
- footprint.
- */
-class ZenCacheMemoryLayer
-{
-public:
- struct Configuration
- {
- uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
- uint64_t ScavengeThreshold = 4 * 1024 * 1024;
- };
-
- struct BucketInfo
- {
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- struct Info
- {
- Configuration Config;
- std::vector<std::string> BucketNames;
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- ZenCacheMemoryLayer();
- ~ZenCacheMemoryLayer();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- void Drop();
- bool DropBucket(std::string_view Bucket);
- void Scrub(ScrubContext& Ctx);
- void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes);
- void Reset();
- uint64_t TotalSize() const;
-
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
-
- const Configuration& GetConfiguration() const { return m_Configuration; }
- void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
-
-private:
- struct CacheBucket
- {
-#pragma pack(push)
-#pragma pack(1)
- struct BucketPayload
- {
- IoBuffer Payload; // 8
- uint32_t RawSize; // 4
- IoHash RawHash; // 20
- };
-#pragma pack(pop)
- static_assert(sizeof(BucketPayload) == 32u);
- static_assert(sizeof(AccessTime) == 4u);
-
- mutable RwLock m_BucketLock;
- std::vector<AccessTime> m_AccessTimes;
- std::vector<BucketPayload> m_Payloads;
- tsl::robin_map<IoHash, uint32_t> m_CacheMap;
-
- std::atomic_uint64_t m_TotalSize{};
-
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
- void Drop();
- void Scrub(ScrubContext& Ctx);
- void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
- inline uint64_t TotalSize() const { return m_TotalSize; }
- uint64_t EntryCount() const;
- };
-
- mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
- std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
- Configuration m_Configuration;
-
- ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete;
- ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete;
-};
-
-class ZenCacheDiskLayer
-{
-public:
- struct Configuration
- {
- std::filesystem::path RootDir;
- };
-
- struct BucketInfo
- {
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- struct Info
- {
- Configuration Config;
- std::vector<std::string> BucketNames;
- uint64_t EntryCount = 0;
- uint64_t TotalSize = 0;
- };
-
- explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir);
- ~ZenCacheDiskLayer();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- bool Drop();
- bool DropBucket(std::string_view Bucket);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
- void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes);
-
- void DiscoverBuckets();
- uint64_t TotalSize() const;
-
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
-
- CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
-
-private:
- /** A cache bucket manages a single directory containing
- metadata and data for that bucket
- */
- struct CacheBucket
- {
- CacheBucket(std::string BucketName);
- ~CacheBucket();
-
- bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
- bool Drop();
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
- void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
-
- inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); }
- uint64_t EntryCount() const;
-
- CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const;
-
- private:
- const uint64_t MaxBlockSize = 1ull << 30;
- uint64_t m_PayloadAlignment = 1ull << 4;
-
- std::string m_BucketName;
- std::filesystem::path m_BucketDir;
- std::filesystem::path m_BlocksBasePath;
- BlockStore m_BlockStore;
- Oid m_BucketId;
- uint64_t m_LargeObjectThreshold = 128 * 1024;
-
- // These files are used to manage storage of small objects for this bucket
-
- TCasLogFile<DiskIndexEntry> m_SlogFile;
- uint64_t m_LogFlushPosition = 0;
-
-#pragma pack(push)
-#pragma pack(1)
- struct BucketPayload
- {
- DiskLocation Location; // 12
- uint64_t RawSize; // 8
- IoHash RawHash; // 20
- };
-#pragma pack(pop)
- static_assert(sizeof(BucketPayload) == 40u);
- static_assert(sizeof(AccessTime) == 4u);
-
- using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>;
-
- mutable RwLock m_IndexLock;
- std::vector<AccessTime> m_AccessTimes;
- std::vector<BucketPayload> m_Payloads;
- IndexMap m_Index;
-
- std::atomic_uint64_t m_TotalStandaloneSize{};
-
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
- uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
- void OpenLog(const bool IsNew);
- void SaveManifest();
- CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const;
- // These locks are here to avoid contention on file creation, therefore it's sufficient
- // that we take the same lock for the same hash
- //
- // These locks are small and should really be spaced out so they don't share cache lines,
- // but we don't currently access them at particularly high frequency so it should not be
- // an issue in practice
-
- mutable RwLock m_ShardedLocks[256];
- inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; }
- };
-
- std::filesystem::path m_RootDir;
- mutable RwLock m_Lock;
- std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive
- std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
-
- ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
- ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
-};
-
-class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor
-{
-public:
- struct Configuration
- {
- std::filesystem::path RootDir;
- uint64_t DiskLayerThreshold = 0;
- };
- struct BucketInfo
- {
- ZenCacheDiskLayer::BucketInfo DiskLayerInfo;
- ZenCacheMemoryLayer::BucketInfo MemoryLayerInfo;
- };
- struct Info
- {
- Configuration Config;
- std::vector<std::string> BucketNames;
- ZenCacheDiskLayer::Info DiskLayerInfo;
- ZenCacheMemoryLayer::Info MemoryLayerInfo;
- };
-
- ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir);
- ~ZenCacheNamespace();
-
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- bool Drop();
- bool DropBucket(std::string_view Bucket);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; }
- virtual void GatherReferences(GcContext& GcCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
- Info GetInfo() const;
- std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const;
-
- CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
-
-private:
- std::filesystem::path m_RootDir;
- ZenCacheMemoryLayer m_MemLayer;
- ZenCacheDiskLayer m_DiskLayer;
- uint64_t m_DiskLayerSizeThreshold = 1 * 1024;
- uint64_t m_LastScrubTime = 0;
-
-#if ZEN_USE_CACHE_TRACKER
- std::unique_ptr<ZenCacheTracker> m_AccessTracker;
-#endif
-
- ZenCacheNamespace(const ZenCacheNamespace&) = delete;
- ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
-};
-
-class ZenCacheStore final
-{
-public:
- static constexpr std::string_view DefaultNamespace =
- "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given
- static constexpr std::string_view NamespaceDiskPrefix = "ns_";
-
- struct Configuration
- {
- std::filesystem::path BasePath;
- bool AllowAutomaticCreationOfNamespaces = false;
- };
-
- struct Info
- {
- Configuration Config;
- std::vector<std::string> NamespaceNames;
- uint64_t DiskEntryCount = 0;
- uint64_t MemoryEntryCount = 0;
- GcStorageSize StorageSize;
- };
-
- ZenCacheStore(GcManager& Gc, const Configuration& Configuration);
- ~ZenCacheStore();
-
- bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- bool DropBucket(std::string_view Namespace, std::string_view Bucket);
- bool DropNamespace(std::string_view Namespace);
- void Flush();
- void Scrub(ScrubContext& Ctx);
-
- CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter,
- const std::string_view BucketFilter,
- const std::string_view ValueFilter) const;
-
- GcStorageSize StorageSize() const;
- // const Configuration& GetConfiguration() const { return m_Configuration; }
-
- Info GetInfo() const;
- std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace);
- std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket);
-
-private:
- const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
- ZenCacheNamespace* GetNamespace(std::string_view Namespace);
- void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const;
-
- typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap;
-
- mutable RwLock m_NamespacesLock;
- NamespaceMap m_Namespaces;
- std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
-
- GcManager& m_Gc;
- Configuration m_Configuration;
-};
-
-void z$_forcelink();
-
-} // namespace zen