diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/cache | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/cachetracking.cpp | 376 | ||||
| -rw-r--r-- | src/zenserver/cache/cachetracking.h | 41 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 3159 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcache.h | 187 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 3648 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 535 |
6 files changed, 7946 insertions, 0 deletions
diff --git a/src/zenserver/cache/cachetracking.cpp b/src/zenserver/cache/cachetracking.cpp new file mode 100644 index 000000000..9119e3122 --- /dev/null +++ b/src/zenserver/cache/cachetracking.cpp @@ -0,0 +1,376 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "cachetracking.h" + +#if ZEN_USE_CACHE_TRACKER + +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinaryvalue.h> +# include <zencore/endian.h> +# include <zencore/filesystem.h> +# include <zencore/logging.h> +# include <zencore/scopeguard.h> +# include <zencore/string.h> + +# include <zencore/testing.h> +# include <zencore/testutils.h> + +ZEN_THIRD_PARTY_INCLUDES_START +# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this +# include <fmt/format.h> +# include <rocksdb/db.h> +# include <tsl/robin_map.h> +# include <tsl/robin_set.h> +# include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +namespace rocksdb = ROCKSDB_NAMESPACE; + +static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{}; + +static uint64_t +GetCurrentCacheTimeStamp() +{ + auto Duration = std::chrono::system_clock::now() - Epoch; + uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count(); + + return Millis; +} + +struct CacheAccessSnapshot +{ +public: + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) + { + BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); + + Tracker->Track(HashKey); + } + + bool SerializeSnapshot(CbObjectWriter& Cbo) + { + bool Serialized = false; + RwLock::ExclusiveLockScope _(m_Lock); + + for (const auto& Kv : m_BucketMapping) + { + if (m_Buckets[Kv.second]->Size()) + { + Cbo.BeginArray(Kv.first); + m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); + Cbo.EndArray(); + Serialized = true; + } + } + + return Serialized; + } + +private: + struct BucketTracker + { + mutable RwLock Lock; + tsl::robin_set<IoHash> AccessedKeys; + + void Track(const IoHash& HashKey) + { + if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) + { + return; + } + + RwLock::ExclusiveLockScope _(Lock); + + AccessedKeys.insert(HashKey); + } + + void SerializeSnapshotAndClear(CbObjectWriter& Cbo) + { + RwLock::ExclusiveLockScope _(Lock); + + for (const IoHash& Hash : AccessedKeys) + { + Cbo.AddHash(Hash); + } + + AccessedKeys.clear(); + } + + size_t Size() const + { + RwLock::SharedLockScope _(Lock); + return AccessedKeys.size(); + } + }; + + BucketTracker* GetBucket(const std::string& BucketName) + { + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + _.ReleaseNow(); + + return AddNewBucket(BucketName); + } + else + { + return m_Buckets[It->second].get(); + } + } + + BucketTracker* AddNewBucket(const std::string& BucketName) + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) + { + const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size()); + m_Buckets.emplace_back(std::make_unique<BucketTracker>()); + m_BucketMapping[BucketName] = BucketIndex; + + return m_Buckets[BucketIndex].get(); + } + else + { + return m_Buckets[It->second].get(); + } + } + + RwLock m_Lock; + std::vector<std::unique_ptr<BucketTracker>> m_Buckets; + tsl::robin_map<std::string, uint32_t> m_BucketMapping; +}; + +struct ZenCacheTracker::Impl +{ + Impl(std::filesystem::path StateDirectory) + { + std::filesystem::path StatsDbPath{StateDirectory / ".zdb"}; + + std::string RocksdbPath = StatsDbPath.string(); + + ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath); + + rocksdb::DB* Db = nullptr; + rocksdb::DBOptions Options; + Options.create_if_missing = true; + + std::vector<std::string> ExistingColumnFamilies; + rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); + + std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors; + + if (Status.IsPathNotFound()) + { + ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); + } + else if (Status.ok()) + { + for (const std::string& Column : ExistingColumnFamilies) + { + rocksdb::ColumnFamilyDescriptor ColumnFamily; + ColumnFamily.name = Column; + ColumnDescriptors.push_back(ColumnFamily); + } + } + else + { + throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); + } + + Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); + + if (!Status.ok()) + { + throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); + } + + m_RocksDb.reset(Db); + } + + ~Impl() + { + for (auto* Column : m_RocksDbColumnHandles) + { + delete Column; + } + + m_RocksDbColumnHandles.clear(); + } + + struct KeyStruct + { + uint64_t TimestampLittleEndian; + }; + + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } + + void SaveSnapshot() + { + CbObjectWriter Cbo; + + if (m_CurrentSnapshot.SerializeSnapshot(Cbo)) + { + IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); + + const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; + rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; + rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; + + rocksdb::WriteOptions Wo; + m_RocksDb->Put(Wo, KeySlice, ValueSlice); + } + } + + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) + { + rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); + + rocksdb::ReadOptions Ro; + Ro.snapshot = Snap.snapshot(); + + std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)}; + + const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; + rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; + + It->Seek(ZeroKeySlice); + + while (It->Valid()) + { + rocksdb::Slice KeySlice = It->key(); + rocksdb::Slice ValueSlice = It->value(); + + if (KeySlice.size() == sizeof(KeyStruct)) + { + IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); + + CbObject Value = LoadCompactBinaryObject(ValueBuffer); + + uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data())); + + Callback(Key, Value); + } + + It->Next(); + } + } + + std::unique_ptr<rocksdb::DB> m_RocksDb; + std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles; + CacheAccessSnapshot m_CurrentSnapshot; +}; + +ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory)) +{ +} + +ZenCacheTracker::~ZenCacheTracker() +{ + delete m_Impl; +} + +void +ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) +{ + m_Impl->TrackAccess(BucketSegment, HashKey); +} + +void +ZenCacheTracker::SaveSnapshot() +{ + m_Impl->SaveSnapshot(); +} + +void +ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) +{ + m_Impl->IterateSnapshots(std::move(Callback)); +} + +# if ZEN_WITH_TESTS + +TEST_CASE("z$.tracker") +{ + using namespace std::literals; + + const uint64_t t0 = GetCurrentCacheTimeStamp(); + + ScopedTemporaryDirectory TempDir; + + ZenCacheTracker Zcs(TempDir.Path()); + + tsl::robin_set<IoHash> KeyHashes; + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + KeyHashes.insert(KeyHash); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + for (int i = 0; i < 10000; ++i) + { + IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + + for (int n = 0; n < 10; ++n) + { + for (int i = 0; i < 1000; ++i) + { + const int Index = i + n * 1000; + IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); + + Zcs.TrackAccess("foo"sv, KeyHash); + } + + Zcs.SaveSnapshot(); + } + + Zcs.SaveSnapshot(); + + const uint64_t t1 = GetCurrentCacheTimeStamp(); + + int SnapshotCount = 0; + + Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { + CHECK(TimeStamp >= t0); + CHECK(TimeStamp <= t1); + + for (auto& Field : Snapshot) + { + CHECK_EQ(Field.GetName(), "foo"sv); + + const CbArray& Array = Field.AsArray(); + + for (const auto& Element : Array) + { + CHECK(KeyHashes.contains(Element.GetValue().AsHash())); + } + } + + ++SnapshotCount; + }); + + CHECK_EQ(SnapshotCount, 11); +} + +# endif + +void +cachetracker_forcelink() +{ +} + +} // namespace zen + +#endif // ZEN_USE_CACHE_TRACKER diff --git a/src/zenserver/cache/cachetracking.h b/src/zenserver/cache/cachetracking.h new file mode 100644 index 000000000..fdfe1a4c7 --- /dev/null +++ b/src/zenserver/cache/cachetracking.h @@ -0,0 +1,41 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> + +#include <stdint.h> +#include <filesystem> +#include <functional> + +namespace zen { + +#define ZEN_USE_CACHE_TRACKER 0 +#if ZEN_USE_CACHE_TRACKER + +class CbObject; + +/** + */ + +class ZenCacheTracker +{ +public: + ZenCacheTracker(std::filesystem::path StateDirectory); + ~ZenCacheTracker(); + + void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey); + void SaveSnapshot(); + void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback); + +private: + struct Impl; + + Impl* m_Impl = nullptr; +}; + +void cachetracker_forcelink(); + +#endif // ZEN_USE_CACHE_TRACKER + +} // namespace zen diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp new file mode 100644 index 000000000..90e905bf6 --- /dev/null +++ b/src/zenserver/cache/structuredcache.cpp @@ -0,0 +1,3159 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "structuredcache.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/enumflags.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> +#include <zenutil/cache/cache.h> +#include <zenutil/cache/rpcrecording.h> + +#include "monitoring/httpstats.h" +#include "structuredcachestore.h" +#include "upstream/jupiter.h" +#include "upstream/upstreamcache.h" +#include "upstream/zen.h" +#include "zenstore/cidstore.h" +#include "zenstore/scrubcontext.h" + +#include <algorithm> +#include <atomic> +#include <filesystem> +#include <queue> +#include <thread> + +#include <cpr/cpr.h> +#include <gsl/gsl-lite.hpp> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif + +namespace zen { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +CachePolicy +ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) +{ + std::string_view PolicyText = QueryParams.GetValue("Policy"sv); + return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; +} + +CacheRecordPolicy +LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) +{ + OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); + return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); +} + +struct AttachmentCount +{ + uint32_t New = 0; + uint32_t Valid = 0; + uint32_t Invalid = 0; + uint32_t Total = 0; +}; + +struct PutRequestData +{ + std::string Namespace; + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + +namespace { + static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; + static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; + static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; + static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; + static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; + + struct HttpRequestData + { + std::optional<std::string> Namespace; + std::optional<std::string> Bucket; + std::optional<IoHash> HashKey; + std::optional<IoHash> ValueContentId; + }; + + constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; + constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; + + std::optional<std::string> GetValidNamespaceName(std::string_view Name) + { + if (Name.empty()) + { + ZEN_WARN("Namespace is invalid, empty namespace is not allowed"); + return {}; + } + + if (Name.length() > 64) + { + ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name); + return {}; + } + + if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet)) + { + ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name); + return {}; + } + + return ToLower(Name); + } + + std::optional<std::string> GetValidBucketName(std::string_view Name) + { + if (Name.empty()) + { + ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed"); + return {}; + } + + if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet)) + { + ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name); + return {}; + } + + return ToLower(Name); + } + + std::optional<IoHash> GetValidIoHash(std::string_view Hash) + { + if (Hash.length() != IoHash::StringLength) + { + return {}; + } + + IoHash KeyHash; + if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) + { + return {}; + } + return KeyHash; + } + + bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) + { + std::vector<std::string_view> Tokens; + uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) { + Tokens.push_back(Token); + return true; + }); + + switch (TokenCount) + { + case 0: + return true; + case 1: + Data.Namespace = GetValidNamespaceName(Tokens[0]); + return Data.Namespace.has_value(); + case 2: + { + std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); + if (PossibleHashKey.has_value()) + { + // Legacy bucket/key request + Data.Bucket = GetValidBucketName(Tokens[0]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = PossibleHashKey; + Data.Namespace = ZenCacheStore::DefaultNamespace; + return true; + } + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + return true; + } + case 3: + { + std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); + if (PossibleHashKey.has_value()) + { + // Legacy bucket/key/valueid request + Data.Bucket = GetValidBucketName(Tokens[0]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = PossibleHashKey; + Data.ValueContentId = GetValidIoHash(Tokens[2]); + if (!Data.ValueContentId.has_value()) + { + return false; + } + Data.Namespace = ZenCacheStore::DefaultNamespace; + return true; + } + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + Data.HashKey = GetValidIoHash(Tokens[2]); + if (!Data.HashKey) + { + return false; + } + return true; + } + case 4: + { + Data.Namespace = GetValidNamespaceName(Tokens[0]); + if (!Data.Namespace.has_value()) + { + return false; + } + + Data.Bucket = GetValidBucketName(Tokens[1]); + if (!Data.Bucket.has_value()) + { + return false; + } + + Data.HashKey = GetValidIoHash(Tokens[2]); + if (!Data.HashKey.has_value()) + { + return false; + } + + Data.ValueContentId = GetValidIoHash(Tokens[3]); + if (!Data.ValueContentId.has_value()) + { + return false; + } + return true; + } + default: + return false; + } + } + + std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) + { + CbFieldView NamespaceField = Params["Namespace"sv]; + if (!NamespaceField) + { + return std::string(ZenCacheStore::DefaultNamespace); + } + + if (NamespaceField.HasError()) + { + return {}; + } + if (!NamespaceField.IsString()) + { + return {}; + } + return GetValidNamespaceName(NamespaceField.AsString()); + } + + bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + { + CbFieldView BucketField = KeyView["Bucket"sv]; + if (BucketField.HasError()) + { + return false; + } + if (!BucketField.IsString()) + { + return false; + } + std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString()); + if (!Bucket.has_value()) + { + return false; + } + CbFieldView HashField = KeyView["Hash"sv]; + if (HashField.HasError()) + { + return false; + } + if (!HashField.IsHash()) + { + return false; + } + IoHash Hash = HashField.AsHash(); + Key = CacheKey::Create(*Bucket, Hash); + return true; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache) +: m_Log(logging::Get("cache")) +, m_CacheStore(InCacheStore) +, m_StatsService(StatsService) +, m_StatusService(StatusService) +, m_CidStore(InCidStore) +, m_UpstreamCache(UpstreamCache) +{ + m_StatsService.RegisterHandler("z$", *this); + m_StatusService.RegisterHandler("z$", *this); +} + +HttpStructuredCacheService::~HttpStructuredCacheService() +{ + ZEN_INFO("closing structured cache"); + m_RequestRecorder.reset(); + + m_StatsService.UnregisterHandler("z$", *this); + m_StatusService.UnregisterHandler("z$", *this); +} + +const char* +HttpStructuredCacheService::BaseUri() const +{ + return "/z$/"; +} + +void +HttpStructuredCacheService::Flush() +{ + m_CacheStore.Flush(); +} + +void +HttpStructuredCacheService::Scrub(ScrubContext& Ctx) +{ + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_CidStore.Scrub(Ctx); + m_CacheStore.Scrub(Ctx); +} + +void +HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) +{ + std::string_view Key = Request.RelativeUri(); + std::vector<std::string> Tokens; + uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { + Tokens.push_back(std::string(Token)); + return true; + }); + std::string FilterNamespace; + std::string FilterBucket; + std::string FilterValue; + switch (TokenCount) + { + case 1: + break; + case 2: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + case 3: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterBucket = Tokens[2]; + if (FilterBucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + case 4: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterBucket = Tokens[2]; + if (FilterBucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterValue = Tokens[3]; + if (FilterValue.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + default: + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch()); + CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + if (AttachmentDetails) + { + CSVWriter << "Namespace, Bucket, Key, Cid, Size"; + } + else if (Details) + { + CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; + } + else + { + CSVWriter << "Namespace, Bucket, Key"; + } + for (const auto& NamespaceIt : ValueDetails.Namespaces) + { + const std::string& Namespace = NamespaceIt.first; + for (const auto& BucketIt : NamespaceIt.second.Buckets) + { + const std::string& Bucket = BucketIt.first; + for (const auto& ValueIt : BucketIt.second.Values) + { + if (AttachmentDetails) + { + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + CSVWriter << "\r\n" + << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() + << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); + } + } + else if (Details) + { + std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( + GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); + CSVWriter << "\r\n" + << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," + << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " + << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) + << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()); + size_t AttachmentsSize = 0; + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + AttachmentsSize += Payload.GetSize(); + } + CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); + } + else + { + CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); + } + } + } + } + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("namespaces"); + { + for (const auto& NamespaceIt : ValueDetails.Namespaces) + { + const std::string& Namespace = NamespaceIt.first; + Cbo.BeginObject(); + { + Cbo.AddString("name", Namespace); + Cbo.BeginArray("buckets"); + { + for (const auto& BucketIt : NamespaceIt.second.Buckets) + { + const std::string& Bucket = BucketIt.first; + Cbo.BeginObject(); + { + Cbo.AddString("name", Bucket); + Cbo.BeginArray("values"); + { + for (const auto& ValueIt : BucketIt.second.Values) + { + std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( + GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); + Cbo.BeginObject(); + { + Cbo.AddHash("key", ValueIt.first); + if (Details) + { + Cbo.AddInteger("size", ValueIt.second.Size); + if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && + ValueIt.second.RawSize != ValueIt.second.Size) + { + Cbo.AddInteger("rawsize", ValueIt.second.RawSize); + Cbo.AddHash("rawhash", ValueIt.second.RawHash); + } + Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); + Cbo.AddInteger("age", NowSeconds.count() - LastAccessedSeconds.count()); + if (ValueIt.second.Attachments.size() > 0) + { + if (AttachmentDetails) + { + Cbo.BeginArray("attachments"); + { + for (const IoHash& Hash : ValueIt.second.Attachments) + { + Cbo.BeginObject(); + Cbo.AddHash("cid", Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + else + { + Cbo.AddInteger("attachmentcount", + gsl::narrow<uint64_t>(ValueIt.second.Attachments.size())); + size_t AttachmentsSize = 0; + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + AttachmentsSize += Payload.GetSize(); + } + Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); + } + } + } + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +void +HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) +{ + metrics::OperationTiming::Scope $(m_HttpRequests); + + std::string_view Key = Request.RelativeUri(); + if (Key == HttpZCacheRPCPrefix) + { + return HandleRpcRequest(Request); + } + + if (Key == HttpZCacheUtilStartRecording) + { + m_RequestRecorder.reset(); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + if (Key == HttpZCacheUtilStopRecording) + { + m_RequestRecorder.reset(); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + if (Key == HttpZCacheUtilReplayRecording) + { + m_RequestRecorder.reset(); + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); + uint32_t ThreadCount = std::thread::hardware_concurrency(); + if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) + { + if (auto Value = ParseInt<uint64_t>(Param)) + { + ThreadCount = gsl::narrow<uint32_t>(Value.value()); + } + } + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); + ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + if (Key.starts_with(HttpZCacheDetailsPrefix)) + { + HandleDetailsRequest(Request); + return; + } + + HttpRequestData RequestData; + if (!HttpRequestParseRelativeUri(Key, RequestData)) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + + if (RequestData.ValueContentId.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + ZEN_ASSERT(RequestData.HashKey.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = RequestData.ValueContentId.value()}; + return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); + } + + if (RequestData.HashKey.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = IoHash::Zero}; + return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); + } + + if (RequestData.Bucket.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); + } + + if (RequestData.Namespace.has_value()) + { + return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); + } + return HandleCacheRequest(Request); +} + +void +HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + ZenCacheStore::Info Info = m_CacheStore.GetInfo(); + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("Configuration"); + { + ExtendableStringBuilder<128> BasePathString; + BasePathString << Info.Config.BasePath.u8string(); + ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); + ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); + } + ResponseWriter.EndObject(); + + std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { + return L.compare(R) < 0; + }); + ResponseWriter.BeginArray("Namespaces"); + for (const std::string& NamespaceName : Info.NamespaceNames) + { + ResponseWriter.AddString(NamespaceName); + } + ResponseWriter.EndArray(); + ResponseWriter.BeginObject("StorageSize"); + { + ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); + } + + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); + ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount); + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + } +} + +void +HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName); + if (!Info.has_value()) + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("Configuration"); + { + ExtendableStringBuilder<128> BasePathString; + BasePathString << Info->Config.RootDir.u8string(); + ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); + ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold); + } + ResponseWriter.EndObject(); + + std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { + return L.compare(R) < 0; + }); + + ResponseWriter.BeginArray("Buckets"sv); + for (const std::string& BucketName : Info->BucketNames) + { + ResponseWriter.AddString(BucketName); + } + ResponseWriter.EndArray(); + + ResponseWriter.BeginObject("StorageSize"sv); + { + ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize); + ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize); + } + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + + case HttpVerb::kDelete: + // Drop namespace + { + if (m_CacheStore.DropNamespace(NamespaceName)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + } + break; + + default: + break; + } +} + +void +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, + std::string_view NamespaceName, + std::string_view BucketName) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); + if (!Info.has_value()) + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("StorageSize"); + { + ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize); + ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize); + } + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + + case HttpVerb::kDelete: + // Drop bucket + { + if (m_CacheStore.DropBucket(NamespaceName, BucketName)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + } + break; + + default: + break; + } +} + +void +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + HandleGetCacheRecord(Request, Ref, PolicyFromUrl); + } + break; + + case HttpVerb::kPut: + HandlePutCacheRecord(Request, Ref, PolicyFromUrl); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); + + bool Success = false; + ZenCacheValue ClientResultValue; + if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + + Stopwatch Timer; + + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && + m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) + { + Success = true; + ZenContentType ContentType = ClientResultValue.Value.GetContentType(); + + if (AcceptType == ZenContentType::kCbPackage) + { + if (ContentType == ZenContentType::kCbObject) + { + CbPackage Package; + uint32_t MissingCount = 0; + + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) + { + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + MissingCount++; + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + } + else + { + MissingCount++; + } + } + }); + + Success = MissingCount == 0 || PartialRecord; + + if (Success) + { + Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); + + BinaryWriter MemStream; + Package.Save(MemStream); + + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + else + { + Success = false; + } + } + else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && + AcceptType != ZenContentType::kBinary) + { + Success = false; + } + } + + if (Success) + { + ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + m_CacheStats.HitCount++; + if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData + return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); + } + } + else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) + { + ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + // Issue upstream query asynchronously in order to keep requests flowing without + // hogging I/O servicing threads with blocking work + + uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); + + Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) { + Stopwatch Timer; + bool Success = false; + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); + ZenCacheValue ClientResultValue; + + metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); + + if (GetUpstreamCacheSingleResult UpstreamResult = + m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); + UpstreamResult.Status.Success) + { + Success = true; + + ClientResultValue.Value = UpstreamResult.Value; + ClientResultValue.Value.SetContentType(AcceptType); + + if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) + { + if (AcceptType == ZenContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); + if (ValidationResult != CbValidateError::None) + { + Success = false; + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + + // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data + } + + if (Success && StoreLocal) + { + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); + } + } + else if (AcceptType == ZenContentType::kCbPackage) + { + CbPackage Package; + if (Package.TryLoad(ClientResultValue.Value)) + { + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + AttachmentsToStoreLocally.reserve(NumAttachments); + + CacheRecord.IterateAttachments( + [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) { + IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) + { + if (StoreLocal) + { + AttachmentsToStoreLocally.emplace_back(Attachment); + } + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", + Hash, + Ref.BucketSegment, + Ref.HashKey); + Count.Invalid++; + } + } + else if (QueryLocal) + { + if (SkipData) + { + if (m_CidStore.ContainsChunk(Hash)) + { + Count.Valid++; + } + } + else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, Hash)); + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", + Hash, + Ref.BucketSegment, + Ref.HashKey); + Count.Invalid++; + } + } + } + Count.Total++; + }); + + if ((Count.Valid == Count.Total) || PartialRecord) + { + ZenCacheValue CacheValue; + CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + + if (StoreLocal) + { + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + } + + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = + m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + if (InsertResult.New) + { + Count.New++; + } + } + + BinaryWriter MemStream; + if (SkipData) + { + // Save a package containing only the object. + CbPackage(Package.GetObject()).Save(MemStream); + } + else + { + Package.Save(MemStream); + } + + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); + } + else + { + Success = false; + ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + } + else + { + Success = false; + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + } + } + + if (Success) + { + ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType()), + NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + + if (SkipData && AcceptType == ZenContentType::kBinary) + { + AsyncRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally + // metadata. + AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); + } + } + else + { + ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); + m_CacheStats.MissCount++; + AsyncRequest.WriteResponse(HttpResponseCode::NotFound); + } + }); +} + +void +HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + IoBuffer Body = Request.ReadPayload(); + + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + const HttpContentType ContentType = Request.RequestContentType(); + + Body.SetContentType(ContentType); + + Stopwatch Timer; + + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) + { + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = Body.GetSize(); + if (ContentType == HttpContentType::kCompressedBinary) + { + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Payload is not a valid compressed binary"sv); + } + } + else + { + RawHash = IoHash::HashBuffer(SharedBuffer(Body)); + } + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); + + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) + { + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + } + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } + + Body.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + + CbObjectView CacheRecord(Body.Data()); + std::vector<IoHash> ValidAttachments; + int32_t TotalCount = 0; + + CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + TotalCount, + ValidAttachments.size(), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); + + CachePolicy Policy = PolicyFromUrl; + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbPackage) + { + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); + } + CachePolicy Policy = PolicyFromUrl; + + CbObject CacheRecord = Package.GetObject(); + + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + ValidAttachments.reserve(NumAttachments); + AttachmentsToStoreLocally.reserve(NumAttachments); + + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) + { + AttachmentsToStoreLocally.emplace_back(Attachment); + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); + } + + ZenCacheValue CacheValue; + CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); + + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + if (InsertResult.New) + { + Count.New++; + } + } + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.GetSize()), + ToString(ContentType), + Count.New, + Count.Valid, + Count.Total, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); + } +} + +void +HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + HandleGetCacheChunk(Request, Ref, PolicyFromUrl); + break; + case HttpVerb::kPut: + HandlePutCacheChunk(Request, Ref, PolicyFromUrl); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + Stopwatch Timer; + + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + const UpstreamEndpointInfo* Source = nullptr; + CachePolicy Policy = PolicyFromUrl; + { + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + + if (QueryUpstream) + { + if (GetUpstreamCacheSingleResult UpstreamResult = + m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Status.Success) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) + { + if (RawHash == Ref.ValueContentId) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } + } + else + { + ZEN_WARN("got uncompressed upstream cache value"); + } + } + } + } + + if (!Value) + { + ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(Request.AcceptContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), + Source ? Source->Url : "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + m_CacheStats.HitCount++; + if (Source) + { + m_CacheStats.UpstreamHitCount++; + } + + if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) + { + Request.WriteResponse(HttpResponseCode::OK); + } + else + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + } +} + +void +HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(PolicyFromUrl); + + Stopwatch Timer; + + IoBuffer Body = Request.ReadPayload(); + + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Body.SetContentType(Request.RequestContentType()); + + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); + } + + if (RawHash != Ref.ValueContentId) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); + } + + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); + + ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + NiceBytes(Body.Size()), + ToString(Body.GetContentType()), + Result.New ? "NEW" : "OLD", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; + + Request.WriteResponse(ResponseCode); +} + +CbPackage +HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId) +{ + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == ZenContentType::kCbObject) + { + ObjectBuffer = LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + OutAcceptMagic = Object["Accept"sv].AsUInt32(); + OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u)); + OutTargetProcessId = Object["Pid"sv].AsInt32(0); + + const std::string_view Method = Object["Method"sv].AsString(); + + if (Method == "PutCacheRecords"sv) + { + return HandleRpcPutCacheRecords(Package); + } + else if (Method == "GetCacheRecords"sv) + { + return HandleRpcGetCacheRecords(Object); + } + else if (Method == "PutCacheValues"sv) + { + return HandleRpcPutCacheValues(Package); + } + else if (Method == "GetCacheValues"sv) + { + return HandleRpcGetCacheValues(Object); + } + else if (Method == "GetCacheChunks"sv) + { + return HandleRpcGetCacheChunks(Object); + } + return CbPackage{}; +} + +void +HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) +{ + WorkerThreadPool WorkerPool(ThreadCount); + uint64_t RequestCount = Replayer.GetRequestCount(); + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + Latch JobLatch(RequestCount); + ZEN_INFO("Replaying {} requests", RequestCount); + for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) + { + WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { + IoBuffer Body; + std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); + if (Body) + { + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetPid = 0; + CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); + if (AcceptMagic == kCbPkgMagic) + { + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); + } + } + JobLatch.CountDown(); + }); + } + while (!JobLatch.Wait(10000)) + { + ZEN_INFO("Replayed {} of {} requests, elapsed {}", + RequestCount - JobLatch.Remaining(), + RequestCount, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } +} + +void +HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kPost: + { + const HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType AcceptType = Request.AcceptContentType(); + + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Request.WriteResponseAsync( + [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); + if (RpcResult.IsNull()) + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + return; + } + if (AcceptMagic == kCbPkgMagic) + { + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }); + } + break; + default: + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} + +CbPackage +HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } + CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return CbPackage{}; + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return CbPackage{}; + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + return RpcResponse; +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + AttachmentCount Count; + size_t NumAttachments = Package->GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<const CbAttachment*> AttachmentsToStoreLocally; + ValidAttachments.reserve(NumAttachments); + AttachmentsToStoreLocally.reserve(NumAttachments); + + Stopwatch Timer; + + Request.RecordObject.IterateAttachments( + [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + AttachmentsToStoreLocally.emplace_back(Attachment); + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + else + { + ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + + for (const CbAttachment* Attachment : AttachmentsToStoreLocally) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + if (InsertResult.New) + { + Count.New++; + } + TransferredSize += Chunk.GetCompressedSize(); + } + + ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) +{ + ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + + struct ValueRequestData + { + Oid ValueId; + IoHash ContentId; + CompressedBuffer Payload; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool ReadFromUpstream = false; + }; + struct RecordRequestData + { + CacheKeyRequest Upstream; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + std::vector<ValueRequestData> Values; + bool Complete = false; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t ElapsedTimeUs; + }; + + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + std::vector<RecordRequestData> Requests; + std::vector<size_t> UpstreamIndexes; + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + Requests.reserve(RequestsArray.Num()); + + auto ParseValues = [](RecordRequestData& Request) { + CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); + Request.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Request.Values.push_back({ValueId, RawHash}); + Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId); + } + } + }; + + for (CbFieldView RequestField : RequestsArray) + { + Stopwatch Timer; + RecordRequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + + CacheKey& Key = Request.Upstream.Key; + if (!GetRpcRequestCacheKey(KeyObject, Key)) + { + return CbPackage{}; + } + + Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + const CacheRecordPolicy& Policy = Request.DownstreamPolicy; + + ZenCacheValue CacheValue; + bool NeedUpstreamAttachment = false; + bool FoundLocalInvalid = false; + ZenCacheValue RecordCacheValue; + + if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && + m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) + { + Request.RecordCacheValue = std::move(RecordCacheValue.Value); + if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) + { + FoundLocalInvalid = true; + } + else + { + Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); + ParseValues(Request); + + Request.Complete = true; + for (ValueRequestData& Value : Request.Values) + { + CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) + { + // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we + // didn't ask for it and thus the record is complete in its absence. + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + Value.Exists = true; + } + else + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + Request.Complete = false; + } + } + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + if (m_CidStore.ContainsChunk(Value.ContentId)) + { + Value.Exists = true; + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + Value.Exists = true; + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } + } + } + } + if (!Request.Complete) + { + bool NeedUpstreamRecord = + !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); + if (NeedUpstreamRecord || NeedUpstreamAttachment) + { + UpstreamIndexes.push_back(Requests.size() - 1); + } + } + Request.ElapsedTimeUs = Timer.GetElapsedTimeUs(); + } + if (Requests.empty()) + { + return CbPackage{}; + } + + if (!UpstreamIndexes.empty()) + { + std::vector<CacheKeyRequest*> UpstreamRequests; + UpstreamRequests.reserve(UpstreamIndexes.size()); + for (size_t Index : UpstreamIndexes) + { + RecordRequestData& Request = Requests[Index]; + UpstreamRequests.push_back(&Request.Upstream); + + if (Request.Values.size()) + { + // We will be returning the local object and know all the value Ids that exist in it + // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have. + CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; + CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); + for (ValueRequestData& Value : Request.Values) + { + CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy); + UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; + Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); + } + Request.Upstream.Policy = Builder.Build(); + } + else + { + // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, + // and convert the CacheRecordPolicy to an upstream policy + Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); + } + } + + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + + RecordRequestData& Request = + *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + const CacheKey& Key = Request.Upstream.Key; + Stopwatch Timer; + auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); + if (!Request.RecordObject) + { + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); + Request.RecordObject = ObjectBuffer; + if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + { + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + } + ParseValues(Request); + Request.Source = Params.Source; + } + + Request.Complete = true; + for (ValueRequestData& Value : Request.Values) + { + if (Value.Exists) + { + continue; + } + CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + Request.Complete = false; + continue; + } + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + Request.Source = Params.Source; + Value.Exists = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + } + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + Value.Payload = Compressed; + } + } + else + { + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", + Value.ContentId, + *Namespace, + Key.Bucket, + Key.Hash); + } + } + if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + Request.Complete = false; + } + // Request.Complete does not need to be set to false for upstream SkipData attachments. + // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment + // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of + // any missing SkipData attachments. + } + Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + }; + + m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); + } + + CbPackage ResponsePackage; + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + for (RecordRequestData& Request : Requests) + { + const CacheKey& Key = Request.Upstream.Key; + if (Request.Complete || + (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) + { + ResponseObject << Request.RecordObject; + for (ValueRequestData& Value : Request.Values) + { + if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) + { + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); + } + } + + ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(Request.RecordCacheValue.Size()), + Request.Complete ? ""sv : " (PARTIAL)"sv, + Request.Source ? Request.Source->Url : "LOCAL"sv, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0; + } + else + { + ResponseObject.AddNull(); + + if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + } + else + { + ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + Request.RecordObject ? ""sv : " (PARTIAL)"sv, + Request.Source ? Request.Source->Url : "LOCAL"sv, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + m_CacheStats.MissCount++; + } + } + } + ResponseObject.EndArray(); + ResponsePackage.SetObject(ResponseObject.Save()); + return ResponsePackage; +} + +CbPackage +HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest) +{ + CbObjectView BatchObject = BatchRequest.GetObject(); + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + Stopwatch Timer; + + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + + CacheKey Key; + if (!GetRpcRequestCacheKey(KeyView, Key)) + { + return CbPackage{}; + } + + PolicyText = RequestObject["Policy"sv].AsString(); + CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); + uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + bool Succeeded = false; + uint64_t TransferredSize = 0; + + if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + // TODO: Implement upstream puts of CacheValues with StoreLocal == false. + // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. + Policy |= CachePolicy::StoreLocal; + } + + if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) + { + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Value.SetContentType(ZenContentType::kCompressedBinary); + if (RawSize == 0) + { + RawSize = Chunk.DecodeRawSize(); + } + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}); + TransferredSize = Chunk.GetCompressedSize(); + } + Succeeded = true; + } + else + { + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); + return CbPackage{}; + } + } + else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + ZenCacheValue ExistingValue; + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && + IsCompressedBinary(ExistingValue.Value.GetContentType())) + { + Succeeded = true; + } + } + // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. + // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. + + if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); + } + Results.push_back(Succeeded); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(TransferredSize), + Succeeded ? "Added"sv : "Invalid", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } + if (Results.empty()) + { + return CbPackage{}; + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + return RpcResponse; +} + +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) +{ + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return CbPackage{}; + } + + struct RequestData + { + CacheKey Key; + CachePolicy Policy; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + CompressedBuffer Result; + }; + std::vector<RequestData> Requests; + + std::vector<size_t> RemoteRequestIndexes; + + for (CbFieldView RequestField : Params["Requests"sv]) + { + Stopwatch Timer; + + RequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + + if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) + { + return CbPackage{}; + } + + PolicyText = RequestObject["Policy"sv].AsString(); + Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; + + ZenCacheValue CacheValue; + if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) + { + Request.RawHash = CacheValue.RawHash; + Request.RawSize = CacheValue.RawSize; + Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); + } + } + if (Request.Result) + { + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + NiceBytes(Request.Result.GetCompressed().GetSize()), + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.HitCount++; + } + else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + { + RemoteRequestIndexes.push_back(Requests.size() - 1); + } + else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + *Namespace, + Key.Bucket, + Key.Hash, + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + } + } + + if (!RemoteRequestIndexes.empty()) + { + std::vector<CacheValueRequest> RequestedRecordsData; + std::vector<CacheValueRequest*> CacheValueRequests; + RequestedRecordsData.reserve(RemoteRequestIndexes.size()); + CacheValueRequests.reserve(RemoteRequestIndexes.size()); + for (size_t Index : RemoteRequestIndexes) + { + RequestData& Request = Requests[Index]; + RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); + CacheValueRequests.push_back(&RequestedRecordsData.back()); + } + Stopwatch Timer; + m_UpstreamCache.GetCacheValues( + *Namespace, + CacheValueRequests, + [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + CacheValueRequest& ChunkRequest = Params.Request; + if (Params.RawHash != IoHash::Zero) + { + size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); + size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; + RequestData& Request = Requests[RequestIndex]; + Request.RawHash = Params.RawHash; + Request.RawSize = Params.RawSize; + const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); + const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool IsHit = SkipData || HasData; + if (IsHit) + { + if (HasData && !SkipData) + { + Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); + } + + if (HasData && StoreData) + { + m_CacheStore.Put(*Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}); + } + + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + *Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + NiceBytes(Request.Result.GetCompressed().GetSize()), + Params.Source ? Params.Source->Url : "UPSTREAM", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + return; + } + } + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", + *Namespace, + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + Params.Source ? Params.Source->Url : "UPSTREAM", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + }); + } + + if (Requests.empty()) + { + return CbPackage{}; + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (const RequestData& Request : Requests) + { + ResponseObject.BeginObject(); + { + const CompressedBuffer& Result = Request.Result; + if (Result) + { + ResponseObject.AddHash("RawHash"sv, Request.RawHash); + if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); + } + else + { + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); + } + } + else if (Request.RawHash != IoHash::Zero) + { + ResponseObject.AddHash("RawHash"sv, Request.RawHash); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); + } + } + ResponseObject.EndObject(); + } + ResponseObject.EndArray(); + + RpcResponse.SetObject(ResponseObject.Save()); + return RpcResponse; +} + +namespace cache::detail { + + struct RecordValue + { + Oid ValueId; + IoHash ContentId; + uint64_t RawSize; + }; + struct RecordBody + { + IoBuffer CacheValue; + std::vector<RecordValue> Values; + const UpstreamEndpointInfo* Source = nullptr; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool HasRequest = false; + bool ValuesRead = false; + }; + struct ChunkRequest + { + CacheChunkRequest* Key = nullptr; + RecordBody* Record = nullptr; + CompressedBuffer Value; + const UpstreamEndpointInfo* Source = nullptr; + uint64_t RawSize = 0; + uint64_t RequestedSize = 0; + uint64_t RequestedOffset = 0; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool RawSizeKnown = false; + bool IsRecordRequest = false; + uint64_t ElapsedTimeUs = 0; + }; + +} // namespace cache::detail + +CbPackage +HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) +{ + using namespace cache::detail; + + std::string Namespace; + std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream + std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests + std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream + std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest + std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key + std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key + std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream + + // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + { + return CbPackage{}; + } + + // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we + // have it locally, and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); + + // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks + GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); + + // Call GetCacheChunks on the upstream for any payloads we do not have locally + GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); + + // Send the payload and descriptive data about each chunk to the client + return WriteGetCacheChunksResponse(Namespace, Requests); +} + +bool +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest) +{ + using namespace cache::detail; + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + + std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); + if (!NamespaceText) + { + ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); + return false; + } + Namespace = *NamespaceText; + + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + + // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, + // we will need to change the pointers to indexes to handle reallocations. + RecordKeys.reserve(NumRequests); + Records.reserve(NumRequests); + RequestKeys.reserve(NumRequests); + Requests.reserve(NumRequests); + RecordRequests.reserve(NumRequests); + ValueRequests.reserve(NumRequests); + + CacheKeyRequest* PreviousRecordKey = nullptr; + RecordBody* PreviousRecord = nullptr; + + for (CbFieldView RequestView : ChunkRequestsArray) + { + CbObjectView RequestObject = RequestView.AsObjectView(); + CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); + ChunkRequest& Request = Requests.emplace_back(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + + Request.Key = &RequestKey; + if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key)) + { + ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); + return false; + } + + RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); + RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); + RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + Request.RequestedSize = RequestKey.RawSize; + Request.RequestedOffset = RequestKey.RawOffset; + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + Request.IsRecordRequest = (bool)RequestKey.ValueId; + + if (!Request.IsRecordRequest) + { + ValueRequests.push_back(&Request); + } + else + { + RecordRequests.push_back(&Request); + CacheKeyRequest* RecordKey = nullptr; + RecordBody* Record = nullptr; + + if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) + { + RecordKey = &RecordKeys.emplace_back(); + PreviousRecordKey = RecordKey; + Record = &Records.emplace_back(); + PreviousRecord = Record; + RecordKey->Key = RequestKey.Key; + } + else if (RequestKey.Key == PreviousRecordKey->Key) + { + RecordKey = PreviousRecordKey; + Record = PreviousRecord; + } + else + { + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + RequestKey.Key.Bucket, + RequestKey.Key.Hash, + PreviousRecordKey->Key.Bucket, + PreviousRecordKey->Key.Hash); + return false; + } + Request.Record = Record; + if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) + { + Record->DownstreamPolicy = + Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; + Record->HasRequest = true; + } + } + } + if (Requests.empty()) + { + return false; + } + return true; +} + +void +HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) +{ + using namespace cache::detail; + + std::vector<CacheKeyRequest*> UpstreamRecordRequests; + for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) + { + Stopwatch Timer; + CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; + RecordBody& Record = Records[RecordIndex]; + if (Record.HasRequest) + { + Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; + + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + { + Record.Exists = true; + Record.CacheValue = std::move(CacheValue.Value); + } + } + if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) + { + RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); + UpstreamRecordRequests.push_back(&RecordKey); + } + RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + } + + if (!UpstreamRecordRequests.empty()) + { + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) + { + m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + } + }; + m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); + } + + std::vector<CacheChunkRequest*> UpstreamPayloadRequests; + for (ChunkRequest* Request : RecordRequests) + { + Stopwatch Timer; + if (Request->Key->ChunkId == IoHash::Zero) + { + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, parse the cache record and try to find the raw hash from the ValueId. + RecordBody& Record = *Request->Record; + if (!Record.ValuesRead) + { + Record.ValuesRead = true; + if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) + { + CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + Record.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + } + } + } + } + + for (const RecordValue& Value : Record.Values) + { + if (Value.ValueId == Request->Key->ValueId) + { + Request->Key->ChunkId = Value.ContentId; + Request->RawSize = Value.RawSize; + Request->RawSizeKnown = true; + break; + } + } + } + + // Now load the ContentId from the local ContentIdStore or from the upstream + if (Request->Key->ChunkId != IoHash::Zero) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) + { + if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) + { + Request->Exists = true; + } + } + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + { + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) + { + Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); + if (Request->Value) + { + Request->Exists = true; + Request->RawSizeKnown = false; + } + } + else + { + IoHash _; + if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize)) + { + Request->Exists = true; + Request->RawSizeKnown = true; + } + } + } + } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); + OutUpstreamChunks.push_back(Request->Key); + } + } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } +} + +void +HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks) +{ + using namespace cache::detail; + + for (ChunkRequest* Request : ValueRequests) + { + Stopwatch Timer; + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) + { + ZenCacheValue CacheValue; + if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + { + if (IsCompressedBinary(CacheValue.Value.GetContentType())) + { + Request->Key->ChunkId = CacheValue.RawHash; + Request->Exists = true; + Request->RawSize = CacheValue.RawSize; + Request->RawSizeKnown = true; + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) + { + Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); + } + } + } + } + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) + { + if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) + { + // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally + Request->Key->RawOffset = 0; + Request->Key->RawSize = UINT64_MAX; + } + OutUpstreamChunks.push_back(Request->Key); + } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } +} + +void +HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests) +{ + using namespace cache::detail; + + if (!UpstreamChunks.empty()) + { + const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { + if (Params.RawHash == Params.RawHash.Zero) + { + return; + } + + CacheChunkRequest& Key = Params.Request; + size_t RequestIndex = std::distance(RequestKeys.data(), &Key); + ChunkRequest& Request = Requests[RequestIndex]; + Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || + !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); + if (!Compressed) + { + return; + } + + if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) + { + if (Request.IsRecordRequest) + { + m_CidStore.AddChunk(Params.Value, Params.RawHash); + } + else + { + m_CacheStore.Put(Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}); + } + } + if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) + { + Request.Value = std::move(Compressed); + } + } + Key.ChunkId = Params.RawHash; + Request.Exists = true; + Request.RawSize = Params.RawSize; + Request.RawSizeKnown = true; + Request.Source = Params.Source; + + m_CacheStats.UpstreamHitCount++; + }; + + m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); + } +} + +CbPackage +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests) +{ + using namespace cache::detail; + + CbPackage RpcResponse; + CbObjectWriter Writer; + + Writer.BeginArray("Result"sv); + for (ChunkRequest& Request : Requests) + { + Writer.BeginObject(); + { + if (Request.Exists) + { + Writer.AddHash("RawHash"sv, Request.Key->ChunkId); + if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId)); + } + else + { + Writer.AddInteger("RawSize"sv, Request.RawSize); + } + + ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceBytes(Request.RawSize), + Request.IsRecordRequest ? "Record"sv : "Value"sv, + Request.Source ? Request.Source->Url : "LOCAL"sv, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + m_CacheStats.HitCount++; + } + else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) + { + ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}", + Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + } + else + { + ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}", + Namespace, + Request.Key->Key.Bucket, + Request.Key->Key.Hash, + Request.Key->ValueId, + NiceLatencyNs(Request.ElapsedTimeUs * 1000)); + m_CacheStats.MissCount++; + } + } + Writer.EndObject(); + } + Writer.EndArray(); + + RpcResponse.SetObject(Writer.Save()); + return RpcResponse; +} + +void +HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); + + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + + const CidStoreSize CidSize = m_CidStore.TotalSize(); + const GcStorageSize CacheSize = m_CacheStore.StorageSize(); + + Cbo.BeginObject("cache"); + { + Cbo.BeginObject("size"); + { + Cbo << "disk" << CacheSize.DiskSize; + Cbo << "memory" << CacheSize.MemorySize; + } + Cbo.EndObject(); + + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); + Cbo << "hits" << HitCount << "misses" << MissCount; + Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); + Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); + } + Cbo.EndObject(); + + Cbo.BeginObject("upstream"); + { + m_UpstreamCache.GetStatus(Cbo); + } + Cbo.EndObject(); + + Cbo.BeginObject("cid"); + { + Cbo.BeginObject("size"); + { + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" << CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("z$service.parse.relative.Uri") +{ + HttpRequestData RootRequest; + CHECK(HttpRequestParseRelativeUri("", RootRequest)); + CHECK(!RootRequest.Namespace.has_value()); + CHECK(!RootRequest.Bucket.has_value()); + CHECK(!RootRequest.HashKey.has_value()); + CHECK(!RootRequest.ValueContentId.has_value()); + + RootRequest = {}; + CHECK(HttpRequestParseRelativeUri("/", RootRequest)); + CHECK(!RootRequest.Namespace.has_value()); + CHECK(!RootRequest.Bucket.has_value()); + CHECK(!RootRequest.HashKey.has_value()); + CHECK(!RootRequest.ValueContentId.has_value()); + + HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; + CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); + CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); + CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); + + HttpRequestData LegacyHashKeyRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); + CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(LegacyHashKeyRequest.Bucket == "test"sv); + CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); + + HttpRequestData LegacyValueContentIdRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", + LegacyValueContentIdRequest)); + CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); + CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); + + HttpRequestData V2DefaultNamespaceRequest; + CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); + CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); + CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); + CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); + CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); + + HttpRequestData V2NamespaceRequest; + CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); + CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); + CHECK(!V2NamespaceRequest.Bucket.has_value()); + CHECK(!V2NamespaceRequest.HashKey.has_value()); + CHECK(!V2NamespaceRequest.ValueContentId.has_value()); + + HttpRequestData V2BucketRequestWithDefaultNamespace; + CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); + CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); + CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); + CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); + CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); + + HttpRequestData V2BucketRequestWithNamespace; + CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); + CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); + CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); + CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); + CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); + + HttpRequestData V2HashKeyRequest; + CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); + CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); + CHECK(V2HashKeyRequest.Bucket == "test"); + CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(!V2HashKeyRequest.ValueContentId.has_value()); + + HttpRequestData V2ValueContentIdRequest; + CHECK( + HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", + V2ValueContentIdRequest)); + CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); + CHECK(V2ValueContentIdRequest.Bucket == "test"sv); + CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); + CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); + + HttpRequestData Invalid; + CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); + CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); + CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", + Invalid)); +} + +#endif + +void +z$service_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h new file mode 100644 index 000000000..4e7b98ac9 --- /dev/null +++ b/src/zenserver/cache/structuredcache.h @@ -0,0 +1,187 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/stats.h> +#include <zenhttp/httpserver.h> + +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" + +#include <memory> +#include <vector> + +namespace spdlog { +class logger; +} + +namespace zen { + +struct CacheChunkRequest; +struct CacheKeyRequest; +class CidStore; +class CbObjectView; +struct PutRequestData; +class ScrubContext; +class UpstreamCache; +class ZenCacheStore; +enum class CachePolicy : uint32_t; +enum class RpcAcceptOptions : uint16_t; + +namespace cache { + class IRpcRequestReplayer; + class IRpcRequestRecorder; + namespace detail { + struct RecordBody; + struct ChunkRequest; + } // namespace detail +} // namespace cache + +/** + * Structured cache service. Imposes constraints on keys, supports blobs and + * structured values + * + * Keys are structured as: + * + * {BucketId}/{KeyHash} + * + * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character + * hexadecimal sequence. The hash value may be derived in any number of ways, it's + * up to the application to pick an approach. + * + * Values may be structured or unstructured. Structured values are encoded using Unreal + * Engine's compact binary encoding (see CbObject) + * + * Additionally, attachments may be addressed as: + * + * {BucketId}/{KeyHash}/{ValueHash} + * + * Where the two initial components are the same as for the main endpoint + * + * The storage strategy is as follows: + * + * - Structured values are stored in a dedicated backing store per bucket + * - Unstructured values and attachments are stored in the CAS pool + * + */ + +class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider +{ +public: + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache); + ~HttpStructuredCacheService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + void Flush(); + void Scrub(ScrubContext& Ctx); + +private: + struct CacheRef + { + std::string Namespace; + std::string BucketSegment; + IoHash HashKey; + IoHash ValueContentId; + }; + + struct CacheStats + { + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + }; + enum class PutResult + { + Success, + Fail, + Invalid, + }; + + void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleRpcRequest(HttpServerRequest& Request); + void HandleDetailsRequest(HttpServerRequest& Request); + + CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); + CbPackage HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId); + + void HandleCacheRequest(HttpServerRequest& Request); + void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); + void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + + /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest); + /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ + void GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ + void GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ + void GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests); + /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ + CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + ZenCacheStore& m_CacheStore; + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + CidStore& m_CidStore; + UpstreamCache& m_UpstreamCache; + uint64_t m_LastScrubTime = 0; + metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; + CacheStats m_CacheStats; + + void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + + std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; +}; + +/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. + * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ +inline bool +IsCompressedBinary(ZenContentType Type) +{ + return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; +} + +void z$service_forcelink(); + +} // namespace zen diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp new file mode 100644 index 000000000..26e970073 --- /dev/null +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -0,0 +1,3648 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "structuredcachestore.h" + +#include <zencore/except.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> + +#include <xxhash.h> + +#include <limits> + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#endif + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/core.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +namespace { + +#pragma pack(push) +#pragma pack(1) + + struct CacheBucketIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t Version2 = 2; + static constexpr uint32_t CurrentVersion = Version2; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(CacheBucketIndexHeader) == 32); + +#pragma pack(pop) + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + ".tmp"); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + return BucketDir / (BucketName + LogExtension); + } + + bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.GetFlags() & + ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); + return false; + } + if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + return true; + } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.Size(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return false; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + DeleteDirectories(DroppedBucketPath); + return true; + } + // TODO: Do we need to bail at some point? + zen::Sleep(100); + } while (true); + } + +} // namespace + +namespace fs = std::filesystem; + +static CbObject +LoadCompactBinaryObject(const fs::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +static void +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + +ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) +: GcStorage(Gc) +, GcContributor(Gc) +, m_RootDir(RootDir) +, m_DiskLayer(RootDir) +{ + ZEN_INFO("initializing structured cache at '{}'", RootDir); + CreateDirectories(RootDir); + + m_DiskLayer.DiscoverBuckets(); + +#if ZEN_USE_CACHE_TRACKER + m_AccessTracker.reset(new ZenCacheTracker(RootDir)); +#endif +} + +ZenCacheNamespace::~ZenCacheNamespace() +{ +} + +bool +ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + ZEN_TRACE_CPU("Z$::Get"); + + bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); + +#if ZEN_USE_CACHE_TRACKER + auto _ = MakeGuard([&] { + if (!Ok) + return; + + m_AccessTracker->TrackAccess(InBucket, HashKey); + }); +#endif + + if (Ok) + { + ZEN_ASSERT(OutValue.Value.Size()); + + return true; + } + + Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + + if (Ok) + { + ZEN_ASSERT(OutValue.Value.Size()); + + if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) + { + m_MemLayer.Put(InBucket, HashKey, OutValue); + } + } + + return Ok; +} + +void +ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + ZEN_TRACE_CPU("Z$::Put"); + + // Store value and index + + ZEN_ASSERT(Value.Value.Size()); + + m_DiskLayer.Put(InBucket, HashKey, Value); + +#if ZEN_USE_REF_TRACKING + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) + { + CbObject Object{SharedBuffer(Value.Value)}; + + uint8_t TempBuffer[8 * sizeof(IoHash)]; + std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; + std::pmr::polymorphic_allocator Allocator{&Linear}; + std::pmr::vector<IoHash> CidReferences{Allocator}; + + Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); + + m_Gc.OnNewCidReferences(CidReferences); + } + } +#endif + + if (Value.Value.Size() <= m_DiskLayerSizeThreshold) + { + m_MemLayer.Put(InBucket, HashKey, Value); + } +} + +bool +ZenCacheNamespace::DropBucket(std::string_view Bucket) +{ + ZEN_INFO("dropping bucket '{}'", Bucket); + + // TODO: should ensure this is done atomically across all layers + + const bool MemDropped = m_MemLayer.DropBucket(Bucket); + const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); + const bool AnyDropped = MemDropped || DiskDropped; + + ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); + + return AnyDropped; +} + +bool +ZenCacheNamespace::Drop() +{ + m_MemLayer.Drop(); + return m_DiskLayer.Drop(); +} + +void +ZenCacheNamespace::Flush() +{ + m_DiskLayer.Flush(); +} + +void +ZenCacheNamespace::Scrub(ScrubContext& Ctx) +{ + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_DiskLayer.Scrub(Ctx); + m_MemLayer.Scrub(Ctx); +} + +void +ZenCacheNamespace::GatherReferences(GcContext& GcCtx) +{ + Stopwatch Timer; + const auto Guard = + MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + access_tracking::AccessTimes AccessTimes; + m_MemLayer.GatherAccessTimes(AccessTimes); + + m_DiskLayer.UpdateAccessTimes(AccessTimes); + m_DiskLayer.GatherReferences(GcCtx); +} + +void +ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) +{ + m_MemLayer.Reset(); + m_DiskLayer.CollectGarbage(GcCtx); +} + +GcStorageSize +ZenCacheNamespace::StorageSize() const +{ + return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; +} + +ZenCacheNamespace::Info +ZenCacheNamespace::GetInfo() const +{ + ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold}, + .DiskLayerInfo = m_DiskLayer.GetInfo(), + .MemoryLayerInfo = m_MemLayer.GetInfo()}; + std::unordered_set<std::string> BucketNames; + for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames) + { + BucketNames.insert(BucketName); + } + for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames) + { + BucketNames.insert(BucketName); + } + Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end()); + return Info; +} + +std::optional<ZenCacheNamespace::BucketInfo> +ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const +{ + std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket); + if (!DiskBucketInfo.has_value()) + { + return {}; + } + ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo, + .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})}; + return Info; +} + +CacheValueDetails::NamespaceDetails +ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const +{ + return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheMemoryLayer::ZenCacheMemoryLayer() +{ +} + +ZenCacheMemoryLayer::~ZenCacheMemoryLayer() +{ +} + +bool +ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(std::string(InBucket)); + + if (It == m_Buckets.end()) + { + return false; + } + + CacheBucket* Bucket = It->second.get(); + + _.ReleaseNow(); + + // There's a race here. Since the lock is released early to allow + // inserts, the bucket delete path could end up deleting the + // underlying data structure + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + // New bucket + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + else + { + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); + Bucket = InsertResult.first->second.get(); + } + } + + // Note that since the underlying IoBuffer is retained, the content type is also + + Bucket->Put(HashKey, Value); +} + +bool +ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) +{ + RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.find(std::string(InBucket)); + + if (It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); + Bucket.Drop(); + return true; + } + return false; +} + +void +ZenCacheMemoryLayer::Drop() +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + Bucket.Drop(); + } +} + +void +ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second->Scrub(Ctx); + } +} + +void +ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) +{ + using namespace zen::access_tracking; + + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; + Kv.second->GatherAccessTimes(Bucket); + } +} + +void +ZenCacheMemoryLayer::Reset() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Buckets.clear(); +} + +uint64_t +ZenCacheMemoryLayer::TotalSize() const +{ + uint64_t TotalSize{}; + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + TotalSize += Kv.second->TotalSize(); + } + + return TotalSize; +} + +ZenCacheMemoryLayer::Info +ZenCacheMemoryLayer::GetInfo() const +{ + ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; + + RwLock::SharedLockScope _(m_Lock); + Info.BucketNames.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Info.BucketNames.push_back(Kv.first); + Info.EntryCount += Kv.second->EntryCount(); + } + return Info; +} + +std::optional<ZenCacheMemoryLayer::BucketInfo> +ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const +{ + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) + { + return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; + } + return {}; +} + +void +ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_BucketLock); + + std::vector<IoHash> BadHashes; + + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + return Error == CbValidateError::None; + } + if (ContentType == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (Hash != RawHash) + { + return false; + } + } + return true; + }; + + for (auto& Kv : m_CacheMap) + { + const BucketPayload& Payload = m_Payloads[Kv.second]; + if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) + { + BadHashes.push_back(Kv.first); + } + } + + if (!BadHashes.empty()) + { + Ctx.ReportBadCidChunks(BadHashes); + } +} + +void +ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) +{ + RwLock::SharedLockScope _(m_BucketLock); + std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { + return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; + }); +} + +bool +ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_BucketLock); + + if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) + { + uint32_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); + ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); + + const BucketPayload& Payload = m_Payloads[EntryIndex]; + OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + + return true; + } + + return false; +} + +void +ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) +{ + size_t PayloadSize = Value.Value.GetSize(); + { + GcClock::Tick AccessTime = GcClock::TickCount(); + RwLock::ExclusiveLockScope _(m_BucketLock); + if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) + { + // No more space in our memory cache! + return; + } + if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) + { + uint32_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); + + m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); + BucketPayload& Payload = m_Payloads[EntryIndex]; + Payload.Payload = Value.Value; + Payload.RawHash = Value.RawHash; + Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); + m_AccessTimes[EntryIndex] = AccessTime; + } + else + { + uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); + m_Payloads.emplace_back( + BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(AccessTime); + m_CacheMap.insert_or_assign(HashKey, EntryIndex); + } + ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); + ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); + } + + m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); +} + +void +ZenCacheMemoryLayer::CacheBucket::Drop() +{ + RwLock::ExclusiveLockScope _(m_BucketLock); + m_CacheMap.clear(); + m_AccessTimes.clear(); + m_Payloads.clear(); + m_TotalSize.store(0); +} + +uint64_t +ZenCacheMemoryLayer::CacheBucket::EntryCount() const +{ + RwLock::SharedLockScope _(m_BucketLock); + return static_cast<uint64_t>(m_CacheMap.size()); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) +{ +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ +} + +bool +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) +{ + using namespace std::literals; + + m_BlocksBasePath = BucketDir / "blocks"; + m_BucketDir = BucketDir; + + CreateDirectories(m_BucketDir); + + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + + bool IsNew = false; + + CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + + if (Manifest) + { + m_BucketId = Manifest["BucketId"sv].AsObjectId(); + if (m_BucketId == Oid::Zero) + { + return false; + } + } + else if (AllowCreate) + { + m_BucketId.Generate(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + Manifest = Writer.Save(); + SaveCompactBinaryObject(ManifestPath, Manifest); + IsNew = true; + } + else + { + return false; + } + + OpenLog(IsNew); + + if (!IsNew) + { + Stopwatch Timer; + const auto _ = + MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + for (CbFieldView Entry : Manifest["Timestamps"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); + } + } + for (CbFieldView Entry : Manifest["RawInfo"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); + m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash(); + m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64(); + } + } + } + + return true; +} + +void +ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() +{ + uint64_t LogCount = m_SlogFile.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + + ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_BucketDir / m_BucketName, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<DiskIndexEntry> Entries; + + { + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) + { + DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) +{ + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CacheBucketIndexHeader)) + { + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && + (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) + { + switch (Header.Version) + { + case CacheBucketIndexHeader::Version2: + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + break; + } + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_PayloadAlignment = Header.PayloadAlignment; + + std::vector<DiskIndexEntry> Entries; + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), + Header.EntryCount * sizeof(DiskIndexEntry), + sizeof(CacheBucketIndexHeader)); + + m_Payloads.reserve(Header.EntryCount); + m_AccessTimes.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + std::string InvalidEntryReason; + for (const DiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Entry.Key, EntryIndex); + EntryCount++; + } + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; + } + break; + default: + break; + } + } + } + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + return 0; +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +{ + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + } + return LogEntryCount; + } + } + return 0; +}; + +void +ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) +{ + m_TotalStandaloneSize = 0; + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + + if (IsNew) + { + fs::remove(LogPath); + fs::remove(IndexPath); + fs::remove_all(m_BlocksBasePath); + } + + uint64_t LogEntryCount = 0; + { + uint32_t IndexVersion = 0; + m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); + if (IndexVersion == 0 && std::filesystem::is_regular_file(IndexPath)) + { + ZEN_WARN("removing invalid index file at '{}'", IndexPath); + fs::remove(IndexPath); + } + + if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) + { + LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + } + else + { + ZEN_WARN("removing invalid cas log at '{}'", LogPath); + fs::remove(LogPath); + } + } + + CreateDirectories(m_BucketDir); + + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_Index.size()); + for (const auto& Entry : m_Index) + { + size_t EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Location = Payload.Location; + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); + continue; + } + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); + KnownLocations.push_back(BlockLocation); + } + + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + if (IsNew || LogEntryCount > 0) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const +{ + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); + + Path.Append(m_BucketDir); + Path.AppendSeparator(); + Path.Append(L"blob"); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 3, HexString + 5); + Path.AppendSeparator(); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const +{ + BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); + + IoBuffer Value = m_BlockStore.TryGetChunk(Location); + if (Value) + { + Value.SetContentType(Loc.GetContentType()); + } + + return Value; +} + +IoBuffer +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const +{ + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) + { + Data.SetContentType(Loc.GetContentType()); + + return Data; + } + + return {}; +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) + { + return false; + } + size_t EntryIndex = It.value(); + const BucketPayload& Payload = m_Payloads[EntryIndex]; + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = Payload.Location; + OutValue.RawSize = Payload.RawSize; + OutValue.RawHash = Payload.RawHash; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + // We don't need to hold the index lock when we read a standalone file + _.ReleaseNow(); + OutValue.Value = GetStandaloneCacheValue(Location, HashKey); + } + else + { + OutValue.Value = GetInlineCacheValue(Location); + } + _.ReleaseNow(); + + if (!Location.IsFlagSet(DiskLocation::kStructured)) + { + if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) + { + if (Location.IsFlagSet(DiskLocation::kCompressed)) + { + (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); + } + else + { + OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); + OutValue.RawSize = OutValue.Value.GetSize(); + } + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) + { + BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; + WritePayload.RawHash = OutValue.RawHash; + WritePayload.RawSize = OutValue.RawSize; + + m_LogFlushPosition = 0; // Force resave of index on exit + } + } + } + + return (bool)OutValue.Value; +} + +void +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) +{ + if (Value.Value.Size() >= m_LargeObjectThreshold) + { + return PutStandaloneCacheValue(HashKey, Value); + } + PutInlineCacheValue(HashKey, Value); +} + +bool +ZenCacheDiskLayer::CacheBucket::Drop() +{ + RwLock::ExclusiveLockScope _(m_IndexLock); + + std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; + ShardLocks.reserve(256); + for (RwLock& Lock : m_ShardedLocks) + { + ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); + } + m_BlockStore.Close(); + m_SlogFile.Close(); + + bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + + m_Index.clear(); + m_Payloads.clear(); + m_AccessTimes.clear(); + return Deleted; +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + m_BlockStore.Flush(); + + RwLock::SharedLockScope _(m_IndexLock); + m_SlogFile.Flush(); + MakeIndexSnapshot(); + SaveManifest(); +} + +void +ZenCacheDiskLayer::CacheBucket::SaveManifest() +{ + using namespace std::literals; + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; + + if (!m_Index.empty()) + { + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; + + Writer.BeginObject(); + Writer << "Key"sv << Key; + Writer << "LastAccess"sv << AccessTime; + Writer.EndObject(); + } + Writer.EndArray(); + + Writer.BeginArray("RawInfo"sv); + { + for (auto& Kv : m_Index) + { + const IoHash& Key = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + if (Payload.RawHash != IoHash::Zero) + { + Writer.BeginObject(); + Writer << "Key"sv << Key; + Writer << "RawHash"sv << Payload.RawHash; + Writer << "RawSize"sv << Payload.RawSize; + Writer.EndObject(); + } + } + } + Writer.EndArray(); + } + + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); +} + +void +ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) +{ + std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + return Error == CbValidateError::None; + } + if (ContentType == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + return false; + } + if (RawHash != Hash) + { + return false; + } + } + return true; + }; + + RwLock::SharedLockScope _(m_IndexLock); + + const size_t BlockChunkInitialCount = m_Index.size() / 4; + ChunkLocations.reserve(BlockChunkInitialCount); + ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + + for (auto& Kv : m_Index) + { + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ++ChunkCount; + ChunkBytes += Loc.Size(); + if (Loc.GetContentType() == ZenContentType::kBinary) + { + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + if (Ec) + { + BadKeys.push_back(HashKey); + } + if (size != Loc.Size()) + { + BadKeys.push_back(HashKey); + } + continue; + } + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) + { + BadKeys.push_back(HashKey); + continue; + } + if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) + { + BadKeys.push_back(HashKey); + continue; + } + } + else + { + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkIndexToChunkHash.push_back(HashKey); + continue; + } + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + BadKeys.push_back(Hash); + return; + } + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + if (!Buffer) + { + BadKeys.push_back(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + if (!ValidateEntry(Hash, ContentType, Buffer)) + { + BadKeys.push_back(Hash); + return; + } + }; + + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + if (!Buffer) + { + BadKeys.push_back(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + if (!ValidateEntry(Hash, ContentType, Buffer)) + { + BadKeys.push_back(Hash); + return; + } + }; + + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + + _.ReleaseNow(); + + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + + if (!BadKeys.empty()) + { + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); + + if (Ctx.RunRecovery()) + { + // Deal with bad chunks by removing them from our lookup map + + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + + { + RwLock::ExclusiveLockScope __(m_IndexLock); + for (const IoHash& BadKey : BadKeys) + { + // Log a tombstone and delete the in-memory index for the bad entry + const auto It = m_Index.find(BadKey); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskLocation Location = Payload.Location; + Location.Flags |= DiskLocation::kTombStone; + LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); + m_Index.erase(BadKey); + } + } + for (const DiskIndexEntry& Entry : LogEntries) + { + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExtendablePathBuilder<256> Path; + BuildPath(Path, Entry.Key); + fs::path FilePath = Path.ToPath(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); + std::error_code Ec; + fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... + } + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + } + } + m_SlogFile.Append(LogEntries); + + // Clean up m_AccessTimes and m_Payloads vectors + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + IndexMap Index; + + { + RwLock::ExclusiveLockScope __(m_IndexLock); + size_t EntryCount = m_Index.size(); + Payloads.reserve(EntryCount); + AccessTimes.reserve(EntryCount); + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t EntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[EntryIndex]); + AccessTimes.push_back(m_AccessTimes[EntryIndex]); + Index.insert({It.first, EntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + } + } + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); +} + +void +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); + + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs)); + }); + + const GcClock::TimePoint ExpireTime = GcCtx.ExpireTime(); + + const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); + + IndexMap Index; + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + Index = m_Index; + AccessTimes = m_AccessTimes; + Payloads = m_Payloads; + } + + std::vector<IoHash> ExpiredKeys; + ExpiredKeys.reserve(1024); + + std::vector<IoHash> Cids; + Cids.reserve(1024); + + for (const auto& Entry : Index) + { + const IoHash& Key = Entry.first; + GcClock::Tick AccessTime = AccessTimes[Entry.second]; + if (AccessTime < ExpireTicks) + { + ExpiredKeys.push_back(Key); + continue; + } + + const DiskLocation& Loc = Payloads[Entry.second].Location; + + if (Loc.IsFlagSet(DiskLocation::kStructured)) + { + if (Cids.size() > 1024) + { + GcCtx.AddRetainedCids(Cids); + Cids.clear(); + } + + IoBuffer Buffer; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + // We don't need to hold the index lock when we read a standalone file + __.ReleaseNow(); + if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) + { + continue; + } + } + else if (Buffer = GetInlineCacheValue(Loc); !Buffer) + { + continue; + } + } + + ZEN_ASSERT(Buffer); + ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); + CbObject Obj(SharedBuffer{Buffer}); + Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); + } + } + + GcCtx.AddRetainedCids(Cids); + GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); +} + +void +ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); + + ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName); + + Stopwatch TotalTimer; + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = TotalSize(); + + std::unordered_set<IoHash> DeletedChunks; + uint64_t MovedCount = 0; + + const auto _ = MakeGuard([&] { + ZEN_DEBUG( + "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " + "{} " + "of {} " + "entires ({}).", + m_BucketDir / m_BucketName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedChunks.size(), + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + RwLock::SharedLockScope _(m_IndexLock); + SaveManifest(); + }); + + m_SlogFile.Flush(); + + std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + std::vector<IoHash> DeleteCacheKeys; + DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); + GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + if (Keep) + { + return; + } + DeleteCacheKeys.push_back(ChunkHash); + }); + if (DeleteCacheKeys.empty()) + { + ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); + return; + } + + auto __ = MakeGuard([&]() { + if (!DeletedChunks.empty()) + { + // Clean up m_AccessTimes and m_Payloads vectors + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + IndexMap Index; + + { + RwLock::ExclusiveLockScope _(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + size_t EntryCount = m_Index.size(); + Payloads.reserve(EntryCount); + AccessTimes.reserve(EntryCount); + Index.reserve(EntryCount); + for (auto It : m_Index) + { + size_t EntryIndex = Payloads.size(); + Payloads.push_back(m_Payloads[EntryIndex]); + AccessTimes.push_back(m_AccessTimes[EntryIndex]); + Index.insert({It.first, EntryIndex}); + } + m_Index.swap(Index); + m_Payloads.swap(Payloads); + m_AccessTimes.swap(AccessTimes); + } + GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); + } + }); + + std::vector<DiskIndexEntry> ExpiredStandaloneEntries; + IndexMap Index; + BlockStore::ReclaimSnapshotState BlockStoreState; + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) + { + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; + } + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + SaveManifest(); + Index = m_Index; + + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) + { + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } + } + } + if (GcCtx.IsDeletionMode()) + { + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); + } + } + + if (GcCtx.IsDeletionMode()) + { + std::error_code Ec; + ExtendablePathBuilder<256> Path; + + for (const auto& Entry : ExpiredStandaloneEntries) + { + const IoHash& Key = Entry.Key; + const DiskLocation& Loc = Entry.Location; + + Path.Reset(); + BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); + + { + RwLock::SharedLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); + continue; + } + __.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); + } + } + + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); + Ec.clear(); + DiskLocation RestoreLocation = Loc; + RestoreLocation.Flags &= ~DiskLocation::kTombStone; + + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_Index.contains(Key)) + { + continue; + } + m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert({Key, EntryIndex}); + m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); + DeletedChunks.erase(Key); + continue; + } + DeletedSize += Entry.Location.Size(); + } + } + + TotalChunkCount = Index.size(); + + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : Index) + { + const DiskLocation& Location = m_Payloads[Entry.second].Location; + + if (Location.Flags & DiskLocation::kStandaloneFile) + { + continue; + } + TotalChunkHashes.push_back(Entry.first); + } + + if (TotalChunkHashes.empty()) + { + return; + } + TotalChunkCount = TotalChunkHashes.size(); + + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); + + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = Index.find(ChunkHash); + const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; + BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + if (Keep) + { + KeepChunkIndexes.push_back(ChunkIndex); + } + }); + + size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + uint64_t CurrentTotalSize = TotalSize(); + ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", + m_BucketDir / m_BucketName, + DeleteCount, + TotalChunkCount, + NiceBytes(CurrentTotalSize)); + return; + } + + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) + { + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back( + {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; + const DiskLocation& OldDiskLocation = OldPayload.Location; + LogEntries.push_back({.Key = ChunkHash, + .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), + m_PayloadAlignment, + OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); + DeletedChunks.insert(ChunkHash); + } + + m_SlogFile.Append(LogEntries); + m_SlogFile.Flush(); + { + RwLock::ExclusiveLockScope __(m_IndexLock); + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + for (const DiskIndexEntry& Entry : LogEntries) + { + if (Entry.Location.GetFlags() & DiskLocation::kTombStone) + { + m_Index.erase(Entry.Key); + continue; + } + m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; + } + } + }, + [&]() { return GcCtx.CollectSmallObjects(); }); +} + +void +ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) +{ + using namespace access_tracking; + + for (const KeyAccessTime& KeyTime : AccessTimes) + { + if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = KeyTime.LastAccess; + } + } +} + +uint64_t +ZenCacheDiskLayer::CacheBucket::EntryCount() const +{ + RwLock::SharedLockScope _(m_IndexLock); + return static_cast<uint64_t>(m_Index.size()); +} + +CacheValueDetails::ValueDetails +ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const +{ + std::vector<IoHash> Attachments; + const BucketPayload& Payload = m_Payloads[Index]; + if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) + { + IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) + : GetInlineCacheValue(Payload.Location); + CbObject Obj(SharedBuffer{Value}); + Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); + } + return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), + .RawSize = Payload.RawSize, + .RawHash = Payload.RawHash, + .LastAccess = m_AccessTimes[Index], + .Attachments = std::move(Attachments), + .ContentType = Payload.Location.GetContentType()}; +} + +CacheValueDetails::BucketDetails +ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const +{ + CacheValueDetails::BucketDetails Details; + RwLock::SharedLockScope _(m_IndexLock); + if (ValueFilter.empty()) + { + Details.Values.reserve(m_Index.size()); + for (const auto& It : m_Index) + { + Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); + } + } + else + { + IoHash Key = IoHash::FromHexString(ValueFilter); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); + } + } + return Details; +} + +void +ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + CacheBucket& Bucket = *Kv.second; + Bucket.CollectGarbage(GcCtx); + } +} + +void +ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) +{ + RwLock::SharedLockScope _(m_Lock); + + for (const auto& Kv : AccessTimes.Buckets) + { + if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + Bucket.UpdateAccessTimes(Kv.second); + } + } +} + +void +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint64_t NewFileSize = Value.Value.Size(); + + TemporaryFile DataFile; + + std::error_code Ec; + DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); + } + + bool CleanUpTempFile = false; + auto __ = MakeGuard([&] { + if (CleanUpTempFile) + { + std::error_code Ec; + std::filesystem::remove(DataFile.GetPath(), Ec); + if (Ec) + { + ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + } + } + }); + + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", + NiceBytes(NewFileSize), + DataFile.GetPath().string(), + m_BucketDir)); + } + + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + std::filesystem::path FsPath{DataFilePath.ToPath()}; + + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + + // We do a speculative remove of the file instead of probing with a exists call and check the error code instead + std::filesystem::remove(FsPath, Ec); + if (Ec) + { + if (Ec.value() != ENOENT) + { + ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); + Sleep(100); + Ec.clear(); + std::filesystem::remove(FsPath, Ec); + if (Ec && Ec.value() != ENOENT) + { + throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); + } + } + } + + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + CreateDirectories(FsPath.parent_path()); + Ec.clear(); + + // Try again + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", + FsPath, + DataFile.GetPath(), + m_BucketDir, + Ec.message()); + Sleep(100); + Ec.clear(); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + throw std::system_error( + Ec, + fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); + } + } + } + + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = false; + + uint8_t EntryFlags = DiskLocation::kStandaloneFile; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + DiskLocation Loc(NewFileSize, EntryFlags); + + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); +} + +void +ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + uint8_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + m_SlogFile.Append({.Key = HashKey, .Location = Location}); + + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + } + else + { + size_t EntryIndex = m_Payloads.size(); + m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); + m_AccessTimes.emplace_back(GcClock::TickCount()); + m_Index.insert_or_assign(HashKey, EntryIndex); + } + }); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) +{ +} + +ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; + +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + // Bucket needs to be opened/created + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + else + { + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + + if (!Bucket->OpenOrCreate(BucketPath)) + { + m_Buckets.erase(InsertResult.first); + return false; + } + } + } + + ZEN_ASSERT(Bucket != nullptr); + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto It = m_Buckets.find(BucketName); + + if (It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + } + + if (Bucket == nullptr) + { + // New bucket needs to be created + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + Bucket = It->second.get(); + } + else + { + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + + try + { + if (!Bucket->OpenOrCreate(BucketPath)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + m_Buckets.erase(InsertResult.first); + return; + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + } + } + + ZEN_ASSERT(Bucket != nullptr); + + Bucket->Put(HashKey, Value); +} + +void +ZenCacheDiskLayer::DiscoverBuckets() +{ + DirectoryContent DirContent; + GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); + + // Initialize buckets + + RwLock::ExclusiveLockScope _(m_Lock); + + for (const std::filesystem::path& BucketPath : DirContent.Directories) + { + const std::string BucketName = PathToUtf8(BucketPath.stem()); + // New bucket needs to be created + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) + { + continue; + } + + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + CacheBucket& Bucket = *InsertResult.first->second; + + try + { + if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + m_Buckets.erase(InsertResult.first); + continue; + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + ZEN_INFO("Discovered bucket '{}'", BucketName); + } +} + +bool +ZenCacheDiskLayer::DropBucket(std::string_view InBucket) +{ + RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.find(std::string(InBucket)); + + if (It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); + + return Bucket.Drop(); + } + + // Make sure we remove the folder even if we don't know about the bucket + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + return MoveAndDeleteDirectory(BucketPath); +} + +bool +ZenCacheDiskLayer::Drop() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + if (!Bucket.Drop()) + { + return false; + } + } + return MoveAndDeleteDirectory(m_RootDir); +} + +void +ZenCacheDiskLayer::Flush() +{ + std::vector<CacheBucket*> Buckets; + + { + RwLock::SharedLockScope _(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + CacheBucket* Bucket = Kv.second.get(); + Buckets.push_back(Bucket); + } + } + + for (auto& Bucket : Buckets) + { + Bucket->Flush(); + } +} + +void +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + CacheBucket& Bucket = *Kv.second; + Bucket.Scrub(Ctx); + } +} + +void +ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + CacheBucket& Bucket = *Kv.second; + Bucket.GatherReferences(GcCtx); + } +} + +uint64_t +ZenCacheDiskLayer::TotalSize() const +{ + uint64_t TotalSize{}; + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + TotalSize += Kv.second->TotalSize(); + } + + return TotalSize; +} + +ZenCacheDiskLayer::Info +ZenCacheDiskLayer::GetInfo() const +{ + ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; + + RwLock::SharedLockScope _(m_Lock); + Info.BucketNames.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Info.BucketNames.push_back(Kv.first); + Info.EntryCount += Kv.second->EntryCount(); + } + return Info; +} + +std::optional<ZenCacheDiskLayer::BucketInfo> +ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const +{ + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) + { + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; + } + return {}; +} + +CacheValueDetails::NamespaceDetails +ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const +{ + RwLock::SharedLockScope _(m_Lock); + CacheValueDetails::NamespaceDetails Details; + if (BucketFilter.empty()) + { + Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); + for (auto& Kv : m_Buckets) + { + Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); + } + } + else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) + { + Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); + } + return Details; +} + +//////////////////////////// ZenCacheStore + +static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; + +ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration) +{ + CreateDirectories(m_Configuration.BasePath); + + DirectoryContent DirContent; + GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + + std::vector<std::string> Namespaces; + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.filename()); + if (DirName.starts_with(NamespaceDiskPrefix)) + { + Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); + continue; + } + } + + ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath); + + if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) + { + // default (unspecified) and ue4-ddc namespace points to the same namespace instance + + std::filesystem::path DefaultNamespaceFolder = + m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); + CreateDirectories(DefaultNamespaceFolder); + Namespaces.push_back(std::string(UE4DDCNamespaceName)); + } + + for (const std::string& NamespaceName : Namespaces) + { + m_Namespaces[NamespaceName] = + std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); + } +} + +ZenCacheStore::~ZenCacheStore() +{ + m_Namespaces.clear(); +} + +bool +ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->Get(Bucket, HashKey, OutValue); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); + + return false; +} + +void +ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->Put(Bucket, HashKey, Value); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); +} + +bool +ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) +{ + if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) + { + return Store->DropBucket(Bucket); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket); + return false; +} + +bool +ZenCacheStore::DropNamespace(std::string_view InNamespace) +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + m_DroppedNamespaces.push_back(std::move(It->second)); + m_Namespaces.erase(It); + return Namespace.Drop(); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); + return false; +} + +void +ZenCacheStore::Flush() +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); +} + +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); +} + +CacheValueDetails +ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, + const std::string_view BucketFilter, + const std::string_view ValueFilter) const +{ + CacheValueDetails Details; + if (NamespaceFilter.empty()) + { + IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) { + Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter); + }); + } + else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr) + { + Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter); + } + return Details; +} + +ZenCacheNamespace* +ZenCacheStore::GetNamespace(std::string_view Namespace) +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) + { + return It->second.get(); + } + if (Namespace == DefaultNamespace) + { + if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) + { + return It->second.get(); + } + } + _.ReleaseNow(); + + if (!m_Configuration.AllowAutomaticCreationOfNamespaces) + { + return nullptr; + } + + RwLock::ExclusiveLockScope __(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) + { + return It->second.get(); + } + + auto NewNamespace = m_Namespaces.insert_or_assign( + std::string(Namespace), + std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); + return NewNamespace.first->second.get(); +} + +const ZenCacheNamespace* +ZenCacheStore::FindNamespace(std::string_view Namespace) const +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) + { + return It->second.get(); + } + if (Namespace == DefaultNamespace) + { + if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) + { + return It->second.get(); + } + } + return nullptr; +} + +void +ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const +{ + std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces; + { + RwLock::SharedLockScope _(m_NamespacesLock); + Namespaces.reserve(m_Namespaces.size()); + for (const auto& Entry : m_Namespaces) + { + if (Entry.first == DefaultNamespace) + { + continue; + } + Namespaces.push_back({Entry.first, *Entry.second}); + } + } + for (auto& Entry : Namespaces) + { + Callback(Entry.first, Entry.second); + } +} + +GcStorageSize +ZenCacheStore::StorageSize() const +{ + GcStorageSize Size; + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { + GcStorageSize StoreSize = Store.StorageSize(); + Size.MemorySize += StoreSize.MemorySize; + Size.DiskSize += StoreSize.DiskSize; + }); + return Size; +} + +ZenCacheStore::Info +ZenCacheStore::GetInfo() const +{ + ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()}; + + IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { + Info.NamespaceNames.push_back(std::string(NamespaceName)); + ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo(); + Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount; + Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount; + }); + + return Info; +} + +std::optional<ZenCacheNamespace::Info> +ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName) +{ + if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) + { + return Namespace->GetInfo(); + } + return {}; +} + +std::optional<ZenCacheNamespace::BucketInfo> +ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName) +{ + if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) + { + return Namespace->GetBucketInfo(BucketName); + } + return {}; +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +using namespace std::literals; + +namespace testutils { + IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } + + IoBuffer CreateBinaryCacheValue(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); + Buf.SetContentType(ZenContentType::kBinary); + return Buf; + }; + +} // namespace testutils + +TEST_CASE("z$.store") +{ + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + const int kIterationCount = 100; + + for (int i = 0; i < kIterationCount; ++i) + { + const IoHash Key = IoHash::HashBuffer(&i, sizeof i); + + CbObjectWriter Cbo; + Cbo << "hey" << i; + CbObject Obj = Cbo.Save(); + + ZenCacheValue Value; + Value.Value = Obj.GetBuffer().AsIoBuffer(); + Value.Value.SetContentType(ZenContentType::kCbObject); + + Zcs.Put("test_bucket"sv, Key, Value); + } + + for (int i = 0; i < kIterationCount; ++i) + { + const IoHash Key = IoHash::HashBuffer(&i, sizeof i); + + ZenCacheValue Value; + Zcs.Get("test_bucket"sv, Key, /* out */ Value); + + REQUIRE(Value.Value); + CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); + CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); + CbObject Obj = LoadCompactBinaryObject(Value.Value); + CHECK_EQ(Obj["hey"].AsInt32(), i); + } +} + +TEST_CASE("z$.size") +{ + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + SUBCASE("mem/disklayer") + { + const size_t Count = 16; + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + for (size_t Key = 0; Key < Count; ++Key) + { + const size_t Bucket = Key % 4; + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer}); + } + + CacheSize = Zcs.StorageSize(); + CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); + CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); + } + + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + const GcStorageSize SerializedSize = Zcs.StorageSize(); + CHECK_EQ(SerializedSize.MemorySize, 0); + CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); + + for (size_t Bucket = 0; Bucket < 4; ++Bucket) + { + Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); + } + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } + + SUBCASE("disklayer") + { + const size_t Count = 16; + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + for (size_t Key = 0; Key < Count; ++Key) + { + const size_t Bucket = Key % 4; + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); + } + + CacheSize = Zcs.StorageSize(); + CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); + CHECK_EQ(0, CacheSize.MemorySize); + } + + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + const GcStorageSize SerializedSize = Zcs.StorageSize(); + CHECK_EQ(SerializedSize.MemorySize, 0); + CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); + + for (size_t Bucket = 0; Bucket < 4; ++Bucket) + { + Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); + } + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } +} + +TEST_CASE("z$.gc") +{ + using namespace testutils; + + SUBCASE("gather references does NOT add references for expired cache entries") + { + ScopedTemporaryDirectory TempDir; + std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; + + const auto CollectAndFilter = [](GcManager& Gc, + GcClock::TimePoint Time, + GcClock::Duration MaxDuration, + std::span<const IoHash> Cids, + std::vector<IoHash>& OutKeep) { + GcContext GcCtx(Time - MaxDuration); + Gc.CollectGarbage(GcCtx); + OutKeep.clear(); + GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); + }; + + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "teardrinker"sv; + + // Create a cache record + const IoHash Key = CreateKey(42); + CbObjectWriter Record; + Record << "Key"sv + << "SomeRecord"sv; + + for (size_t Idx = 0; auto& Cid : Cids) + { + Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); + } + + IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + Zcs.Put(Bucket, Key, {.Value = Buffer}); + + std::vector<IoHash> Keep; + + // Collect garbage with 1 hour max cache duration + { + CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(Cids.size(), Keep.size()); + } + + // Move forward in time + { + CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(0, Keep.size()); + } + } + + // Expect timestamps to be serialized + { + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + std::vector<IoHash> Keep; + + // Collect garbage with 1 hour max cache duration + { + CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(3, Keep.size()); + } + + // Move forward in time + { + CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); + CHECK_EQ(0, Keep.size()); + } + } + } + + SUBCASE("gc removes standalone values") + { + ScopedTemporaryDirectory TempDir; + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "fortysixandtwo"sv; + const GcClock::TimePoint CurrentTime = GcClock::Now(); + + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; + + for (const auto& Key : Keys) + { + IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); + Zcs.Put(Bucket, Key, {.Value = Value}); + } + + { + GcContext GcCtx(CurrentTime - std::chrono::hours(46)); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } + } + + // Move forward in time and collect again + { + GcContext GcCtx(CurrentTime + std::chrono::minutes(2)); + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } + + SUBCASE("gc removes small objects") + { + ScopedTemporaryDirectory TempDir; + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + const auto Bucket = "rightintwo"sv; + + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; + + for (const auto& Key : Keys) + { + IoBuffer Value = testutils::CreateBinaryCacheValue(128); + Zcs.Put(Bucket, Key, {.Value = Value}); + } + + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(2)); + GcCtx.CollectSmallObjects(true); + + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } + } + + // Move forward in time and collect again + { + GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + + CHECK_EQ(0, Zcs.StorageSize().DiskSize); + } + } +} + +TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + struct Chunk + { + std::string Bucket; + IoBuffer Buffer; + }; + std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks; + Chunks.reserve(kChunkCount); + + const std::string Bucket1 = "rightinone"; + const std::string Bucket2 = "rightintwo"; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + break; + } + while (true) + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + break; + } + } + + CreateDirectories(TempDir.Path()); + + WorkerThreadPool ThreadPool(4); + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path()); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + + const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + CHECK_LE(kChunkSize * Chunks.size(), TotalSize); + + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { + std::string Bucket = Chunk.second.Bucket; + IoHash ChunkHash = Chunk.first; + ZenCacheValue CacheValue; + + CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); + IoHash Hash = IoHash::HashBuffer(CacheValue.Value); + CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + } + std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + { + std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; + } + { + IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; + } + } + + std::atomic<size_t> WorkCompleted = 0; + std::atomic_uint32_t AddedChunkCount = 0; + for (const auto& Chunk : NewChunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); + AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); + }); + } + + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + } + WorkCompleted.fetch_add(1); + }); + } + while (AddedChunkCount.load() < NewChunks.size()) + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Zcs.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (WorkCompleted < NewChunks.size() + Chunks.size()) + { + Sleep(1); + } + + { + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) + { + ZenCacheValue CacheValue; + if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) + { + GcChunkHashes[Chunk.first] = Chunk.second.Bucket; + } + } + std::vector<IoHash> KeepHashes; + KeepHashes.reserve(GcChunkHashes.size()); + for (const auto& Entry : GcChunkHashes) + { + KeepHashes.push_back(Entry.first); + } + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + GcCtx.AddRetainedCids(KeepHashes); + Zcs.CollectGarbage(GcCtx); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + std::atomic<size_t> WorkCompleted = 0; + for (const auto& Chunk : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { + ZenCacheValue CacheValue; + CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); + CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < GcChunkHashes.size()) + { + Sleep(1); + } + } + } +} + +TEST_CASE("z$.namespaces") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + IoHash Key1; + IoHash Key2; + { + GcManager Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; + + // Create a cache record + Key1 = CreateKey(42); + CbObject CacheValue = CreateCacheValue(4096); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); + + ZenCacheValue GetValue; + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + + // This should just be dropped as we don't allow creating of namespaces on the fly + Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + } + + { + GcManager Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; + + Key2 = CreateKey(43); + CbObject CacheValue2 = CreateCacheValue(4096); + + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + ZenCacheValue PutValue2 = {.Value = Buffer2}; + Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + + ZenCacheValue GetValue; + CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); + } +} + +TEST_CASE("z$.drop.bucket") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + IoHash Key1; + IoHash Key2; + + auto PutValue = + [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue GetValue; + Zcs.Get(Namespace, Bucket, Key, GetValue); + return GetValue; + }; + WorkerThreadPool Workers(1); + { + GcManager Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket = "teardrinker"sv; + const auto Namespace = "mynamespace"sv; + + Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096); + Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048); + + ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1); + CHECK(Value1.Value); + + std::atomic_bool WorkComplete = false; + Workers.ScheduleWork([&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + WorkComplete = true; + }); + // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket + // Our DropBucket execution blocks any incoming request from completing until we are done with the drop + CHECK(Zcs.DropBucket(Namespace, Bucket)); + while (!WorkComplete) + { + zen::Sleep(1); + } + + // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty + Value1 = GetValue(Zcs, Namespace, Bucket, Key1); + CHECK(!Value1.Value); + ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2); + CHECK(!Value2.Value); + } +} + +TEST_CASE("z$.drop.namespace") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + auto PutValue = + [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue GetValue; + Zcs.Get(Namespace, Bucket, Key, GetValue); + return GetValue; + }; + WorkerThreadPool Workers(1); + { + GcManager Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket1 = "teardrinker1"sv; + const auto Bucket2 = "teardrinker2"sv; + const auto Namespace1 = "mynamespace1"sv; + const auto Namespace2 = "mynamespace2"sv; + + IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096); + IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048); + IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096); + IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048); + + ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); + CHECK(Value1.Value); + ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); + CHECK(Value2.Value); + ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); + CHECK(Value3.Value); + ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); + CHECK(Value4.Value); + + std::atomic_bool WorkComplete = false; + Workers.ScheduleWork([&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + Value2.Value = IoBuffer{}; + Value3.Value = IoBuffer{}; + Value4.Value = IoBuffer{}; + WorkComplete = true; + }); + // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket + // Our DropBucket execution blocks any incoming request from completing until we are done with the drop + CHECK(Zcs.DropNamespace(Namespace1)); + while (!WorkComplete) + { + zen::Sleep(1); + } + + // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty + Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); + CHECK(!Value1.Value); + Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); + CHECK(!Value2.Value); + Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); + CHECK(Value3.Value); + Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); + CHECK(Value4.Value); + } +} + +TEST_CASE("z$.blocked.disklayer.put") +{ + ScopedTemporaryDirectory TempDir; + + GcStorageSize CacheSize; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size, Size & 0xff); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + GcManager Gc; + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + + CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + size_t Key = Buffer.Size(); + IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); + + ZenCacheValue BufferGet; + CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); + + CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); + IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); + Buffer2.SetContentType(ZenContentType::kCbObject); + + // We should be able to overwrite even if the file is open for read + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); + + MemoryView OldView = BufferGet.Value.GetView(); + + ZenCacheValue BufferGet2; + CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2)); + MemoryView NewView = BufferGet2.Value.GetView(); + + // Make sure file openend for read before we wrote it still have old data + CHECK(OldView.GetSize() == Buffer.GetSize()); + CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0); + + // Make sure we get the new data when reading after we write new data + CHECK(NewView.GetSize() == Buffer2.GetSize()); + CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); +} + +TEST_CASE("z$.scrub") +{ + ScopedTemporaryDirectory TempDir; + + using namespace testutils; + + struct CacheRecord + { + IoBuffer Record; + std::vector<CompressedBuffer> Attachments; + }; + + auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) { + CacheRecord Result; + if (Structured) + { + Result.Attachments.resize(AttachmentSizes.size()); + CbObjectWriter Record; + Record.BeginObject("Key"sv); + { + Record << "Bucket"sv << Bucket; + Record << "Hash"sv << Key; + } + Record.EndObject(); + for (size_t Index = 0; Index < AttachmentSizes.size(); Index++) + { + IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); + CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); + Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash()); + Result.Attachments[Index] = CompressedAttachmentData; + } + Result.Record = Record.Save().GetBuffer().AsIoBuffer(); + Result.Record.SetContentType(ZenContentType::kCbObject); + } + else + { + std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString()); + size_t TotalSize = RecordData.length() + 1; + for (size_t AttachmentSize : AttachmentSizes) + { + TotalSize += AttachmentSize; + } + Result.Record = IoBuffer(TotalSize); + char* DataPtr = (char*)Result.Record.MutableData(); + memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1); + DataPtr += RecordData.length() + 1; + for (size_t AttachmentSize : AttachmentSizes) + { + IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize); + memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize()); + DataPtr += AttachmentData.GetSize(); + } + } + return Result; + }; + + GcManager Gc; + CidStore CidStore(Gc); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + auto CreateRecords = + [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) { + for (const IoHash& Cid : Cids) + { + CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}); + for (const CompressedBuffer& Attachment : Record.Attachments) + { + CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), Attachment.DecodeRawHash()); + } + } + }; + + std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000}; + + std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)}; + CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes); + + std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; + CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); + + ScrubContext ScrubCtx; + Zcs.Scrub(ScrubCtx); + CidStore.Scrub(ScrubCtx); + CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); + CHECK(ScrubCtx.BadCids().GetSize() == 0); +} + +#endif + +void +z$_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h new file mode 100644 index 000000000..3fb4f035d --- /dev/null +++ b/src/zenserver/cache/structuredcachestore.h @@ -0,0 +1,535 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> +#include <zencore/uid.h> +#include <zenstore/blockstore.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <atomic> +#include <compare> +#include <filesystem> +#include <unordered_map> + +#define ZEN_USE_CACHE_TRACKER 0 + +namespace zen { + +class PathBuilderBase; +class GcManager; +class ZenCacheTracker; +class ScrubContext; + +/****************************************************************************** + + /$$$$$$$$ /$$$$$$ /$$ + |_____ $$ /$$__ $$ | $$ + /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ + /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ + /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ + /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ + /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ + |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/ + + Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash + is 40 (hex) chars in size. Values may be opaque blobs or structured objects + which can in turn contain references to other objects (or blobs). + +******************************************************************************/ + +namespace access_tracking { + + struct KeyAccessTime + { + IoHash Key; + GcClock::Tick LastAccess{}; + }; + + struct AccessTimes + { + std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets; + }; +}; // namespace access_tracking + +struct ZenCacheValue +{ + IoBuffer Value; + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; +}; + +struct CacheValueDetails +{ + struct ValueDetails + { + uint64_t Size; + uint64_t RawSize; + IoHash RawHash; + GcClock::Tick LastAccess{}; + std::vector<IoHash> Attachments; + ZenContentType ContentType; + }; + + struct BucketDetails + { + std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values; + }; + + struct NamespaceDetails + { + std::unordered_map<std::string, BucketDetails> Buckets; + }; + + std::unordered_map<std::string, NamespaceDetails> Namespaces; +}; + +////////////////////////////////////////////////////////////////////////// + +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + inline DiskLocation() = default; + + inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } + + inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) + { + this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); + } + + inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const + { + ZEN_ASSERT(!(Flags & kStandaloneFile)); + return Location.BlockLocation.Get(PayloadAlignment); + } + + inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } + inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } + inline uint8_t GetFlags() const { return Flags; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (IsFlagSet(kCompressed)) + { + ContentType = ZenContentType::kCompressedBinary; + } + + return ContentType; + } + + union + { + BlockStoreDiskLocation BlockLocation; // 10 bytes + uint64_t StandaloneSize = 0; // 8 bytes + } Location; + + static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file + static const uint8_t kStructured = 0x40u; // Serialized as compact binary + static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value + static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format + + uint8_t Flags = 0; + uint8_t Reserved = 0; +}; + +struct DiskIndexEntry +{ + IoHash Key; // 20 bytes + DiskLocation Location; // 12 bytes +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 32); + +// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch +struct AccessTime +{ + explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} + AccessTime& operator=(GcClock::Tick Tick) noexcept + { + SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); + return *this; + } + operator GcClock::Tick() const noexcept + { + return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) + .count(); + } + + AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} + AccessTime& operator=(AccessTime&& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + AccessTime& operator=(const AccessTime& Rhs) noexcept + { + SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); + return *this; + } + +private: + static uint32_t ToSeconds(GcClock::Tick Tick) + { + return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); + } + std::atomic_uint32_t SecondsSinceEpoch; +}; + +/** In-memory cache storage + + Intended for small values which are frequently accessed + + This should have a better memory management policy to maintain reasonable + footprint. + */ +class ZenCacheMemoryLayer +{ +public: + struct Configuration + { + uint64_t TargetFootprintBytes = 16 * 1024 * 1024; + uint64_t ScavengeThreshold = 4 * 1024 * 1024; + }; + + struct BucketInfo + { + uint64_t EntryCount = 0; + uint64_t TotalSize = 0; + }; + + struct Info + { + Configuration Config; + std::vector<std::string> BucketNames; + uint64_t EntryCount = 0; + uint64_t TotalSize = 0; + }; + + ZenCacheMemoryLayer(); + ~ZenCacheMemoryLayer(); + + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + bool DropBucket(std::string_view Bucket); + void Scrub(ScrubContext& Ctx); + void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); + void Reset(); + uint64_t TotalSize() const; + + Info GetInfo() const; + std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + + const Configuration& GetConfiguration() const { return m_Configuration; } + void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } + +private: + struct CacheBucket + { +#pragma pack(push) +#pragma pack(1) + struct BucketPayload + { + IoBuffer Payload; // 8 + uint32_t RawSize; // 4 + IoHash RawHash; // 20 + }; +#pragma pack(pop) + static_assert(sizeof(BucketPayload) == 32u); + static_assert(sizeof(AccessTime) == 4u); + + mutable RwLock m_BucketLock; + std::vector<AccessTime> m_AccessTimes; + std::vector<BucketPayload> m_Payloads; + tsl::robin_map<IoHash, uint32_t> m_CacheMap; + + std::atomic_uint64_t m_TotalSize{}; + + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Scrub(ScrubContext& Ctx); + void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + inline uint64_t TotalSize() const { return m_TotalSize; } + uint64_t EntryCount() const; + }; + + mutable RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + Configuration m_Configuration; + + ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; + ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; +}; + +class ZenCacheDiskLayer +{ +public: + struct Configuration + { + std::filesystem::path RootDir; + }; + + struct BucketInfo + { + uint64_t EntryCount = 0; + uint64_t TotalSize = 0; + }; + + struct Info + { + Configuration Config; + std::vector<std::string> BucketNames; + uint64_t EntryCount = 0; + uint64_t TotalSize = 0; + }; + + explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir); + ~ZenCacheDiskLayer(); + + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); + bool DropBucket(std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); + + void DiscoverBuckets(); + uint64_t TotalSize() const; + + Info GetInfo() const; + std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + + CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + +private: + /** A cache bucket manages a single directory containing + metadata and data for that bucket + */ + struct CacheBucket + { + CacheBucket(std::string BucketName); + ~CacheBucket(); + + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + + inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); } + uint64_t EntryCount() const; + + CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; + + private: + const uint64_t MaxBlockSize = 1ull << 30; + uint64_t m_PayloadAlignment = 1ull << 4; + + std::string m_BucketName; + std::filesystem::path m_BucketDir; + std::filesystem::path m_BlocksBasePath; + BlockStore m_BlockStore; + Oid m_BucketId; + uint64_t m_LargeObjectThreshold = 128 * 1024; + + // These files are used to manage storage of small objects for this bucket + + TCasLogFile<DiskIndexEntry> m_SlogFile; + uint64_t m_LogFlushPosition = 0; + +#pragma pack(push) +#pragma pack(1) + struct BucketPayload + { + DiskLocation Location; // 12 + uint64_t RawSize; // 8 + IoHash RawHash; // 20 + }; +#pragma pack(pop) + static_assert(sizeof(BucketPayload) == 40u); + static_assert(sizeof(AccessTime) == 4u); + + using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>; + + mutable RwLock m_IndexLock; + std::vector<AccessTime> m_AccessTimes; + std::vector<BucketPayload> m_Payloads; + IndexMap m_Index; + + std::atomic_uint64_t m_TotalStandaloneSize{}; + + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); + void OpenLog(const bool IsNew); + void SaveManifest(); + CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const; + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + + mutable RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } + }; + + std::filesystem::path m_RootDir; + mutable RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + + ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; + ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; +}; + +class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor +{ +public: + struct Configuration + { + std::filesystem::path RootDir; + uint64_t DiskLayerThreshold = 0; + }; + struct BucketInfo + { + ZenCacheDiskLayer::BucketInfo DiskLayerInfo; + ZenCacheMemoryLayer::BucketInfo MemoryLayerInfo; + }; + struct Info + { + Configuration Config; + std::vector<std::string> BucketNames; + ZenCacheDiskLayer::Info DiskLayerInfo; + ZenCacheMemoryLayer::Info MemoryLayerInfo; + }; + + ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir); + ~ZenCacheNamespace(); + + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); + bool DropBucket(std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; } + virtual void GatherReferences(GcContext& GcCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; + Info GetInfo() const; + std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + + CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + +private: + std::filesystem::path m_RootDir; + ZenCacheMemoryLayer m_MemLayer; + ZenCacheDiskLayer m_DiskLayer; + uint64_t m_DiskLayerSizeThreshold = 1 * 1024; + uint64_t m_LastScrubTime = 0; + +#if ZEN_USE_CACHE_TRACKER + std::unique_ptr<ZenCacheTracker> m_AccessTracker; +#endif + + ZenCacheNamespace(const ZenCacheNamespace&) = delete; + ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; +}; + +class ZenCacheStore final +{ +public: + static constexpr std::string_view DefaultNamespace = + "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given + static constexpr std::string_view NamespaceDiskPrefix = "ns_"; + + struct Configuration + { + std::filesystem::path BasePath; + bool AllowAutomaticCreationOfNamespaces = false; + }; + + struct Info + { + Configuration Config; + std::vector<std::string> NamespaceNames; + uint64_t DiskEntryCount = 0; + uint64_t MemoryEntryCount = 0; + GcStorageSize StorageSize; + }; + + ZenCacheStore(GcManager& Gc, const Configuration& Configuration); + ~ZenCacheStore(); + + bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Namespace, std::string_view Bucket); + bool DropNamespace(std::string_view Namespace); + void Flush(); + void Scrub(ScrubContext& Ctx); + + CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, + const std::string_view BucketFilter, + const std::string_view ValueFilter) const; + + GcStorageSize StorageSize() const; + // const Configuration& GetConfiguration() const { return m_Configuration; } + + Info GetInfo() const; + std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace); + std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket); + +private: + const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; + ZenCacheNamespace* GetNamespace(std::string_view Namespace); + void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const; + + typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; + + mutable RwLock m_NamespacesLock; + NamespaceMap m_Namespaces; + std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; + + GcManager& m_Gc; + Configuration m_Configuration; +}; + +void z$_forcelink(); + +} // namespace zen |