diff options
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/cachetracking.cpp | 376 | ||||
| -rw-r--r-- | zenserver/cache/cachetracking.h | 41 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 3159 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 187 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 3648 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 535 |
6 files changed, 0 insertions, 7946 deletions
diff --git a/zenserver/cache/cachetracking.cpp b/zenserver/cache/cachetracking.cpp deleted file mode 100644 index 9119e3122..000000000 --- a/zenserver/cache/cachetracking.cpp +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cachetracking.h" - -#if ZEN_USE_CACHE_TRACKER - -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinaryvalue.h> -# include <zencore/endian.h> -# include <zencore/filesystem.h> -# include <zencore/logging.h> -# include <zencore/scopeguard.h> -# include <zencore/string.h> - -# include <zencore/testing.h> -# include <zencore/testutils.h> - -ZEN_THIRD_PARTY_INCLUDES_START -# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this -# include <fmt/format.h> -# include <rocksdb/db.h> -# include <tsl/robin_map.h> -# include <tsl/robin_set.h> -# include <gsl/gsl-lite.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -namespace rocksdb = ROCKSDB_NAMESPACE; - -static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{}; - -static uint64_t -GetCurrentCacheTimeStamp() -{ - auto Duration = std::chrono::system_clock::now() - Epoch; - uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count(); - - return Millis; -} - -struct CacheAccessSnapshot -{ -public: - void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) - { - BucketTracker* Tracker = GetBucket(std::string(BucketSegment)); - - Tracker->Track(HashKey); - } - - bool SerializeSnapshot(CbObjectWriter& Cbo) - { - bool Serialized = false; - RwLock::ExclusiveLockScope _(m_Lock); - - for (const auto& Kv : m_BucketMapping) - { - if (m_Buckets[Kv.second]->Size()) - { - Cbo.BeginArray(Kv.first); - m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo); - Cbo.EndArray(); - Serialized = true; - } - } - - return Serialized; - } - -private: - struct BucketTracker - { - mutable RwLock Lock; - tsl::robin_set<IoHash> AccessedKeys; - - void Track(const IoHash& HashKey) - { - if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey)) - { - return; - } - - RwLock::ExclusiveLockScope _(Lock); - - AccessedKeys.insert(HashKey); - } - - void SerializeSnapshotAndClear(CbObjectWriter& Cbo) - { - RwLock::ExclusiveLockScope _(Lock); - - for (const IoHash& Hash : AccessedKeys) - { - Cbo.AddHash(Hash); - } - - AccessedKeys.clear(); - } - - size_t Size() const - { - RwLock::SharedLockScope _(Lock); - return AccessedKeys.size(); - } - }; - - BucketTracker* GetBucket(const std::string& BucketName) - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) - { - _.ReleaseNow(); - - return AddNewBucket(BucketName); - } - else - { - return m_Buckets[It->second].get(); - } - } - - BucketTracker* AddNewBucket(const std::string& BucketName) - { - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end()) - { - const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size()); - m_Buckets.emplace_back(std::make_unique<BucketTracker>()); - m_BucketMapping[BucketName] = BucketIndex; - - return m_Buckets[BucketIndex].get(); - } - else - { - return m_Buckets[It->second].get(); - } - } - - RwLock m_Lock; - std::vector<std::unique_ptr<BucketTracker>> m_Buckets; - tsl::robin_map<std::string, uint32_t> m_BucketMapping; -}; - -struct ZenCacheTracker::Impl -{ - Impl(std::filesystem::path StateDirectory) - { - std::filesystem::path StatsDbPath{StateDirectory / ".zdb"}; - - std::string RocksdbPath = StatsDbPath.string(); - - ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath); - - rocksdb::DB* Db = nullptr; - rocksdb::DBOptions Options; - Options.create_if_missing = true; - - std::vector<std::string> ExistingColumnFamilies; - rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies); - - std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors; - - if (Status.IsPathNotFound()) - { - ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}}); - } - else if (Status.ok()) - { - for (const std::string& Column : ExistingColumnFamilies) - { - rocksdb::ColumnFamilyDescriptor ColumnFamily; - ColumnFamily.name = Column; - ColumnDescriptors.push_back(ColumnFamily); - } - } - else - { - throw std::runtime_error(fmt::format("column family iteration failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); - } - - Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db); - - if (!Status.ok()) - { - throw std::runtime_error(fmt::format("database open failed for '{}': '{}'", RocksdbPath, Status.getState()).c_str()); - } - - m_RocksDb.reset(Db); - } - - ~Impl() - { - for (auto* Column : m_RocksDbColumnHandles) - { - delete Column; - } - - m_RocksDbColumnHandles.clear(); - } - - struct KeyStruct - { - uint64_t TimestampLittleEndian; - }; - - void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); } - - void SaveSnapshot() - { - CbObjectWriter Cbo; - - if (m_CurrentSnapshot.SerializeSnapshot(Cbo)) - { - IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer(); - - const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())}; - rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key}; - rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()}; - - rocksdb::WriteOptions Wo; - m_RocksDb->Put(Wo, KeySlice, ValueSlice); - } - } - - void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) - { - rocksdb::ManagedSnapshot Snap(m_RocksDb.get()); - - rocksdb::ReadOptions Ro; - Ro.snapshot = Snap.snapshot(); - - std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)}; - - const KeyStruct ZeroKey{.TimestampLittleEndian = 0}; - rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey}; - - It->Seek(ZeroKeySlice); - - while (It->Valid()) - { - rocksdb::Slice KeySlice = It->key(); - rocksdb::Slice ValueSlice = It->value(); - - if (KeySlice.size() == sizeof(KeyStruct)) - { - IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size()); - - CbObject Value = LoadCompactBinaryObject(ValueBuffer); - - uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data())); - - Callback(Key, Value); - } - - It->Next(); - } - } - - std::unique_ptr<rocksdb::DB> m_RocksDb; - std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles; - CacheAccessSnapshot m_CurrentSnapshot; -}; - -ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory)) -{ -} - -ZenCacheTracker::~ZenCacheTracker() -{ - delete m_Impl; -} - -void -ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) -{ - m_Impl->TrackAccess(BucketSegment, HashKey); -} - -void -ZenCacheTracker::SaveSnapshot() -{ - m_Impl->SaveSnapshot(); -} - -void -ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback) -{ - m_Impl->IterateSnapshots(std::move(Callback)); -} - -# if ZEN_WITH_TESTS - -TEST_CASE("z$.tracker") -{ - using namespace std::literals; - - const uint64_t t0 = GetCurrentCacheTimeStamp(); - - ScopedTemporaryDirectory TempDir; - - ZenCacheTracker Zcs(TempDir.Path()); - - tsl::robin_set<IoHash> KeyHashes; - - for (int i = 0; i < 10000; ++i) - { - IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); - - KeyHashes.insert(KeyHash); - - Zcs.TrackAccess("foo"sv, KeyHash); - } - - for (int i = 0; i < 10000; ++i) - { - IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i); - - Zcs.TrackAccess("foo"sv, KeyHash); - } - - Zcs.SaveSnapshot(); - - for (int n = 0; n < 10; ++n) - { - for (int i = 0; i < 1000; ++i) - { - const int Index = i + n * 1000; - IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index); - - Zcs.TrackAccess("foo"sv, KeyHash); - } - - Zcs.SaveSnapshot(); - } - - Zcs.SaveSnapshot(); - - const uint64_t t1 = GetCurrentCacheTimeStamp(); - - int SnapshotCount = 0; - - Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) { - CHECK(TimeStamp >= t0); - CHECK(TimeStamp <= t1); - - for (auto& Field : Snapshot) - { - CHECK_EQ(Field.GetName(), "foo"sv); - - const CbArray& Array = Field.AsArray(); - - for (const auto& Element : Array) - { - CHECK(KeyHashes.contains(Element.GetValue().AsHash())); - } - } - - ++SnapshotCount; - }); - - CHECK_EQ(SnapshotCount, 11); -} - -# endif - -void -cachetracker_forcelink() -{ -} - -} // namespace zen - -#endif // ZEN_USE_CACHE_TRACKER diff --git a/zenserver/cache/cachetracking.h b/zenserver/cache/cachetracking.h deleted file mode 100644 index fdfe1a4c7..000000000 --- a/zenserver/cache/cachetracking.h +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/iohash.h> - -#include <stdint.h> -#include <filesystem> -#include <functional> - -namespace zen { - -#define ZEN_USE_CACHE_TRACKER 0 -#if ZEN_USE_CACHE_TRACKER - -class CbObject; - -/** - */ - -class ZenCacheTracker -{ -public: - ZenCacheTracker(std::filesystem::path StateDirectory); - ~ZenCacheTracker(); - - void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey); - void SaveSnapshot(); - void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback); - -private: - struct Impl; - - Impl* m_Impl = nullptr; -}; - -void cachetracker_forcelink(); - -#endif // ZEN_USE_CACHE_TRACKER - -} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp deleted file mode 100644 index 90e905bf6..000000000 --- a/zenserver/cache/structuredcache.cpp +++ /dev/null @@ -1,3159 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "structuredcache.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/enumflags.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpserver.h> -#include <zenhttp/httpshared.h> -#include <zenutil/cache/cache.h> -#include <zenutil/cache/rpcrecording.h> - -#include "monitoring/httpstats.h" -#include "structuredcachestore.h" -#include "upstream/jupiter.h" -#include "upstream/upstreamcache.h" -#include "upstream/zen.h" -#include "zenstore/cidstore.h" -#include "zenstore/scrubcontext.h" - -#include <algorithm> -#include <atomic> -#include <filesystem> -#include <queue> -#include <thread> - -#include <cpr/cpr.h> -#include <gsl/gsl-lite.hpp> - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -#endif - -namespace zen { - -using namespace std::literals; - -////////////////////////////////////////////////////////////////////////// - -CachePolicy -ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) -{ - std::string_view PolicyText = QueryParams.GetValue("Policy"sv); - return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; -} - -CacheRecordPolicy -LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) -{ - OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); - return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); -} - -struct AttachmentCount -{ - uint32_t New = 0; - uint32_t Valid = 0; - uint32_t Invalid = 0; - uint32_t Total = 0; -}; - -struct PutRequestData -{ - std::string Namespace; - CacheKey Key; - CbObjectView RecordObject; - CacheRecordPolicy Policy; -}; - -namespace { - static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; - static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; - static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; - static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; - static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; - - struct HttpRequestData - { - std::optional<std::string> Namespace; - std::optional<std::string> Bucket; - std::optional<IoHash> HashKey; - std::optional<IoHash> ValueContentId; - }; - - constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - - std::optional<std::string> GetValidNamespaceName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Namespace is invalid, empty namespace is not allowed"); - return {}; - } - - if (Name.length() > 64) - { - ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet)) - { - ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional<std::string> GetValidBucketName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed"); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet)) - { - ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional<IoHash> GetValidIoHash(std::string_view Hash) - { - if (Hash.length() != IoHash::StringLength) - { - return {}; - } - - IoHash KeyHash; - if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) - { - return {}; - } - return KeyHash; - } - - bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) - { - std::vector<std::string_view> Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) { - Tokens.push_back(Token); - return true; - }); - - switch (TokenCount) - { - case 0: - return true; - case 1: - Data.Namespace = GetValidNamespaceName(Tokens[0]); - return Data.Namespace.has_value(); - case 2: - { - std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - return true; - } - case 3: - { - std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key/valueid request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.ValueContentId = GetValidIoHash(Tokens[2]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey) - { - return false; - } - return true; - } - case 4: - { - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey.has_value()) - { - return false; - } - - Data.ValueContentId = GetValidIoHash(Tokens[3]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - return true; - } - default: - return false; - } - } - - std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) - { - CbFieldView NamespaceField = Params["Namespace"sv]; - if (!NamespaceField) - { - return std::string(ZenCacheStore::DefaultNamespace); - } - - if (NamespaceField.HasError()) - { - return {}; - } - if (!NamespaceField.IsString()) - { - return {}; - } - return GetValidNamespaceName(NamespaceField.AsString()); - } - - bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) - { - CbFieldView BucketField = KeyView["Bucket"sv]; - if (BucketField.HasError()) - { - return false; - } - if (!BucketField.IsString()) - { - return false; - } - std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString()); - if (!Bucket.has_value()) - { - return false; - } - CbFieldView HashField = KeyView["Hash"sv]; - if (HashField.HasError()) - { - return false; - } - if (!HashField.IsHash()) - { - return false; - } - IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); - return true; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache) -: m_Log(logging::Get("cache")) -, m_CacheStore(InCacheStore) -, m_StatsService(StatsService) -, m_StatusService(StatusService) -, m_CidStore(InCidStore) -, m_UpstreamCache(UpstreamCache) -{ - m_StatsService.RegisterHandler("z$", *this); - m_StatusService.RegisterHandler("z$", *this); -} - -HttpStructuredCacheService::~HttpStructuredCacheService() -{ - ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); - - m_StatsService.UnregisterHandler("z$", *this); - m_StatusService.UnregisterHandler("z$", *this); -} - -const char* -HttpStructuredCacheService::BaseUri() const -{ - return "/z$/"; -} - -void -HttpStructuredCacheService::Flush() -{ - m_CacheStore.Flush(); -} - -void -HttpStructuredCacheService::Scrub(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CidStore.Scrub(Ctx); - m_CacheStore.Scrub(Ctx); -} - -void -HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) -{ - std::string_view Key = Request.RelativeUri(); - std::vector<std::string> Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { - Tokens.push_back(std::string(Token)); - return true; - }); - std::string FilterNamespace; - std::string FilterBucket; - std::string FilterValue; - switch (TokenCount) - { - case 1: - break; - case 2: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 3: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 4: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterValue = Tokens[3]; - if (FilterValue.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - default: - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch()); - CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - if (AttachmentDetails) - { - CSVWriter << "Namespace, Bucket, Key, Cid, Size"; - } - else if (Details) - { - CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; - } - else - { - CSVWriter << "Namespace, Bucket, Key"; - } - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - for (const auto& ValueIt : BucketIt.second.Values) - { - if (AttachmentDetails) - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() - << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); - } - } - else if (Details) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," - << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " - << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) - << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); - } - else - { - CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); - } - } - } - } - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("namespaces"); - { - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Namespace); - Cbo.BeginArray("buckets"); - { - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Bucket); - Cbo.BeginArray("values"); - { - for (const auto& ValueIt : BucketIt.second.Values) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - Cbo.BeginObject(); - { - Cbo.AddHash("key", ValueIt.first); - if (Details) - { - Cbo.AddInteger("size", ValueIt.second.Size); - if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && - ValueIt.second.RawSize != ValueIt.second.Size) - { - Cbo.AddInteger("rawsize", ValueIt.second.RawSize); - Cbo.AddHash("rawhash", ValueIt.second.RawHash); - } - Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); - Cbo.AddInteger("age", NowSeconds.count() - LastAccessedSeconds.count()); - if (ValueIt.second.Attachments.size() > 0) - { - if (AttachmentDetails) - { - Cbo.BeginArray("attachments"); - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - Cbo.BeginObject(); - Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - else - { - Cbo.AddInteger("attachmentcount", - gsl::narrow<uint64_t>(ValueIt.second.Attachments.size())); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); - } - } - } - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) -{ - metrics::OperationTiming::Scope $(m_HttpRequests); - - std::string_view Key = Request.RelativeUri(); - if (Key == HttpZCacheRPCPrefix) - { - return HandleRpcRequest(Request); - } - - if (Key == HttpZCacheUtilStartRecording) - { - m_RequestRecorder.reset(); - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key == HttpZCacheUtilStopRecording) - { - m_RequestRecorder.reset(); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key == HttpZCacheUtilReplayRecording) - { - m_RequestRecorder.reset(); - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - uint32_t ThreadCount = std::thread::hardware_concurrency(); - if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) - { - if (auto Value = ParseInt<uint64_t>(Param)) - { - ThreadCount = gsl::narrow<uint32_t>(Value.value()); - } - } - std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); - ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key.starts_with(HttpZCacheDetailsPrefix)) - { - HandleDetailsRequest(Request); - return; - } - - HttpRequestData RequestData; - if (!HttpRequestParseRelativeUri(Key, RequestData)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - if (RequestData.ValueContentId.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - ZEN_ASSERT(RequestData.HashKey.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = RequestData.ValueContentId.value()}; - return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.HashKey.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = IoHash::Zero}; - return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.Bucket.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); - } - - if (RequestData.Namespace.has_value()) - { - return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); - } - return HandleCacheRequest(Request); -} - -void -HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info.Config.BasePath.u8string(); - ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); - ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - ResponseWriter.BeginArray("Namespaces"); - for (const std::string& NamespaceName : Info.NamespaceNames) - { - ResponseWriter.AddString(NamespaceName); - } - ResponseWriter.EndArray(); - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); - ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); - } - - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - } -} - -void -HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info->Config.RootDir.u8string(); - ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); - ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - - ResponseWriter.BeginArray("Buckets"sv); - for (const std::string& BucketName : Info->BucketNames) - { - ResponseWriter.AddString(BucketName); - } - ResponseWriter.EndArray(); - - ResponseWriter.BeginObject("StorageSize"sv); - { - ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop namespace - { - if (m_CacheStore.DropNamespace(NamespaceName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, - std::string_view NamespaceName, - std::string_view BucketName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop bucket - { - if (m_CacheStore.DropBucket(NamespaceName, BucketName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - HandleGetCacheRecord(Request, Ref, PolicyFromUrl); - } - break; - - case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, PolicyFromUrl); - break; - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - - bool Success = false; - ZenCacheValue ClientResultValue; - if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - - Stopwatch Timer; - - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && - m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) - { - Success = true; - ZenContentType ContentType = ClientResultValue.Value.GetContentType(); - - if (AcceptType == ZenContentType::kCbPackage) - { - if (ContentType == ZenContentType::kCbObject) - { - CbPackage Package; - uint32_t MissingCount = 0; - - CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { - if (SkipData) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - MissingCount++; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); - } - else - { - MissingCount++; - } - } - }); - - Success = MissingCount == 0 || PartialRecord; - - if (Success) - { - Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); - - BinaryWriter MemStream; - Package.Save(MemStream); - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); - } - } - else - { - Success = false; - } - } - else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && - AcceptType != ZenContentType::kBinary) - { - Success = false; - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData - return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); - } - } - else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - // Issue upstream query asynchronously in order to keep requests flowing without - // hogging I/O servicing threads with blocking work - - uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); - - Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) { - Stopwatch Timer; - bool Success = false; - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - ZenCacheValue ClientResultValue; - - metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); - UpstreamResult.Status.Success) - { - Success = true; - - ClientResultValue.Value = UpstreamResult.Value; - ClientResultValue.Value.SetContentType(AcceptType); - - if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) - { - if (AcceptType == ZenContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - if (ValidationResult != CbValidateError::None) - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - - // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data - } - - if (Success && StoreLocal) - { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); - } - } - else if (AcceptType == ZenContentType::kCbPackage) - { - CbPackage Package; - if (Package.TryLoad(ClientResultValue.Value)) - { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - AttachmentsToStoreLocally.reserve(NumAttachments); - - CacheRecord.IterateAttachments( - [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) { - IoHash Hash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - if (StoreLocal) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - } - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", - Hash, - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; - } - } - else if (QueryLocal) - { - if (SkipData) - { - if (m_CidStore.ContainsChunk(Hash)) - { - Count.Valid++; - } - } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - if (Compressed) - { - Package.AddAttachment(CbAttachment(Compressed, Hash)); - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", - Hash, - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; - } - } - } - Count.Total++; - }); - - if ((Count.Valid == Count.Total) || PartialRecord) - { - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - - if (StoreLocal) - { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); - } - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - } - - BinaryWriter MemStream; - if (SkipData) - { - // Save a package containing only the object. - CbPackage(Package.GetObject()).Save(MemStream); - } - else - { - Package.Save(MemStream); - } - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - - if (SkipData && AcceptType == ZenContentType::kBinary) - { - AsyncRequest.WriteResponse(HttpResponseCode::OK); - } - else - { - // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally - // metadata. - AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); - } - } - else - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - m_CacheStats.MissCount++; - AsyncRequest.WriteResponse(HttpResponseCode::NotFound); - } - }); -} - -void -HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - const HttpContentType ContentType = Request.RequestContentType(); - - Body.SetContentType(ContentType); - - Stopwatch Timer; - - if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) - { - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = Body.GetSize(); - if (ContentType == HttpContentType::kCompressedBinary) - { - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Payload is not a valid compressed binary"sv); - } - } - else - { - RawHash = IoHash::HashBuffer(SharedBuffer(Body)); - } - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); - - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); - } - - Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - - CbObjectView CacheRecord(Body.Data()); - std::vector<IoHash> ValidAttachments; - int32_t TotalCount = 0; - - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - TotalCount, - ValidAttachments.size(), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - - CachePolicy Policy = PolicyFromUrl; - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbPackage) - { - CbPackage Package; - - if (!Package.TryLoad(Body)) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); - } - CachePolicy Policy = PolicyFromUrl; - - CbObject CacheRecord = Package.GetObject(); - - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); - - CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - Count.Total++; - }); - - if (Count.Invalid > 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); - } - - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.GetSize()), - ToString(ContentType), - Count.New, - Count.Valid, - Count.Total, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = Count.Valid != Count.Total; - - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); - } -} - -void -HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - HandleGetCacheChunk(Request, Ref, PolicyFromUrl); - break; - case HttpVerb::kPut: - HandlePutCacheChunk(Request, Ref, PolicyFromUrl); - break; - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - Stopwatch Timer; - - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); - const UpstreamEndpointInfo* Source = nullptr; - CachePolicy Policy = PolicyFromUrl; - { - const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); - - if (QueryUpstream) - { - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); - UpstreamResult.Status.Success) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) - { - if (RawHash == Ref.ValueContentId) - { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); - Source = UpstreamResult.Source; - } - else - { - ZEN_WARN("got missmatching upstream cache value"); - } - } - else - { - ZEN_WARN("got uncompressed upstream cache value"); - } - } - } - } - - if (!Value) - { - ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - ToString(Request.AcceptContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Value.Size()), - ToString(Value.GetContentType()), - Source ? Source->Url : "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (Source) - { - m_CacheStats.UpstreamHitCount++; - } - - if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) - { - Request.WriteResponse(HttpResponseCode::OK); - } - else - { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); - } -} - -void -HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(PolicyFromUrl); - - Stopwatch Timer; - - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - Body.SetContentType(Request.RequestContentType()); - - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); - } - - if (RawHash != Ref.ValueContentId) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "ValueContentId does not match attachment hash"sv); - } - - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); - - ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Body.Size()), - ToString(Body.GetContentType()), - Result.New ? "NEW" : "OLD", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; - - Request.WriteResponse(ResponseCode); -} - -CbPackage -HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId) -{ - CbPackage Package; - CbObjectView Object; - CbObject ObjectBuffer; - if (ContentType == ZenContentType::kCbObject) - { - ObjectBuffer = LoadCompactBinaryObject(std::move(Body)); - Object = ObjectBuffer; - } - else - { - Package = ParsePackageMessage(Body); - Object = Package.GetObject(); - } - OutAcceptMagic = Object["Accept"sv].AsUInt32(); - OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u)); - OutTargetProcessId = Object["Pid"sv].AsInt32(0); - - const std::string_view Method = Object["Method"sv].AsString(); - - if (Method == "PutCacheRecords"sv) - { - return HandleRpcPutCacheRecords(Package); - } - else if (Method == "GetCacheRecords"sv) - { - return HandleRpcGetCacheRecords(Object); - } - else if (Method == "PutCacheValues"sv) - { - return HandleRpcPutCacheValues(Package); - } - else if (Method == "GetCacheValues"sv) - { - return HandleRpcGetCacheValues(Object); - } - else if (Method == "GetCacheChunks"sv) - { - return HandleRpcGetCacheChunks(Object); - } - return CbPackage{}; -} - -void -HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) -{ - WorkerThreadPool WorkerPool(ThreadCount); - uint64_t RequestCount = Replayer.GetRequestCount(); - Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); - ZEN_INFO("Replaying {} requests", RequestCount); - for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) - { - WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); - if (Body) - { - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetPid = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); - if (AcceptMagic == kCbPkgMagic) - { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); - } - } - JobLatch.CountDown(); - }); - } - while (!JobLatch.Wait(10000)) - { - ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), - RequestCount, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } -} - -void -HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kPost: - { - const HttpContentType ContentType = Request.RequestContentType(); - const HttpContentType AcceptType = Request.AcceptContentType(); - - if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || - AcceptType != HttpContentType::kCbPackage) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - Request.WriteResponseAsync( - [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); - if (RpcResult.IsNull()) - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - return; - } - if (AcceptMagic == kCbPkgMagic) - { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }); - } - break; - default: - Request.WriteResponse(HttpResponseCode::BadRequest); - break; - } -} - -CbPackage -HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) -{ - ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); - CbObjectView BatchObject = BatchRequest.GetObject(); - ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); - CachePolicy DefaultPolicy; - - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; - for (CbFieldView RequestField : Params["Requests"sv]) - { - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); - CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); - - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } - CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; - - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); - - if (Result == PutResult::Invalid) - { - return CbPackage{}; - } - Results.push_back(Result == PutResult::Success); - } - if (Results.empty()) - { - return CbPackage{}; - } - - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) - { - ResponseObject.AddBool(Value); - } - ResponseObject.EndArray(); - - CbPackage RpcResponse; - RpcResponse.SetObject(ResponseObject.Save()); - return RpcResponse; -} - -HttpStructuredCacheService::PutResult -HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) -{ - CbObjectView Record = Request.RecordObject; - uint64_t RecordObjectSize = Record.GetSize(); - uint64_t TransferredSize = RecordObjectSize; - - AttachmentCount Count; - size_t NumAttachments = Package->GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); - - Stopwatch Timer; - - Request.RecordObject.IterateAttachments( - [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(HttpContentType::kCbPackage), - ValueHash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(ValueHash)) - { - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - Count.Total++; - }); - - if (Count.Invalid > 0) - { - return PutResult::Invalid; - } - - ZenCacheValue CacheValue; - CacheValue.Value = IoBuffer(Record.GetSize()); - Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - TransferredSize += Chunk.GetCompressedSize(); - } - - ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - NiceBytes(TransferredSize), - Count.New, - Count.Valid, - Count.Total, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = Count.Valid != Count.Total; - - if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); - } - return PutResult::Success; -} - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) -{ - ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - - struct ValueRequestData - { - Oid ValueId; - IoHash ContentId; - CompressedBuffer Payload; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool ReadFromUpstream = false; - }; - struct RecordRequestData - { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; - }; - - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - Requests.reserve(RequestsArray.Num()); - - auto ParseValues = [](RecordRequestData& Request) { - CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); - Request.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) - { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - Request.Values.push_back({ValueId, RawHash}); - Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId); - } - } - }; - - for (CbFieldView RequestField : RequestsArray) - { - Stopwatch Timer; - RecordRequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - CacheKey& Key = Request.Upstream.Key; - if (!GetRpcRequestCacheKey(KeyObject, Key)) - { - return CbPackage{}; - } - - Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - const CacheRecordPolicy& Policy = Request.DownstreamPolicy; - - ZenCacheValue CacheValue; - bool NeedUpstreamAttachment = false; - bool FoundLocalInvalid = false; - ZenCacheValue RecordCacheValue; - - if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) - { - Request.RecordCacheValue = std::move(RecordCacheValue.Value); - if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) - { - FoundLocalInvalid = true; - } - else - { - Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); - ParseValues(Request); - - Request.Complete = true; - for (ValueRequestData& Value : Request.Values) - { - CachePolicy ValuePolicy = Value.DownstreamPolicy; - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) - { - // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we - // didn't ask for it and thus the record is complete in its absence. - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - Value.Exists = true; - } - else - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - Request.Complete = false; - } - } - else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - if (m_CidStore.ContainsChunk(Value.ContentId)) - { - Value.Exists = true; - } - else - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - } - Request.Complete = false; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) - { - ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - Value.Exists = true; - } - else - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - } - Request.Complete = false; - } - } - } - } - } - if (!Request.Complete) - { - bool NeedUpstreamRecord = - !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); - if (NeedUpstreamRecord || NeedUpstreamAttachment) - { - UpstreamIndexes.push_back(Requests.size() - 1); - } - } - Request.ElapsedTimeUs = Timer.GetElapsedTimeUs(); - } - if (Requests.empty()) - { - return CbPackage{}; - } - - if (!UpstreamIndexes.empty()) - { - std::vector<CacheKeyRequest*> UpstreamRequests; - UpstreamRequests.reserve(UpstreamIndexes.size()); - for (size_t Index : UpstreamIndexes) - { - RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); - - if (Request.Values.size()) - { - // We will be returning the local object and know all the value Ids that exist in it - // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have. - CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; - CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); - for (ValueRequestData& Value : Request.Values) - { - CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy); - UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; - Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); - } - Request.Upstream.Policy = Builder.Build(); - } - else - { - // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, - // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); - } - } - - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); - Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; - Stopwatch Timer; - auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); - if (!Request.RecordObject) - { - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); - Request.RecordObject = ObjectBuffer; - if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) - { - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); - } - ParseValues(Request); - Request.Source = Params.Source; - } - - Request.Complete = true; - for (ValueRequestData& Value : Request.Values) - { - if (Value.Exists) - { - continue; - } - CachePolicy ValuePolicy = Value.DownstreamPolicy; - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - Request.Complete = false; - continue; - } - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) - { - if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) - { - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) - { - Request.Source = Params.Source; - Value.Exists = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) - { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - } - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - Value.Payload = Compressed; - } - } - else - { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", - Value.ContentId, - *Namespace, - Key.Bucket, - Key.Hash); - } - } - if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - Request.Complete = false; - } - // Request.Complete does not need to be set to false for upstream SkipData attachments. - // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment - // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of - // any missing SkipData attachments. - } - Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } - }; - - m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); - } - - CbPackage ResponsePackage; - CbObjectWriter ResponseObject; - - ResponseObject.BeginArray("Result"sv); - for (RecordRequestData& Request : Requests) - { - const CacheKey& Key = Request.Upstream.Key; - if (Request.Complete || - (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) - { - ResponseObject << Request.RecordObject; - for (ValueRequestData& Value : Request.Values) - { - if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) - { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); - } - } - - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(Request.RecordCacheValue.Size()), - Request.Complete ? ""sv : " (PARTIAL)"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0; - } - else - { - ResponseObject.AddNull(); - - if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) - { - // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - } - else - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - Request.RecordObject ? ""sv : " (PARTIAL)"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.MissCount++; - } - } - } - ResponseObject.EndArray(); - ResponsePackage.SetObject(ResponseObject.Save()); - return ResponsePackage; -} - -CbPackage -HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest) -{ - CbObjectView BatchObject = BatchRequest.GetObject(); - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); - - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - std::vector<bool> Results; - for (CbFieldView RequestField : Params["Requests"sv]) - { - Stopwatch Timer; - - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } - - PolicyText = RequestObject["Policy"sv].AsString(); - CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); - uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - bool Succeeded = false; - uint64_t TransferredSize = 0; - - if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) - { - if (Attachment->IsCompressedBinary()) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - // TODO: Implement upstream puts of CacheValues with StoreLocal == false. - // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. - Policy |= CachePolicy::StoreLocal; - } - - if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); - Value.SetContentType(ZenContentType::kCompressedBinary); - if (RawSize == 0) - { - RawSize = Chunk.DecodeRawSize(); - } - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}); - TransferredSize = Chunk.GetCompressedSize(); - } - Succeeded = true; - } - else - { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); - return CbPackage{}; - } - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - ZenCacheValue ExistingValue; - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && - IsCompressedBinary(ExistingValue.Value.GetContentType())) - { - Succeeded = true; - } - } - // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. - // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. - - if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); - } - Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(TransferredSize), - Succeeded ? "Added"sv : "Invalid", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } - if (Results.empty()) - { - return CbPackage{}; - } - - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) - { - ResponseObject.AddBool(Value); - } - ResponseObject.EndArray(); - - CbPackage RpcResponse; - RpcResponse.SetObject(ResponseObject.Save()); - - return RpcResponse; -} - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) -{ - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - - struct RequestData - { - CacheKey Key; - CachePolicy Policy; - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - CompressedBuffer Result; - }; - std::vector<RequestData> Requests; - - std::vector<size_t> RemoteRequestIndexes; - - for (CbFieldView RequestField : Params["Requests"sv]) - { - Stopwatch Timer; - - RequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) - { - return CbPackage{}; - } - - PolicyText = RequestObject["Policy"sv].AsString(); - Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - - ZenCacheValue CacheValue; - if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) - { - Request.RawHash = CacheValue.RawHash; - Request.RawSize = CacheValue.RawSize; - Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } - } - if (Request.Result) - { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(Request.Result.GetCompressed().GetSize()), - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.HitCount++; - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) - { - RemoteRequestIndexes.push_back(Requests.size() - 1); - } - else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) - { - // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); - } - else - { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - } - } - - if (!RemoteRequestIndexes.empty()) - { - std::vector<CacheValueRequest> RequestedRecordsData; - std::vector<CacheValueRequest*> CacheValueRequests; - RequestedRecordsData.reserve(RemoteRequestIndexes.size()); - CacheValueRequests.reserve(RemoteRequestIndexes.size()); - for (size_t Index : RemoteRequestIndexes) - { - RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); - CacheValueRequests.push_back(&RequestedRecordsData.back()); - } - Stopwatch Timer; - m_UpstreamCache.GetCacheValues( - *Namespace, - CacheValueRequests, - [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { - CacheValueRequest& ChunkRequest = Params.Request; - if (Params.RawHash != IoHash::Zero) - { - size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); - size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; - RequestData& Request = Requests[RequestIndex]; - Request.RawHash = Params.RawHash; - Request.RawSize = Params.RawSize; - const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); - const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); - const bool IsHit = SkipData || HasData; - if (IsHit) - { - if (HasData && !SkipData) - { - Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); - } - - if (HasData && StoreData) - { - m_CacheStore.Put(*Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}); - } - - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - *Namespace, - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - NiceBytes(Request.Result.GetCompressed().GetSize()), - Params.Source ? Params.Source->Url : "UPSTREAM", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - return; - } - } - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - Params.Source ? Params.Source->Url : "UPSTREAM", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - }); - } - - if (Requests.empty()) - { - return CbPackage{}; - } - - CbPackage RpcResponse; - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (const RequestData& Request : Requests) - { - ResponseObject.BeginObject(); - { - const CompressedBuffer& Result = Request.Result; - if (Result) - { - ResponseObject.AddHash("RawHash"sv, Request.RawHash); - if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); - } - else - { - ResponseObject.AddInteger("RawSize"sv, Request.RawSize); - } - } - else if (Request.RawHash != IoHash::Zero) - { - ResponseObject.AddHash("RawHash"sv, Request.RawHash); - ResponseObject.AddInteger("RawSize"sv, Request.RawSize); - } - } - ResponseObject.EndObject(); - } - ResponseObject.EndArray(); - - RpcResponse.SetObject(ResponseObject.Save()); - return RpcResponse; -} - -namespace cache::detail { - - struct RecordValue - { - Oid ValueId; - IoHash ContentId; - uint64_t RawSize; - }; - struct RecordBody - { - IoBuffer CacheValue; - std::vector<RecordValue> Values; - const UpstreamEndpointInfo* Source = nullptr; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool HasRequest = false; - bool ValuesRead = false; - }; - struct ChunkRequest - { - CacheChunkRequest* Key = nullptr; - RecordBody* Record = nullptr; - CompressedBuffer Value; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t RawSize = 0; - uint64_t RequestedSize = 0; - uint64_t RequestedOffset = 0; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool RawSizeKnown = false; - bool IsRecordRequest = false; - uint64_t ElapsedTimeUs = 0; - }; - -} // namespace cache::detail - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) -{ - using namespace cache::detail; - - std::string Namespace; - std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream - std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests - std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream - std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest - std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key - std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key - std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream - - // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) - { - return CbPackage{}; - } - - // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we - // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); - - // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); - - // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); - - // Send the payload and descriptive data about each chunk to the client - return WriteGetCacheChunksResponse(Namespace, Requests); -} - -bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - CbObjectView RpcRequest) -{ - using namespace cache::detail; - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; - - std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); - if (!NamespaceText) - { - ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); - return false; - } - Namespace = *NamespaceText; - - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); - - // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, - // we will need to change the pointers to indexes to handle reallocations. - RecordKeys.reserve(NumRequests); - Records.reserve(NumRequests); - RequestKeys.reserve(NumRequests); - Requests.reserve(NumRequests); - RecordRequests.reserve(NumRequests); - ValueRequests.reserve(NumRequests); - - CacheKeyRequest* PreviousRecordKey = nullptr; - RecordBody* PreviousRecord = nullptr; - - for (CbFieldView RequestView : ChunkRequestsArray) - { - CbObjectView RequestObject = RequestView.AsObjectView(); - CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); - ChunkRequest& Request = Requests.emplace_back(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - Request.Key = &RequestKey; - if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key)) - { - ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); - return false; - } - - RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); - RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); - RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); - Request.RequestedSize = RequestKey.RawSize; - Request.RequestedOffset = RequestKey.RawOffset; - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - Request.IsRecordRequest = (bool)RequestKey.ValueId; - - if (!Request.IsRecordRequest) - { - ValueRequests.push_back(&Request); - } - else - { - RecordRequests.push_back(&Request); - CacheKeyRequest* RecordKey = nullptr; - RecordBody* Record = nullptr; - - if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) - { - RecordKey = &RecordKeys.emplace_back(); - PreviousRecordKey = RecordKey; - Record = &Records.emplace_back(); - PreviousRecord = Record; - RecordKey->Key = RequestKey.Key; - } - else if (RequestKey.Key == PreviousRecordKey->Key) - { - RecordKey = PreviousRecordKey; - Record = PreviousRecord; - } - else - { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", - RequestKey.Key.Bucket, - RequestKey.Key.Hash, - PreviousRecordKey->Key.Bucket, - PreviousRecordKey->Key.Hash); - return false; - } - Request.Record = Record; - if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) - { - Record->DownstreamPolicy = - Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; - Record->HasRequest = true; - } - } - } - if (Requests.empty()) - { - return false; - } - return true; -} - -void -HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks) -{ - using namespace cache::detail; - - std::vector<CacheKeyRequest*> UpstreamRecordRequests; - for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) - { - Stopwatch Timer; - CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; - RecordBody& Record = Records[RecordIndex]; - if (Record.HasRequest) - { - Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; - - if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) - { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) - { - Record.Exists = true; - Record.CacheValue = std::move(CacheValue.Value); - } - } - if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) - { - RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); - UpstreamRecordRequests.push_back(&RecordKey); - } - RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } - } - - if (!UpstreamRecordRequests.empty()) - { - const auto OnCacheRecordGetComplete = - [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); - } - }; - m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); - } - - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; - for (ChunkRequest* Request : RecordRequests) - { - Stopwatch Timer; - if (Request->Key->ChunkId == IoHash::Zero) - { - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, parse the cache record and try to find the raw hash from the ValueId. - RecordBody& Record = *Request->Record; - if (!Record.ValuesRead) - { - Record.ValuesRead = true; - if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - Record.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) - { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); - } - } - } - } - - for (const RecordValue& Value : Record.Values) - { - if (Value.ValueId == Request->Key->ValueId) - { - Request->Key->ChunkId = Value.ContentId; - Request->RawSize = Value.RawSize; - Request->RawSizeKnown = true; - break; - } - } - } - - // Now load the ContentId from the local ContentIdStore or from the upstream - if (Request->Key->ChunkId != IoHash::Zero) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) - { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) - { - Request->Exists = true; - } - } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) - { - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); - if (Request->Value) - { - Request->Exists = true; - Request->RawSizeKnown = false; - } - } - else - { - IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize)) - { - Request->Exists = true; - Request->RawSizeKnown = true; - } - } - } - } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) - { - Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); - OutUpstreamChunks.push_back(Request->Key); - } - } - Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } -} - -void -HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks) -{ - using namespace cache::detail; - - for (ChunkRequest* Request : ValueRequests) - { - Stopwatch Timer; - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) - { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) - { - if (IsCompressedBinary(CacheValue.Value.GetContentType())) - { - Request->Key->ChunkId = CacheValue.RawHash; - Request->Exists = true; - Request->RawSize = CacheValue.RawSize; - Request->RawSizeKnown = true; - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } - } - } - } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) - { - // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally - Request->Key->RawOffset = 0; - Request->Key->RawSize = UINT64_MAX; - } - OutUpstreamChunks.push_back(Request->Key); - } - Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } -} - -void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, - std::vector<CacheChunkRequest*>& UpstreamChunks, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests) -{ - using namespace cache::detail; - - if (!UpstreamChunks.empty()) - { - const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { - if (Params.RawHash == Params.RawHash.Zero) - { - return; - } - - CacheChunkRequest& Key = Params.Request; - size_t RequestIndex = std::distance(RequestKeys.data(), &Key); - ChunkRequest& Request = Requests[RequestIndex]; - Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || - !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); - if (!Compressed) - { - return; - } - - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) - { - if (Request.IsRecordRequest) - { - m_CidStore.AddChunk(Params.Value, Params.RawHash); - } - else - { - m_CacheStore.Put(Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}); - } - } - if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - Request.Value = std::move(Compressed); - } - } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.RawSize = Params.RawSize; - Request.RawSizeKnown = true; - Request.Source = Params.Source; - - m_CacheStats.UpstreamHitCount++; - }; - - m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); - } -} - -CbPackage -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests) -{ - using namespace cache::detail; - - CbPackage RpcResponse; - CbObjectWriter Writer; - - Writer.BeginArray("Result"sv); - for (ChunkRequest& Request : Requests) - { - Writer.BeginObject(); - { - if (Request.Exists) - { - Writer.AddHash("RawHash"sv, Request.Key->ChunkId); - if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId)); - } - else - { - Writer.AddInteger("RawSize"sv, Request.RawSize); - } - - ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceBytes(Request.RawSize), - Request.IsRecordRequest ? "Record"sv : "Value"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.HitCount++; - } - else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) - { - ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - } - else - { - ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.MissCount++; - } - } - Writer.EndObject(); - } - Writer.EndArray(); - - RpcResponse.SetObject(Writer.Save()); - return RpcResponse; -} - -void -HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - - EmitSnapshot("requests", m_HttpRequests, Cbo); - EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); - - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t TotalCount = HitCount + MissCount; - - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const GcStorageSize CacheSize = m_CacheStore.StorageSize(); - - Cbo.BeginObject("cache"); - { - Cbo.BeginObject("size"); - { - Cbo << "disk" << CacheSize.DiskSize; - Cbo << "memory" << CacheSize.MemorySize; - } - Cbo.EndObject(); - - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - Cbo << "hits" << HitCount << "misses" << MissCount; - Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); - Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - } - Cbo.EndObject(); - - Cbo.BeginObject("upstream"); - { - m_UpstreamCache.GetStatus(Cbo); - } - Cbo.EndObject(); - - Cbo.BeginObject("cid"); - { - Cbo.BeginObject("size"); - { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -void -HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - Cbo << "ok" << true; - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -#if ZEN_WITH_TESTS - -TEST_CASE("z$service.parse.relative.Uri") -{ - HttpRequestData RootRequest; - CHECK(HttpRequestParseRelativeUri("", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - RootRequest = {}; - CHECK(HttpRequestParseRelativeUri("/", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); - CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData LegacyHashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); - CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyHashKeyRequest.Bucket == "test"sv); - CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); - - HttpRequestData LegacyValueContentIdRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - LegacyValueContentIdRequest)); - CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); - CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData V2DefaultNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); - CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); - CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); - CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); - CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2NamespaceRequest; - CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); - CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); - CHECK(!V2NamespaceRequest.Bucket.has_value()); - CHECK(!V2NamespaceRequest.HashKey.has_value()); - CHECK(!V2NamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithDefaultNamespace; - CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); - CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); - CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithNamespace; - CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); - CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); - CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); - - HttpRequestData V2HashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); - CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(V2HashKeyRequest.Bucket == "test"); - CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!V2HashKeyRequest.ValueContentId.has_value()); - - HttpRequestData V2ValueContentIdRequest; - CHECK( - HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - V2ValueContentIdRequest)); - CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); - CHECK(V2ValueContentIdRequest.Bucket == "test"sv); - CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData Invalid; - CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); - CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", - Invalid)); -} - -#endif - -void -z$service_forcelink() -{ -} - -} // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h deleted file mode 100644 index 4e7b98ac9..000000000 --- a/zenserver/cache/structuredcache.h +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/stats.h> -#include <zenhttp/httpserver.h> - -#include "monitoring/httpstats.h" -#include "monitoring/httpstatus.h" - -#include <memory> -#include <vector> - -namespace spdlog { -class logger; -} - -namespace zen { - -struct CacheChunkRequest; -struct CacheKeyRequest; -class CidStore; -class CbObjectView; -struct PutRequestData; -class ScrubContext; -class UpstreamCache; -class ZenCacheStore; -enum class CachePolicy : uint32_t; -enum class RpcAcceptOptions : uint16_t; - -namespace cache { - class IRpcRequestReplayer; - class IRpcRequestRecorder; - namespace detail { - struct RecordBody; - struct ChunkRequest; - } // namespace detail -} // namespace cache - -/** - * Structured cache service. Imposes constraints on keys, supports blobs and - * structured values - * - * Keys are structured as: - * - * {BucketId}/{KeyHash} - * - * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character - * hexadecimal sequence. The hash value may be derived in any number of ways, it's - * up to the application to pick an approach. - * - * Values may be structured or unstructured. Structured values are encoded using Unreal - * Engine's compact binary encoding (see CbObject) - * - * Additionally, attachments may be addressed as: - * - * {BucketId}/{KeyHash}/{ValueHash} - * - * Where the two initial components are the same as for the main endpoint - * - * The storage strategy is as follows: - * - * - Structured values are stored in a dedicated backing store per bucket - * - Unstructured values and attachments are stored in the CAS pool - * - */ - -class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider -{ -public: - HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache); - ~HttpStructuredCacheService(); - - virtual const char* BaseUri() const override; - virtual void HandleRequest(HttpServerRequest& Request) override; - - void Flush(); - void Scrub(ScrubContext& Ctx); - -private: - struct CacheRef - { - std::string Namespace; - std::string BucketSegment; - IoHash HashKey; - IoHash ValueContentId; - }; - - struct CacheStats - { - std::atomic_uint64_t HitCount{}; - std::atomic_uint64_t UpstreamHitCount{}; - std::atomic_uint64_t MissCount{}; - }; - enum class PutResult - { - Success, - Fail, - Invalid, - }; - - void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); - void HandleRpcRequest(HttpServerRequest& Request); - void HandleDetailsRequest(HttpServerRequest& Request); - - CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); - CbPackage HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId); - - void HandleCacheRequest(HttpServerRequest& Request); - void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); - void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); - virtual void HandleStatsRequest(HttpServerRequest& Request) override; - virtual void HandleStatusRequest(HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); - - /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::string& Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - CbObjectView RpcRequest); - /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(std::string_view Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(std::string_view Namespace, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(std::string_view Namespace, - std::vector<CacheChunkRequest*>& UpstreamChunks, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests); - /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); - - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - ZenCacheStore& m_CacheStore; - HttpStatsService& m_StatsService; - HttpStatusService& m_StatusService; - CidStore& m_CidStore; - UpstreamCache& m_UpstreamCache; - uint64_t m_LastScrubTime = 0; - metrics::OperationTiming m_HttpRequests; - metrics::OperationTiming m_UpstreamGetRequestTiming; - CacheStats m_CacheStats; - - void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); - - std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; -}; - -/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. - * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ -inline bool -IsCompressedBinary(ZenContentType Type) -{ - return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; -} - -void z$service_forcelink(); - -} // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp deleted file mode 100644 index 26e970073..000000000 --- a/zenserver/cache/structuredcachestore.cpp +++ /dev/null @@ -1,3648 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "structuredcachestore.h" - -#include <zencore/except.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenstore/cidstore.h> -#include <zenstore/scrubcontext.h> - -#include <xxhash.h> - -#include <limits> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <fmt/core.h> -#include <gsl/gsl-lite.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <random> -#endif - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -namespace { - -#pragma pack(push) -#pragma pack(1) - - struct CacheBucketIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t Version2 = 2; - static constexpr uint32_t CurrentVersion = Version2; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - - static_assert(sizeof(CacheBucketIndexHeader) == 32); - -#pragma pack(pop) - - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + IndexExtension); - } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + ".tmp"); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + LogExtension); - } - - bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.GetFlags() & - ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - return true; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.Size(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - - bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) - { - int DropIndex = 0; - do - { - if (!std::filesystem::exists(Dir)) - { - return false; - } - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); - std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) - { - DropIndex++; - continue; - } - - std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - DeleteDirectories(DroppedBucketPath); - return true; - } - // TODO: Do we need to bail at some point? - zen::Sleep(100); - } while (true); - } - -} // namespace - -namespace fs = std::filesystem; - -static CbObject -LoadCompactBinaryObject(const fs::path& Path) -{ - FileContents Result = ReadFile(Path); - - if (!Result.ErrorCode) - { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) - { - return LoadCompactBinaryObject(Buffer); - } - } - - return CbObject(); -} - -static void -SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) -{ - WriteFile(Path, Object.GetBuffer().AsIoBuffer()); -} - -ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) -: GcStorage(Gc) -, GcContributor(Gc) -, m_RootDir(RootDir) -, m_DiskLayer(RootDir) -{ - ZEN_INFO("initializing structured cache at '{}'", RootDir); - CreateDirectories(RootDir); - - m_DiskLayer.DiscoverBuckets(); - -#if ZEN_USE_CACHE_TRACKER - m_AccessTracker.reset(new ZenCacheTracker(RootDir)); -#endif -} - -ZenCacheNamespace::~ZenCacheNamespace() -{ -} - -bool -ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Get"); - - bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); - -#if ZEN_USE_CACHE_TRACKER - auto _ = MakeGuard([&] { - if (!Ok) - return; - - m_AccessTracker->TrackAccess(InBucket, HashKey); - }); -#endif - - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - - return true; - } - - Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); - - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - - if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, OutValue); - } - } - - return Ok; -} - -void -ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - ZEN_TRACE_CPU("Z$::Put"); - - // Store value and index - - ZEN_ASSERT(Value.Value.Size()); - - m_DiskLayer.Put(InBucket, HashKey, Value); - -#if ZEN_USE_REF_TRACKING - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) - { - CbObject Object{SharedBuffer(Value.Value)}; - - uint8_t TempBuffer[8 * sizeof(IoHash)]; - std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; - std::pmr::polymorphic_allocator Allocator{&Linear}; - std::pmr::vector<IoHash> CidReferences{Allocator}; - - Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); - - m_Gc.OnNewCidReferences(CidReferences); - } - } -#endif - - if (Value.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, Value); - } -} - -bool -ZenCacheNamespace::DropBucket(std::string_view Bucket) -{ - ZEN_INFO("dropping bucket '{}'", Bucket); - - // TODO: should ensure this is done atomically across all layers - - const bool MemDropped = m_MemLayer.DropBucket(Bucket); - const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); - const bool AnyDropped = MemDropped || DiskDropped; - - ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); - - return AnyDropped; -} - -bool -ZenCacheNamespace::Drop() -{ - m_MemLayer.Drop(); - return m_DiskLayer.Drop(); -} - -void -ZenCacheNamespace::Flush() -{ - m_DiskLayer.Flush(); -} - -void -ZenCacheNamespace::Scrub(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_DiskLayer.Scrub(Ctx); - m_MemLayer.Scrub(Ctx); -} - -void -ZenCacheNamespace::GatherReferences(GcContext& GcCtx) -{ - Stopwatch Timer; - const auto Guard = - MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - access_tracking::AccessTimes AccessTimes; - m_MemLayer.GatherAccessTimes(AccessTimes); - - m_DiskLayer.UpdateAccessTimes(AccessTimes); - m_DiskLayer.GatherReferences(GcCtx); -} - -void -ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) -{ - m_MemLayer.Reset(); - m_DiskLayer.CollectGarbage(GcCtx); -} - -GcStorageSize -ZenCacheNamespace::StorageSize() const -{ - return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; -} - -ZenCacheNamespace::Info -ZenCacheNamespace::GetInfo() const -{ - ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold}, - .DiskLayerInfo = m_DiskLayer.GetInfo(), - .MemoryLayerInfo = m_MemLayer.GetInfo()}; - std::unordered_set<std::string> BucketNames; - for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames) - { - BucketNames.insert(BucketName); - } - for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames) - { - BucketNames.insert(BucketName); - } - Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end()); - return Info; -} - -std::optional<ZenCacheNamespace::BucketInfo> -ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const -{ - std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket); - if (!DiskBucketInfo.has_value()) - { - return {}; - } - ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo, - .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})}; - return Info; -} - -CacheValueDetails::NamespaceDetails -ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheMemoryLayer::ZenCacheMemoryLayer() -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It == m_Buckets.end()) - { - return false; - } - - CacheBucket* Bucket = It->second.get(); - - _.ReleaseNow(); - - // There's a race here. Since the lock is released early to allow - // inserts, the bucket delete path could end up deleting the - // underlying data structure - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); - Bucket = InsertResult.first->second.get(); - } - } - - // Note that since the underlying IoBuffer is retained, the content type is also - - Bucket->Put(HashKey, Value); -} - -bool -ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - Bucket.Drop(); - return true; - } - return false; -} - -void -ZenCacheMemoryLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - Bucket.Drop(); - } -} - -void -ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Kv.second->Scrub(Ctx); - } -} - -void -ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) -{ - using namespace zen::access_tracking; - - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second->GatherAccessTimes(Bucket); - } -} - -void -ZenCacheMemoryLayer::Reset() -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_Buckets.clear(); -} - -uint64_t -ZenCacheMemoryLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheMemoryLayer::Info -ZenCacheMemoryLayer::GetInfo() const -{ - ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheMemoryLayer::BucketInfo> -ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -void -ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_BucketLock); - - std::vector<IoHash> BadHashes; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (Hash != RawHash) - { - return false; - } - } - return true; - }; - - for (auto& Kv : m_CacheMap) - { - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) - { - BadHashes.push_back(Kv.first); - } - } - - if (!BadHashes.empty()) - { - Ctx.ReportBadCidChunks(BadHashes); - } -} - -void -ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - RwLock::SharedLockScope _(m_BucketLock); - std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { - return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; - }); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_BucketLock); - - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - return true; - } - - return false; -} - -void -ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - size_t PayloadSize = Value.Value.GetSize(); - { - GcClock::Tick AccessTime = GcClock::TickCount(); - RwLock::ExclusiveLockScope _(m_BucketLock); - if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! - return; - } - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); - BucketPayload& Payload = m_Payloads[EntryIndex]; - Payload.Payload = Value.Value; - Payload.RawHash = Value.RawHash; - Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); - m_AccessTimes[EntryIndex] = AccessTime; - } - else - { - uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back( - BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(AccessTime); - m_CacheMap.insert_or_assign(HashKey, EntryIndex); - } - ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - } - - m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); -} - -void -ZenCacheMemoryLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_BucketLock); - m_CacheMap.clear(); - m_AccessTimes.clear(); - m_Payloads.clear(); - m_TotalSize.store(0); -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_BucketLock); - return static_cast<uint64_t>(m_CacheMap.size()); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) -{ -} - -ZenCacheDiskLayer::CacheBucket::~CacheBucket() -{ -} - -bool -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) -{ - using namespace std::literals; - - m_BlocksBasePath = BucketDir / "blocks"; - m_BucketDir = BucketDir; - - CreateDirectories(m_BucketDir); - - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; - - bool IsNew = false; - - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); - - if (Manifest) - { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); - if (m_BucketId == Oid::Zero) - { - return false; - } - } - else if (AllowCreate) - { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Manifest = Writer.Save(); - SaveCompactBinaryObject(ManifestPath, Manifest); - IsNew = true; - } - else - { - return false; - } - - OpenLog(IsNew); - - if (!IsNew) - { - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash(); - m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64(); - } - } - } - - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() -{ - uint64_t LogCount = m_SlogFile.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir / m_BucketName, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - - { - Entries.resize(m_Index.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; - - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(IndexPath); - fs::rename(STmpIndexPath, IndexPath); - } - } - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) -{ - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - m_PayloadAlignment = Header.PayloadAlignment; - - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); - - m_Payloads.reserve(Header.EntryCount); - m_AccessTimes.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); - - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } - } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - return 0; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) -{ - if (std::filesystem::is_regular_file(LogPath)) - { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); - } - return LogEntryCount; - } - } - return 0; -}; - -void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) -{ - m_TotalStandaloneSize = 0; - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - - if (IsNew) - { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); - } - - uint64_t LogEntryCount = 0; - { - uint32_t IndexVersion = 0; - m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); - if (IndexVersion == 0 && std::filesystem::is_regular_file(IndexPath)) - { - ZEN_WARN("removing invalid index file at '{}'", IndexPath); - fs::remove(IndexPath); - } - - if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) - { - LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); - } - else - { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); - fs::remove(LogPath); - } - } - - CreateDirectories(m_BucketDir); - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); - for (const auto& Entry : m_Index) - { - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - const DiskLocation& Location = Payload.Location; - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; - } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - KnownLocations.push_back(BlockLocation); - } - - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNew || LogEntryCount > 0) - { - MakeIndexSnapshot(); - } - // TODO: should validate integrity of container files here -} - -void -ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const -{ - char HexString[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); - - Path.Append(m_BucketDir); - Path.AppendSeparator(); - Path.Append(L"blob"); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString, HexString + 3); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 3, HexString + 5); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const -{ - BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - - IoBuffer Value = m_BlockStore.TryGetChunk(Location); - if (Value) - { - Value.SetContentType(Loc.GetContentType()); - } - - return Value; -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const -{ - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) - { - Data.SetContentType(Loc.GetContentType()); - - return Data; - } - - return {}; -} - -bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_IndexLock); - auto It = m_Index.find(HashKey); - if (It == m_Index.end()) - { - return false; - } - size_t EntryIndex = It.value(); - const BucketPayload& Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; - OutValue.RawSize = Payload.RawSize; - OutValue.RawHash = Payload.RawHash; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - _.ReleaseNow(); - OutValue.Value = GetStandaloneCacheValue(Location, HashKey); - } - else - { - OutValue.Value = GetInlineCacheValue(Location); - } - _.ReleaseNow(); - - if (!Location.IsFlagSet(DiskLocation::kStructured)) - { - if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) - { - if (Location.IsFlagSet(DiskLocation::kCompressed)) - { - (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); - } - else - { - OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); - } - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) - { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - WritePayload.RawHash = OutValue.RawHash; - WritePayload.RawSize = OutValue.RawSize; - - m_LogFlushPosition = 0; // Force resave of index on exit - } - } - } - - return (bool)OutValue.Value; -} - -void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (Value.Value.Size() >= m_LargeObjectThreshold) - { - return PutStandaloneCacheValue(HashKey, Value); - } - PutInlineCacheValue(HashKey, Value); -} - -bool -ZenCacheDiskLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_IndexLock); - - std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; - ShardLocks.reserve(256); - for (RwLock& Lock : m_ShardedLocks) - { - ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); - } - m_BlockStore.Close(); - m_SlogFile.Close(); - - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - return Deleted; -} - -void -ZenCacheDiskLayer::CacheBucket::Flush() -{ - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); - m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); -} - -void -ZenCacheDiskLayer::CacheBucket::SaveManifest() -{ - using namespace std::literals; - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - - if (!m_Index.empty()) - { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; - - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); - } - Writer.EndArray(); - - Writer.BeginArray("RawInfo"sv); - { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } - } - Writer.EndArray(); - } - - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); -} - -void -ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) -{ - std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (RawHash != Hash) - { - return false; - } - } - return true; - }; - - RwLock::SharedLockScope _(m_IndexLock); - - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); - - for (auto& Kv : m_Index) - { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ++ChunkCount; - ChunkBytes += Loc.Size(); - if (Loc.GetContentType() == ZenContentType::kBinary) - { - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) - { - BadKeys.push_back(HashKey); - } - if (size != Loc.Size()) - { - BadKeys.push_back(HashKey); - } - continue; - } - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); - if (!Buffer) - { - BadKeys.push_back(HashKey); - continue; - } - if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) - { - BadKeys.push_back(HashKey); - continue; - } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - - _.ReleaseNow(); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const IoHash& BadKey : BadKeys) - { - // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const BucketPayload& Payload = m_Payloads[It->second]; - DiskLocation Location = Payload.Location; - Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); - m_Index.erase(BadKey); - } - } - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ExtendablePathBuilder<256> Path; - BuildPath(Path, Entry.Key); - fs::path FilePath = Path.ToPath(); - RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... - } - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - } - } - m_SlogFile.Append(LogEntries); - - // Clean up m_AccessTimes and m_Payloads vectors - { - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[EntryIndex]); - AccessTimes.push_back(m_AccessTimes[EntryIndex]); - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir / m_BucketName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs)); - }); - - const GcClock::TimePoint ExpireTime = GcCtx.ExpireTime(); - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - IndexMap Index; - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - Index = m_Index; - AccessTimes = m_AccessTimes; - Payloads = m_Payloads; - } - - std::vector<IoHash> ExpiredKeys; - ExpiredKeys.reserve(1024); - - std::vector<IoHash> Cids; - Cids.reserve(1024); - - for (const auto& Entry : Index) - { - const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = AccessTimes[Entry.second]; - if (AccessTime < ExpireTicks) - { - ExpiredKeys.push_back(Key); - continue; - } - - const DiskLocation& Loc = Payloads[Entry.second].Location; - - if (Loc.IsFlagSet(DiskLocation::kStructured)) - { - if (Cids.size() > 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - - IoBuffer Buffer; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - __.ReleaseNow(); - if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) - { - continue; - } - } - else if (Buffer = GetInlineCacheValue(Loc); !Buffer) - { - continue; - } - } - - ZEN_ASSERT(Buffer); - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{Buffer}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - } - } - - GcCtx.AddRetainedCids(Cids); - GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); -} - -void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName); - - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = TotalSize(); - - std::unordered_set<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "entires ({}).", - m_BucketDir / m_BucketName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); - }); - - m_SlogFile.Flush(); - - std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); - std::vector<IoHash> DeleteCacheKeys; - DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); - GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { - if (Keep) - { - return; - } - DeleteCacheKeys.push_back(ChunkHash); - }); - if (DeleteCacheKeys.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); - return; - } - - auto __ = MakeGuard([&]() { - if (!DeletedChunks.empty()) - { - // Clean up m_AccessTimes and m_Payloads vectors - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope _(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[EntryIndex]); - AccessTimes.push_back(m_AccessTimes[EntryIndex]); - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); - } - }); - - std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap Index; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); - return; - } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; - - for (const IoHash& Key : DeleteCacheKeys) - { - if (auto It = Index.find(Key); It != Index.end()) - { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) - { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); - } - } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) - { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); - } - m_SlogFile.Append(ExpiredStandaloneEntries); - } - } - - if (GcCtx.IsDeletionMode()) - { - std::error_code Ec; - ExtendablePathBuilder<256> Path; - - for (const auto& Entry : ExpiredStandaloneEntries) - { - const IoHash& Key = Entry.Key; - const DiskLocation& Loc = Entry.Location; - - Path.Reset(); - BuildPath(Path, Key); - fs::path FilePath = Path.ToPath(); - - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); - continue; - } - __.ReleaseNow(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(FilePath, Ec); - } - } - - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); - DiskLocation RestoreLocation = Loc; - RestoreLocation.Flags &= ~DiskLocation::kTombStone; - - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - continue; - } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert({Key, EntryIndex}); - m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); - DeletedChunks.erase(Key); - continue; - } - DeletedSize += Entry.Location.Size(); - } - } - - TotalChunkCount = Index.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : Index) - { - const DiskLocation& Location = m_Payloads[Entry.second].Location; - - if (Location.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - TotalChunkHashes.push_back(Entry.first); - } - - if (TotalChunkHashes.empty()) - { - return; - } - TotalChunkCount = TotalChunkHashes.size(); - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - - GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = Index.find(ChunkHash); - const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - if (Keep) - { - KeepChunkIndexes.push_back(ChunkIndex); - } - }); - - size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - uint64_t CurrentTotalSize = TotalSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", - m_BucketDir / m_BucketName, - DeleteCount, - TotalChunkCount, - NiceBytes(CurrentTotalSize)); - return; - } - - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back( - {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), - m_PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - DeletedChunks.insert(ChunkHash); - } - - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.GetFlags() & DiskLocation::kTombStone) - { - m_Index.erase(Entry.Key); - continue; - } - m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; - } - } - }, - [&]() { return GcCtx.CollectSmallObjects(); }); -} - -void -ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - using namespace access_tracking; - - for (const KeyAccessTime& KeyTime : AccessTimes) - { - if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = KeyTime.LastAccess; - } - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_IndexLock); - return static_cast<uint64_t>(m_Index.size()); -} - -CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const -{ - std::vector<IoHash> Attachments; - const BucketPayload& Payload = m_Payloads[Index]; - if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) - { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) - : GetInlineCacheValue(Payload.Location); - CbObject Obj(SharedBuffer{Value}); - Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); - } - return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), - .RawSize = Payload.RawSize, - .RawHash = Payload.RawHash, - .LastAccess = m_AccessTimes[Index], - .Attachments = std::move(Attachments), - .ContentType = Payload.Location.GetContentType()}; -} - -CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const -{ - CacheValueDetails::BucketDetails Details; - RwLock::SharedLockScope _(m_IndexLock); - if (ValueFilter.empty()) - { - Details.Values.reserve(m_Index.size()); - for (const auto& It : m_Index) - { - Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); - } - } - else - { - IoHash Key = IoHash::FromHexString(ValueFilter); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); - } - } - return Details; -} - -void -ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.CollectGarbage(GcCtx); - } -} - -void -ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) -{ - RwLock::SharedLockScope _(m_Lock); - - for (const auto& Kv : AccessTimes.Buckets) - { - if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - Bucket.UpdateAccessTimes(Kv.second); - } - } -} - -void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - uint64_t NewFileSize = Value.Value.Size(); - - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); - } - - bool CleanUpTempFile = false; - auto __ = MakeGuard([&] { - if (CleanUpTempFile) - { - std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); - if (Ec) - { - ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - } - } - }); - - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", - NiceBytes(NewFileSize), - DataFile.GetPath().string(), - m_BucketDir)); - } - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - std::filesystem::path FsPath{DataFilePath.ToPath()}; - - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); - if (Ec) - { - if (Ec.value() != ENOENT) - { - ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); - Sleep(100); - Ec.clear(); - std::filesystem::remove(FsPath, Ec); - if (Ec && Ec.value() != ENOENT) - { - throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); - } - } - } - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - CreateDirectories(FsPath.parent_path()); - Ec.clear(); - - // Try again - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", - FsPath, - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - Sleep(100); - Ec.clear(); - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); - } - } - } - - // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file - // will be disabled as the file handle has already been closed - CleanUpTempFile = false; - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - DiskLocation Loc(NewFileSize, EntryFlags); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); -} - -void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - } - else - { - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) -{ -} - -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // Bucket needs to be opened/created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - if (!Bucket->OpenOrCreate(BucketPath)) - { - m_Buckets.erase(InsertResult.first); - return false; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket needs to be created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - return; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - - Bucket->Put(HashKey, Value); -} - -void -ZenCacheDiskLayer::DiscoverBuckets() -{ - DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); - - // Initialize buckets - - RwLock::ExclusiveLockScope _(m_Lock); - - for (const std::filesystem::path& BucketPath : DirContent.Directories) - { - const std::string BucketName = PathToUtf8(BucketPath.stem()); - // New bucket needs to be created - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - continue; - } - - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - ZEN_INFO("Discovered bucket '{}'", BucketName); - } -} - -bool -ZenCacheDiskLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - - return Bucket.Drop(); - } - - // Make sure we remove the folder even if we don't know about the bucket - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); -} - -bool -ZenCacheDiskLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - if (!Bucket.Drop()) - { - return false; - } - } - return MoveAndDeleteDirectory(m_RootDir); -} - -void -ZenCacheDiskLayer::Flush() -{ - std::vector<CacheBucket*> Buckets; - - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - CacheBucket* Bucket = Kv.second.get(); - Buckets.push_back(Bucket); - } - } - - for (auto& Bucket : Buckets) - { - Bucket->Flush(); - } -} - -void -ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.Scrub(Ctx); - } -} - -void -ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.GatherReferences(GcCtx); - } -} - -uint64_t -ZenCacheDiskLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheDiskLayer::Info -ZenCacheDiskLayer::GetInfo() const -{ - ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheDiskLayer::BucketInfo> -ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -CacheValueDetails::NamespaceDetails -ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - RwLock::SharedLockScope _(m_Lock); - CacheValueDetails::NamespaceDetails Details; - if (BucketFilter.empty()) - { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) - { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); - } - } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); - } - return Details; -} - -//////////////////////////// ZenCacheStore - -static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; - -ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration) -{ - CreateDirectories(m_Configuration.BasePath); - - DirectoryContent DirContent; - GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); - - std::vector<std::string> Namespaces; - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - std::string DirName = PathToUtf8(DirPath.filename()); - if (DirName.starts_with(NamespaceDiskPrefix)) - { - Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); - continue; - } - } - - ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath); - - if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) - { - // default (unspecified) and ue4-ddc namespace points to the same namespace instance - - std::filesystem::path DefaultNamespaceFolder = - m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); - CreateDirectories(DefaultNamespaceFolder); - Namespaces.push_back(std::string(UE4DDCNamespaceName)); - } - - for (const std::string& NamespaceName : Namespaces) - { - m_Namespaces[NamespaceName] = - std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); - } -} - -ZenCacheStore::~ZenCacheStore() -{ - m_Namespaces.clear(); -} - -bool -ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->Get(Bucket, HashKey, OutValue); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); - - return false; -} - -void -ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->Put(Bucket, HashKey, Value); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); -} - -bool -ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->DropBucket(Bucket); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket); - return false; -} - -bool -ZenCacheStore::DropNamespace(std::string_view InNamespace) -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) - { - ZenCacheNamespace& Namespace = *It->second; - m_DroppedNamespaces.push_back(std::move(It->second)); - m_Namespaces.erase(It); - return Namespace.Drop(); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); - return false; -} - -void -ZenCacheStore::Flush() -{ - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); -} - -void -ZenCacheStore::Scrub(ScrubContext& Ctx) -{ - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); -} - -CacheValueDetails -ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, - const std::string_view BucketFilter, - const std::string_view ValueFilter) const -{ - CacheValueDetails Details; - if (NamespaceFilter.empty()) - { - IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) { - Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter); - }); - } - else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr) - { - Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter); - } - return Details; -} - -ZenCacheNamespace* -ZenCacheStore::GetNamespace(std::string_view Namespace) -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - if (Namespace == DefaultNamespace) - { - if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) - { - return It->second.get(); - } - } - _.ReleaseNow(); - - if (!m_Configuration.AllowAutomaticCreationOfNamespaces) - { - return nullptr; - } - - RwLock::ExclusiveLockScope __(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - - auto NewNamespace = m_Namespaces.insert_or_assign( - std::string(Namespace), - std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); - return NewNamespace.first->second.get(); -} - -const ZenCacheNamespace* -ZenCacheStore::FindNamespace(std::string_view Namespace) const -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - if (Namespace == DefaultNamespace) - { - if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) - { - return It->second.get(); - } - } - return nullptr; -} - -void -ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const -{ - std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces; - { - RwLock::SharedLockScope _(m_NamespacesLock); - Namespaces.reserve(m_Namespaces.size()); - for (const auto& Entry : m_Namespaces) - { - if (Entry.first == DefaultNamespace) - { - continue; - } - Namespaces.push_back({Entry.first, *Entry.second}); - } - } - for (auto& Entry : Namespaces) - { - Callback(Entry.first, Entry.second); - } -} - -GcStorageSize -ZenCacheStore::StorageSize() const -{ - GcStorageSize Size; - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { - GcStorageSize StoreSize = Store.StorageSize(); - Size.MemorySize += StoreSize.MemorySize; - Size.DiskSize += StoreSize.DiskSize; - }); - return Size; -} - -ZenCacheStore::Info -ZenCacheStore::GetInfo() const -{ - ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()}; - - IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { - Info.NamespaceNames.push_back(std::string(NamespaceName)); - ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo(); - Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount; - Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount; - }); - - return Info; -} - -std::optional<ZenCacheNamespace::Info> -ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName) -{ - if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) - { - return Namespace->GetInfo(); - } - return {}; -} - -std::optional<ZenCacheNamespace::BucketInfo> -ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName) -{ - if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) - { - return Namespace->GetBucketInfo(BucketName); - } - return {}; -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -using namespace std::literals; - -namespace testutils { - IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } - - IoBuffer CreateBinaryCacheValue(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); - Buf.SetContentType(ZenContentType::kBinary); - return Buf; - }; - -} // namespace testutils - -TEST_CASE("z$.store") -{ - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const int kIterationCount = 100; - - for (int i = 0; i < kIterationCount; ++i) - { - const IoHash Key = IoHash::HashBuffer(&i, sizeof i); - - CbObjectWriter Cbo; - Cbo << "hey" << i; - CbObject Obj = Cbo.Save(); - - ZenCacheValue Value; - Value.Value = Obj.GetBuffer().AsIoBuffer(); - Value.Value.SetContentType(ZenContentType::kCbObject); - - Zcs.Put("test_bucket"sv, Key, Value); - } - - for (int i = 0; i < kIterationCount; ++i) - { - const IoHash Key = IoHash::HashBuffer(&i, sizeof i); - - ZenCacheValue Value; - Zcs.Get("test_bucket"sv, Key, /* out */ Value); - - REQUIRE(Value.Value); - CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); - CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); - CbObject Obj = LoadCompactBinaryObject(Value.Value); - CHECK_EQ(Obj["hey"].AsInt32(), i); - } -} - -TEST_CASE("z$.size") -{ - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - SUBCASE("mem/disklayer") - { - const size_t Count = 16; - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - for (size_t Key = 0; Key < Count; ++Key) - { - const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer}); - } - - CacheSize = Zcs.StorageSize(); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); - } - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const GcStorageSize SerializedSize = Zcs.StorageSize(); - CHECK_EQ(SerializedSize.MemorySize, 0); - CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); - - for (size_t Bucket = 0; Bucket < 4; ++Bucket) - { - Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); - } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } - - SUBCASE("disklayer") - { - const size_t Count = 16; - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - for (size_t Key = 0; Key < Count; ++Key) - { - const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); - } - - CacheSize = Zcs.StorageSize(); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); - CHECK_EQ(0, CacheSize.MemorySize); - } - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const GcStorageSize SerializedSize = Zcs.StorageSize(); - CHECK_EQ(SerializedSize.MemorySize, 0); - CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); - - for (size_t Bucket = 0; Bucket < 4; ++Bucket) - { - Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); - } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } -} - -TEST_CASE("z$.gc") -{ - using namespace testutils; - - SUBCASE("gather references does NOT add references for expired cache entries") - { - ScopedTemporaryDirectory TempDir; - std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; - - const auto CollectAndFilter = [](GcManager& Gc, - GcClock::TimePoint Time, - GcClock::Duration MaxDuration, - std::span<const IoHash> Cids, - std::vector<IoHash>& OutKeep) { - GcContext GcCtx(Time - MaxDuration); - Gc.CollectGarbage(GcCtx); - OutKeep.clear(); - GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); - }; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "teardrinker"sv; - - // Create a cache record - const IoHash Key = CreateKey(42); - CbObjectWriter Record; - Record << "Key"sv - << "SomeRecord"sv; - - for (size_t Idx = 0; auto& Cid : Cids) - { - Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); - } - - IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - Zcs.Put(Bucket, Key, {.Value = Buffer}); - - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(Cids.size(), Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - - // Expect timestamps to be serialized - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(3, Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - } - - SUBCASE("gc removes standalone values") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "fortysixandtwo"sv; - const GcClock::TimePoint CurrentTime = GcClock::Now(); - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(CurrentTime - std::chrono::hours(46)); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(CurrentTime + std::chrono::minutes(2)); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } - - SUBCASE("gc removes small objects") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "rightintwo"sv; - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = testutils::CreateBinaryCacheValue(128); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(2)); - GcCtx.CollectSmallObjects(true); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); - - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } -} - -TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; - - struct Chunk - { - std::string Bucket; - IoBuffer Buffer; - }; - std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks; - Chunks.reserve(kChunkCount); - - const std::string Bucket1 = "rightinone"; - const std::string Bucket2 = "rightintwo"; - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - while (true) - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; - break; - } - while (true) - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; - break; - } - } - - CreateDirectories(TempDir.Path()); - - WorkerThreadPool ThreadPool(4); - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path()); - - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - const uint64_t TotalSize = Zcs.StorageSize().DiskSize; - CHECK_LE(kChunkSize * Chunks.size(), TotalSize); - - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - std::string Bucket = Chunk.second.Bucket; - IoHash ChunkHash = Chunk.first; - ZenCacheValue CacheValue; - - CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); - IoHash Hash = IoHash::HashBuffer(CacheValue.Value); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; - GcChunkHashes.reserve(Chunks.size()); - for (const auto& Chunk : Chunks) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - { - std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks; - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; - } - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; - } - } - - std::atomic<size_t> WorkCompleted = 0; - std::atomic_uint32_t AddedChunkCount = 0; - for (const auto& Chunk : NewChunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); - } - - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - } - WorkCompleted.fetch_add(1); - }); - } - while (AddedChunkCount.load() < NewChunks.size()) - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Zcs.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } - - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Zcs.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - } - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : GcChunkHashes) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } - } - } -} - -TEST_CASE("z$.namespaces") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - IoHash Key1; - IoHash Key2; - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); - const auto Bucket = "teardrinker"sv; - const auto CustomNamespace = "mynamespace"sv; - - // Create a cache record - Key1 = CreateKey(42); - CbObject CacheValue = CreateCacheValue(4096); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); - - ZenCacheValue GetValue; - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - - // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - } - - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket = "teardrinker"sv; - const auto CustomNamespace = "mynamespace"sv; - - Key2 = CreateKey(43); - CbObject CacheValue2 = CreateCacheValue(4096); - - IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); - Buffer2.SetContentType(ZenContentType::kCbObject); - ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); - - ZenCacheValue GetValue; - CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); - } -} - -TEST_CASE("z$.drop.bucket") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - IoHash Key1; - IoHash Key2; - - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { - ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); - return GetValue; - }; - WorkerThreadPool Workers(1); - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket = "teardrinker"sv; - const auto Namespace = "mynamespace"sv; - - Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096); - Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048); - - ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1); - CHECK(Value1.Value); - - std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - WorkComplete = true; - }); - // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket - // Our DropBucket execution blocks any incoming request from completing until we are done with the drop - CHECK(Zcs.DropBucket(Namespace, Bucket)); - while (!WorkComplete) - { - zen::Sleep(1); - } - - // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty - Value1 = GetValue(Zcs, Namespace, Bucket, Key1); - CHECK(!Value1.Value); - ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2); - CHECK(!Value2.Value); - } -} - -TEST_CASE("z$.drop.namespace") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { - ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); - return GetValue; - }; - WorkerThreadPool Workers(1); - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket1 = "teardrinker1"sv; - const auto Bucket2 = "teardrinker2"sv; - const auto Namespace1 = "mynamespace1"sv; - const auto Namespace2 = "mynamespace2"sv; - - IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096); - IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048); - IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096); - IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048); - - ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); - CHECK(Value1.Value); - ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); - CHECK(Value2.Value); - ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); - CHECK(Value3.Value); - ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); - CHECK(Value4.Value); - - std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - Value2.Value = IoBuffer{}; - Value3.Value = IoBuffer{}; - Value4.Value = IoBuffer{}; - WorkComplete = true; - }); - // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket - // Our DropBucket execution blocks any incoming request from completing until we are done with the drop - CHECK(Zcs.DropNamespace(Namespace1)); - while (!WorkComplete) - { - zen::Sleep(1); - } - - // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty - Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); - CHECK(!Value1.Value); - Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); - CHECK(!Value2.Value); - Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); - CHECK(Value3.Value); - Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); - CHECK(Value4.Value); - } -} - -TEST_CASE("z$.blocked.disklayer.put") -{ - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size, Size & 0xff); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - size_t Key = Buffer.Size(); - IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); - - ZenCacheValue BufferGet; - CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); - - CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); - IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); - Buffer2.SetContentType(ZenContentType::kCbObject); - - // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); - - MemoryView OldView = BufferGet.Value.GetView(); - - ZenCacheValue BufferGet2; - CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2)); - MemoryView NewView = BufferGet2.Value.GetView(); - - // Make sure file openend for read before we wrote it still have old data - CHECK(OldView.GetSize() == Buffer.GetSize()); - CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0); - - // Make sure we get the new data when reading after we write new data - CHECK(NewView.GetSize() == Buffer2.GetSize()); - CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); -} - -TEST_CASE("z$.scrub") -{ - ScopedTemporaryDirectory TempDir; - - using namespace testutils; - - struct CacheRecord - { - IoBuffer Record; - std::vector<CompressedBuffer> Attachments; - }; - - auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) { - CacheRecord Result; - if (Structured) - { - Result.Attachments.resize(AttachmentSizes.size()); - CbObjectWriter Record; - Record.BeginObject("Key"sv); - { - Record << "Bucket"sv << Bucket; - Record << "Hash"sv << Key; - } - Record.EndObject(); - for (size_t Index = 0; Index < AttachmentSizes.size(); Index++) - { - IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); - CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); - Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash()); - Result.Attachments[Index] = CompressedAttachmentData; - } - Result.Record = Record.Save().GetBuffer().AsIoBuffer(); - Result.Record.SetContentType(ZenContentType::kCbObject); - } - else - { - std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString()); - size_t TotalSize = RecordData.length() + 1; - for (size_t AttachmentSize : AttachmentSizes) - { - TotalSize += AttachmentSize; - } - Result.Record = IoBuffer(TotalSize); - char* DataPtr = (char*)Result.Record.MutableData(); - memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1); - DataPtr += RecordData.length() + 1; - for (size_t AttachmentSize : AttachmentSizes) - { - IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize); - memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize()); - DataPtr += AttachmentData.GetSize(); - } - } - return Result; - }; - - GcManager Gc; - CidStore CidStore(Gc); - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - auto CreateRecords = - [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) { - for (const IoHash& Cid : Cids) - { - CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes); - Zcs.Put("mybucket", Cid, {.Value = Record.Record}); - for (const CompressedBuffer& Attachment : Record.Attachments) - { - CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), Attachment.DecodeRawHash()); - } - } - }; - - std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000}; - - std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)}; - CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes); - - std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; - CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); - - ScrubContext ScrubCtx; - Zcs.Scrub(ScrubCtx); - CidStore.Scrub(ScrubCtx); - CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); - CHECK(ScrubCtx.BadCids().GetSize() == 0); -} - -#endif - -void -z$_forcelink() -{ -} - -} // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h deleted file mode 100644 index 3fb4f035d..000000000 --- a/zenserver/cache/structuredcachestore.h +++ /dev/null @@ -1,535 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/compactbinary.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/thread.h> -#include <zencore/uid.h> -#include <zenstore/blockstore.h> -#include <zenstore/caslog.h> -#include <zenstore/gc.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <atomic> -#include <compare> -#include <filesystem> -#include <unordered_map> - -#define ZEN_USE_CACHE_TRACKER 0 - -namespace zen { - -class PathBuilderBase; -class GcManager; -class ZenCacheTracker; -class ScrubContext; - -/****************************************************************************** - - /$$$$$$$$ /$$$$$$ /$$ - |_____ $$ /$$__ $$ | $$ - /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ - /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ - /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ - /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ - /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ - |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/ - - Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash - is 40 (hex) chars in size. Values may be opaque blobs or structured objects - which can in turn contain references to other objects (or blobs). - -******************************************************************************/ - -namespace access_tracking { - - struct KeyAccessTime - { - IoHash Key; - GcClock::Tick LastAccess{}; - }; - - struct AccessTimes - { - std::unordered_map<std::string, std::vector<KeyAccessTime>> Buckets; - }; -}; // namespace access_tracking - -struct ZenCacheValue -{ - IoBuffer Value; - uint64_t RawSize = 0; - IoHash RawHash = IoHash::Zero; -}; - -struct CacheValueDetails -{ - struct ValueDetails - { - uint64_t Size; - uint64_t RawSize; - IoHash RawHash; - GcClock::Tick LastAccess{}; - std::vector<IoHash> Attachments; - ZenContentType ContentType; - }; - - struct BucketDetails - { - std::unordered_map<IoHash, ValueDetails, IoHash::Hasher> Values; - }; - - struct NamespaceDetails - { - std::unordered_map<std::string, BucketDetails> Buckets; - }; - - std::unordered_map<std::string, NamespaceDetails> Namespaces; -}; - -////////////////////////////////////////////////////////////////////////// - -#pragma pack(push) -#pragma pack(1) - -struct DiskLocation -{ - inline DiskLocation() = default; - - inline DiskLocation(uint64_t ValueSize, uint8_t Flags) : Flags(Flags | kStandaloneFile) { Location.StandaloneSize = ValueSize; } - - inline DiskLocation(const BlockStoreLocation& Location, uint64_t PayloadAlignment, uint8_t Flags) : Flags(Flags & ~kStandaloneFile) - { - this->Location.BlockLocation = BlockStoreDiskLocation(Location, PayloadAlignment); - } - - inline BlockStoreLocation GetBlockLocation(uint64_t PayloadAlignment) const - { - ZEN_ASSERT(!(Flags & kStandaloneFile)); - return Location.BlockLocation.Get(PayloadAlignment); - } - - inline uint64_t Size() const { return (Flags & kStandaloneFile) ? Location.StandaloneSize : Location.BlockLocation.GetSize(); } - inline uint8_t IsFlagSet(uint64_t Flag) const { return Flags & Flag; } - inline uint8_t GetFlags() const { return Flags; } - inline ZenContentType GetContentType() const - { - ZenContentType ContentType = ZenContentType::kBinary; - - if (IsFlagSet(kStructured)) - { - ContentType = ZenContentType::kCbObject; - } - - if (IsFlagSet(kCompressed)) - { - ContentType = ZenContentType::kCompressedBinary; - } - - return ContentType; - } - - union - { - BlockStoreDiskLocation BlockLocation; // 10 bytes - uint64_t StandaloneSize = 0; // 8 bytes - } Location; - - static const uint8_t kStandaloneFile = 0x80u; // Stored as a separate file - static const uint8_t kStructured = 0x40u; // Serialized as compact binary - static const uint8_t kTombStone = 0x20u; // Represents a deleted key/value - static const uint8_t kCompressed = 0x10u; // Stored in compressed buffer format - - uint8_t Flags = 0; - uint8_t Reserved = 0; -}; - -struct DiskIndexEntry -{ - IoHash Key; // 20 bytes - DiskLocation Location; // 12 bytes -}; - -#pragma pack(pop) - -static_assert(sizeof(DiskIndexEntry) == 32); - -// This store the access time as seconds since epoch internally in a 32-bit value giving is a range of 136 years since epoch -struct AccessTime -{ - explicit AccessTime(GcClock::Tick Tick) noexcept : SecondsSinceEpoch(ToSeconds(Tick)) {} - AccessTime& operator=(GcClock::Tick Tick) noexcept - { - SecondsSinceEpoch.store(ToSeconds(Tick), std::memory_order_relaxed); - return *this; - } - operator GcClock::Tick() const noexcept - { - return std::chrono::duration_cast<GcClock::Duration>(std::chrono::seconds(SecondsSinceEpoch.load(std::memory_order_relaxed))) - .count(); - } - - AccessTime(AccessTime&& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime(const AccessTime& Rhs) noexcept : SecondsSinceEpoch(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed)) {} - AccessTime& operator=(AccessTime&& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - AccessTime& operator=(const AccessTime& Rhs) noexcept - { - SecondsSinceEpoch.store(Rhs.SecondsSinceEpoch.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - -private: - static uint32_t ToSeconds(GcClock::Tick Tick) - { - return gsl::narrow<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>(GcClock::Duration(Tick)).count()); - } - std::atomic_uint32_t SecondsSinceEpoch; -}; - -/** In-memory cache storage - - Intended for small values which are frequently accessed - - This should have a better memory management policy to maintain reasonable - footprint. - */ -class ZenCacheMemoryLayer -{ -public: - struct Configuration - { - uint64_t TargetFootprintBytes = 16 * 1024 * 1024; - uint64_t ScavengeThreshold = 4 * 1024 * 1024; - }; - - struct BucketInfo - { - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - struct Info - { - Configuration Config; - std::vector<std::string> BucketNames; - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - ZenCacheMemoryLayer(); - ~ZenCacheMemoryLayer(); - - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - bool DropBucket(std::string_view Bucket); - void Scrub(ScrubContext& Ctx); - void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); - void Reset(); - uint64_t TotalSize() const; - - Info GetInfo() const; - std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; - - const Configuration& GetConfiguration() const { return m_Configuration; } - void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } - -private: - struct CacheBucket - { -#pragma pack(push) -#pragma pack(1) - struct BucketPayload - { - IoBuffer Payload; // 8 - uint32_t RawSize; // 4 - IoHash RawHash; // 20 - }; -#pragma pack(pop) - static_assert(sizeof(BucketPayload) == 32u); - static_assert(sizeof(AccessTime) == 4u); - - mutable RwLock m_BucketLock; - std::vector<AccessTime> m_AccessTimes; - std::vector<BucketPayload> m_Payloads; - tsl::robin_map<IoHash, uint32_t> m_CacheMap; - - std::atomic_uint64_t m_TotalSize{}; - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Scrub(ScrubContext& Ctx); - void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); - inline uint64_t TotalSize() const { return m_TotalSize; } - uint64_t EntryCount() const; - }; - - mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; - std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - Configuration m_Configuration; - - ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; - ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; -}; - -class ZenCacheDiskLayer -{ -public: - struct Configuration - { - std::filesystem::path RootDir; - }; - - struct BucketInfo - { - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - struct Info - { - Configuration Config; - std::vector<std::string> BucketNames; - uint64_t EntryCount = 0; - uint64_t TotalSize = 0; - }; - - explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir); - ~ZenCacheDiskLayer(); - - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool Drop(); - bool DropBucket(std::string_view Bucket); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes); - - void DiscoverBuckets(); - uint64_t TotalSize() const; - - Info GetInfo() const; - std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; - - CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; - -private: - /** A cache bucket manages a single directory containing - metadata and data for that bucket - */ - struct CacheBucket - { - CacheBucket(std::string BucketName); - ~CacheBucket(); - - bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - bool Drop(); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); - - inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); } - uint64_t EntryCount() const; - - CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; - - private: - const uint64_t MaxBlockSize = 1ull << 30; - uint64_t m_PayloadAlignment = 1ull << 4; - - std::string m_BucketName; - std::filesystem::path m_BucketDir; - std::filesystem::path m_BlocksBasePath; - BlockStore m_BlockStore; - Oid m_BucketId; - uint64_t m_LargeObjectThreshold = 128 * 1024; - - // These files are used to manage storage of small objects for this bucket - - TCasLogFile<DiskIndexEntry> m_SlogFile; - uint64_t m_LogFlushPosition = 0; - -#pragma pack(push) -#pragma pack(1) - struct BucketPayload - { - DiskLocation Location; // 12 - uint64_t RawSize; // 8 - IoHash RawHash; // 20 - }; -#pragma pack(pop) - static_assert(sizeof(BucketPayload) == 40u); - static_assert(sizeof(AccessTime) == 4u); - - using IndexMap = tsl::robin_map<IoHash, size_t, IoHash::Hasher>; - - mutable RwLock m_IndexLock; - std::vector<AccessTime> m_AccessTimes; - std::vector<BucketPayload> m_Payloads; - IndexMap m_Index; - - std::atomic_uint64_t m_TotalStandaloneSize{}; - - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); - uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); - void OpenLog(const bool IsNew); - void SaveManifest(); - CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const; - // These locks are here to avoid contention on file creation, therefore it's sufficient - // that we take the same lock for the same hash - // - // These locks are small and should really be spaced out so they don't share cache lines, - // but we don't currently access them at particularly high frequency so it should not be - // an issue in practice - - mutable RwLock m_ShardedLocks[256]; - inline RwLock& LockForHash(const IoHash& Hash) const { return m_ShardedLocks[Hash.Hash[19]]; } - }; - - std::filesystem::path m_RootDir; - mutable RwLock m_Lock; - std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive - std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; - - ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; - ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; -}; - -class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor -{ -public: - struct Configuration - { - std::filesystem::path RootDir; - uint64_t DiskLayerThreshold = 0; - }; - struct BucketInfo - { - ZenCacheDiskLayer::BucketInfo DiskLayerInfo; - ZenCacheMemoryLayer::BucketInfo MemoryLayerInfo; - }; - struct Info - { - Configuration Config; - std::vector<std::string> BucketNames; - ZenCacheDiskLayer::Info DiskLayerInfo; - ZenCacheMemoryLayer::Info MemoryLayerInfo; - }; - - ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir); - ~ZenCacheNamespace(); - - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool Drop(); - bool DropBucket(std::string_view Bucket); - void Flush(); - void Scrub(ScrubContext& Ctx); - uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; } - virtual void GatherReferences(GcContext& GcCtx) override; - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override; - Info GetInfo() const; - std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; - - CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; - -private: - std::filesystem::path m_RootDir; - ZenCacheMemoryLayer m_MemLayer; - ZenCacheDiskLayer m_DiskLayer; - uint64_t m_DiskLayerSizeThreshold = 1 * 1024; - uint64_t m_LastScrubTime = 0; - -#if ZEN_USE_CACHE_TRACKER - std::unique_ptr<ZenCacheTracker> m_AccessTracker; -#endif - - ZenCacheNamespace(const ZenCacheNamespace&) = delete; - ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; -}; - -class ZenCacheStore final -{ -public: - static constexpr std::string_view DefaultNamespace = - "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given - static constexpr std::string_view NamespaceDiskPrefix = "ns_"; - - struct Configuration - { - std::filesystem::path BasePath; - bool AllowAutomaticCreationOfNamespaces = false; - }; - - struct Info - { - Configuration Config; - std::vector<std::string> NamespaceNames; - uint64_t DiskEntryCount = 0; - uint64_t MemoryEntryCount = 0; - GcStorageSize StorageSize; - }; - - ZenCacheStore(GcManager& Gc, const Configuration& Configuration); - ~ZenCacheStore(); - - bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool DropBucket(std::string_view Namespace, std::string_view Bucket); - bool DropNamespace(std::string_view Namespace); - void Flush(); - void Scrub(ScrubContext& Ctx); - - CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, - const std::string_view BucketFilter, - const std::string_view ValueFilter) const; - - GcStorageSize StorageSize() const; - // const Configuration& GetConfiguration() const { return m_Configuration; } - - Info GetInfo() const; - std::optional<ZenCacheNamespace::Info> GetNamespaceInfo(std::string_view Namespace); - std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket); - -private: - const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; - ZenCacheNamespace* GetNamespace(std::string_view Namespace); - void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const; - - typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; - - mutable RwLock m_NamespacesLock; - NamespaceMap m_Namespaces; - std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; - - GcManager& m_Gc; - Configuration m_Configuration; -}; - -void z$_forcelink(); - -} // namespace zen |