diff options
Diffstat (limited to 'src/zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 3209 |
1 files changed, 0 insertions, 3209 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp deleted file mode 100644 index 566df73c1..000000000 --- a/src/zenserver/cache/structuredcache.cpp +++ /dev/null @@ -1,3209 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "structuredcache.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/enumflags.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpserver.h> -#include <zenhttp/httpshared.h> -#include <zenhttp/httpstats.h> -#include <zenstore/gc.h> -#include <zenutil/cache/cache.h> -#include <zenutil/cache/rpcrecording.h> - -#include "structuredcachestore.h" -#include "upstream/jupiter.h" -#include "upstream/upstreamcache.h" -#include "upstream/zen.h" -#include "zenstore/cidstore.h" -#include "zenstore/scrubcontext.h" - -#include <algorithm> -#include <atomic> -#include <filesystem> -#include <queue> -#include <thread> - -#include <cpr/cpr.h> -#include <gsl/gsl-lite.hpp> - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -#endif - -namespace zen { - -using namespace std::literals; - -////////////////////////////////////////////////////////////////////////// - -CachePolicy -ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) -{ - std::string_view PolicyText = QueryParams.GetValue("Policy"sv); - return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; -} - -CacheRecordPolicy -LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) -{ - OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); - return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); -} - -struct AttachmentCount -{ - uint32_t New = 0; - uint32_t Valid = 0; - uint32_t Invalid = 0; - uint32_t Total = 0; -}; - -struct PutRequestData -{ - std::string Namespace; - CacheKey Key; - CbObjectView RecordObject; - CacheRecordPolicy Policy; -}; - -namespace { - static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; - static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; - static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; - static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; - static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; - - struct HttpRequestData - { - std::optional<std::string> Namespace; - std::optional<std::string> Bucket; - std::optional<IoHash> HashKey; - std::optional<IoHash> ValueContentId; - }; - - constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; - - std::optional<std::string> GetValidNamespaceName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Namespace is invalid, empty namespace is not allowed"); - return {}; - } - - if (Name.length() > 64) - { - ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet)) - { - ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional<std::string> GetValidBucketName(std::string_view Name) - { - if (Name.empty()) - { - ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed"); - return {}; - } - - if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet)) - { - ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name); - return {}; - } - - return ToLower(Name); - } - - std::optional<IoHash> GetValidIoHash(std::string_view Hash) - { - if (Hash.length() != IoHash::StringLength) - { - return {}; - } - - IoHash KeyHash; - if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) - { - return {}; - } - return KeyHash; - } - - bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) - { - std::vector<std::string_view> Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) { - Tokens.push_back(Token); - return true; - }); - - switch (TokenCount) - { - case 0: - return true; - case 1: - Data.Namespace = GetValidNamespaceName(Tokens[0]); - return Data.Namespace.has_value(); - case 2: - { - std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - return true; - } - case 3: - { - std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]); - if (PossibleHashKey.has_value()) - { - // Legacy bucket/key/valueid request - Data.Bucket = GetValidBucketName(Tokens[0]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = PossibleHashKey; - Data.ValueContentId = GetValidIoHash(Tokens[2]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - Data.Namespace = ZenCacheStore::DefaultNamespace; - return true; - } - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey) - { - return false; - } - return true; - } - case 4: - { - Data.Namespace = GetValidNamespaceName(Tokens[0]); - if (!Data.Namespace.has_value()) - { - return false; - } - - Data.Bucket = GetValidBucketName(Tokens[1]); - if (!Data.Bucket.has_value()) - { - return false; - } - - Data.HashKey = GetValidIoHash(Tokens[2]); - if (!Data.HashKey.has_value()) - { - return false; - } - - Data.ValueContentId = GetValidIoHash(Tokens[3]); - if (!Data.ValueContentId.has_value()) - { - return false; - } - return true; - } - default: - return false; - } - } - - std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) - { - CbFieldView NamespaceField = Params["Namespace"sv]; - if (!NamespaceField) - { - return std::string(ZenCacheStore::DefaultNamespace); - } - - if (NamespaceField.HasError()) - { - return {}; - } - if (!NamespaceField.IsString()) - { - return {}; - } - return GetValidNamespaceName(NamespaceField.AsString()); - } - - bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) - { - CbFieldView BucketField = KeyView["Bucket"sv]; - if (BucketField.HasError()) - { - return false; - } - if (!BucketField.IsString()) - { - return false; - } - std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString()); - if (!Bucket.has_value()) - { - return false; - } - CbFieldView HashField = KeyView["Hash"sv]; - if (HashField.HasError()) - { - return false; - } - if (!HashField.IsHash()) - { - return false; - } - IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Bucket, Hash); - return true; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker) -: m_Log(logging::Get("cache")) -, m_CacheStore(InCacheStore) -, m_StatsService(StatsService) -, m_StatusService(StatusService) -, m_CidStore(InCidStore) -, m_UpstreamCache(UpstreamCache) -, m_DiskWriteBlocker(InDiskWriteBlocker) -{ - m_StatsService.RegisterHandler("z$", *this); - m_StatusService.RegisterHandler("z$", *this); -} - -HttpStructuredCacheService::~HttpStructuredCacheService() -{ - ZEN_INFO("closing structured cache"); - m_RequestRecorder.reset(); - - m_StatsService.UnregisterHandler("z$", *this); - m_StatusService.UnregisterHandler("z$", *this); -} - -const char* -HttpStructuredCacheService::BaseUri() const -{ - return "/z$/"; -} - -void -HttpStructuredCacheService::Flush() -{ - m_CacheStore.Flush(); -} - -void -HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - ZEN_INFO("scrubbing '{}'", Info.Config.BasePath); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CidStore.ScrubStorage(Ctx); - m_CacheStore.ScrubStorage(Ctx); -} - -void -HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) -{ - std::string_view Key = Request.RelativeUri(); - std::vector<std::string> Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { - Tokens.push_back(std::string(Token)); - return true; - }); - std::string FilterNamespace; - std::string FilterBucket; - std::string FilterValue; - switch (TokenCount) - { - case 1: - break; - case 2: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 3: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 4: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterValue = Tokens[3]; - if (FilterValue.empty()) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - default: - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch()); - CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - if (AttachmentDetails) - { - CSVWriter << "Namespace, Bucket, Key, Cid, Size"; - } - else if (Details) - { - CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; - } - else - { - CSVWriter << "Namespace, Bucket, Key"; - } - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - for (const auto& ValueIt : BucketIt.second.Values) - { - if (AttachmentDetails) - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() - << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); - } - } - else if (Details) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," - << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " - << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) - << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); - } - else - { - CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); - } - } - } - } - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("namespaces"); - { - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Namespace); - Cbo.BeginArray("buckets"); - { - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Bucket); - Cbo.BeginArray("values"); - { - for (const auto& ValueIt : BucketIt.second.Values) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - Cbo.BeginObject(); - { - Cbo.AddHash("key", ValueIt.first); - if (Details) - { - Cbo.AddInteger("size", ValueIt.second.Size); - if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && - ValueIt.second.RawSize != ValueIt.second.Size) - { - Cbo.AddInteger("rawsize", ValueIt.second.RawSize); - Cbo.AddHash("rawhash", ValueIt.second.RawHash); - } - Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); - Cbo.AddInteger("age", NowSeconds.count() - LastAccessedSeconds.count()); - if (ValueIt.second.Attachments.size() > 0) - { - if (AttachmentDetails) - { - Cbo.BeginArray("attachments"); - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - Cbo.BeginObject(); - Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - else - { - Cbo.AddInteger("attachmentcount", - gsl::narrow<uint64_t>(ValueIt.second.Attachments.size())); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); - } - } - } - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) -{ - metrics::OperationTiming::Scope $(m_HttpRequests); - - std::string_view Key = Request.RelativeUri(); - if (Key == HttpZCacheRPCPrefix) - { - return HandleRpcRequest(Request); - } - - if (Key == HttpZCacheUtilStartRecording) - { - m_RequestRecorder.reset(); - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key == HttpZCacheUtilStopRecording) - { - m_RequestRecorder.reset(); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key == HttpZCacheUtilReplayRecording) - { - m_RequestRecorder.reset(); - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); - uint32_t ThreadCount = std::thread::hardware_concurrency(); - if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) - { - if (auto Value = ParseInt<uint64_t>(Param)) - { - ThreadCount = gsl::narrow<uint32_t>(Value.value()); - } - } - std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); - ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - if (Key.starts_with(HttpZCacheDetailsPrefix)) - { - HandleDetailsRequest(Request); - return; - } - - HttpRequestData RequestData; - if (!HttpRequestParseRelativeUri(Key, RequestData)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - if (RequestData.ValueContentId.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - ZEN_ASSERT(RequestData.HashKey.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = RequestData.ValueContentId.value()}; - return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.HashKey.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = IoHash::Zero}; - return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.Bucket.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); - } - - if (RequestData.Namespace.has_value()) - { - return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); - } - return HandleCacheRequest(Request); -} - -void -HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info.Config.BasePath.u8string(); - ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); - ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - ResponseWriter.BeginArray("Namespaces"); - for (const std::string& NamespaceName : Info.NamespaceNames) - { - ResponseWriter.AddString(NamespaceName); - } - ResponseWriter.EndArray(); - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); - ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); - } - - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - } -} - -void -HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info->Config.RootDir.u8string(); - ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); - ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - - ResponseWriter.BeginArray("Buckets"sv); - for (const std::string& BucketName : Info->BucketNames) - { - ResponseWriter.AddString(BucketName); - } - ResponseWriter.EndArray(); - - ResponseWriter.BeginObject("StorageSize"sv); - { - ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop namespace - { - if (m_CacheStore.DropNamespace(NamespaceName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, - std::string_view NamespaceName, - std::string_view BucketName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize); - ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop bucket - { - if (m_CacheStore.DropBucket(NamespaceName, BucketName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - HandleGetCacheRecord(Request, Ref, PolicyFromUrl); - } - break; - - case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, PolicyFromUrl); - break; - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - - bool Success = false; - ZenCacheValue ClientResultValue; - if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - - Stopwatch Timer; - - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && - m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) - { - Success = true; - ZenContentType ContentType = ClientResultValue.Value.GetContentType(); - - if (AcceptType == ZenContentType::kCbPackage) - { - if (ContentType == ZenContentType::kCbObject) - { - CbPackage Package; - uint32_t MissingCount = 0; - - CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { - if (SkipData) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - MissingCount++; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); - } - else - { - MissingCount++; - } - } - }); - - Success = MissingCount == 0 || PartialRecord; - - if (Success) - { - Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); - - BinaryWriter MemStream; - Package.Save(MemStream); - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); - } - } - else - { - Success = false; - } - } - else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && - AcceptType != ZenContentType::kBinary) - { - Success = false; - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData - return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); - } - } - else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - // Issue upstream query asynchronously in order to keep requests flowing without - // hogging I/O servicing threads with blocking work - - uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); - - Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) { - Stopwatch Timer; - bool Success = false; - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - ZenCacheValue ClientResultValue; - - metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); - UpstreamResult.Status.Success) - { - Success = true; - - ClientResultValue.Value = UpstreamResult.Value; - ClientResultValue.Value.SetContentType(AcceptType); - - if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) - { - if (AcceptType == ZenContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - if (ValidationResult != CbValidateError::None) - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - - // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data - } - - if (Success && StoreLocal) - { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); - } - } - else if (AcceptType == ZenContentType::kCbPackage) - { - CbPackage Package; - if (Package.TryLoad(ClientResultValue.Value)) - { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - AttachmentsToStoreLocally.reserve(NumAttachments); - - CacheRecord.IterateAttachments( - [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) { - IoHash Hash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - if (StoreLocal) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - } - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", - Hash, - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; - } - } - else if (QueryLocal) - { - if (SkipData) - { - if (m_CidStore.ContainsChunk(Hash)) - { - Count.Valid++; - } - } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - if (Compressed) - { - Package.AddAttachment(CbAttachment(Compressed, Hash)); - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", - Hash, - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; - } - } - } - Count.Total++; - }); - - if ((Count.Valid == Count.Total) || PartialRecord) - { - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - - if (StoreLocal) - { - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); - } - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - ZEN_ASSERT_SLOW(StoreLocal); - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - } - - BinaryWriter MemStream; - if (SkipData) - { - // Save a package containing only the object. - CbPackage(Package.GetObject()).Save(MemStream); - } - else - { - Package.Save(MemStream); - } - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - - if (SkipData && AcceptType == ZenContentType::kBinary) - { - AsyncRequest.WriteResponse(HttpResponseCode::OK); - } - else - { - // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally - // metadata. - AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); - } - } - else - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - m_CacheStats.MissCount++; - AsyncRequest.WriteResponse(HttpResponseCode::NotFound); - } - }); -} - -void -HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - if (!AreDiskWritesAllowed()) - { - return Request.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - const HttpContentType ContentType = Request.RequestContentType(); - - Body.SetContentType(ContentType); - - Stopwatch Timer; - - if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) - { - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = Body.GetSize(); - if (ContentType == HttpContentType::kCompressedBinary) - { - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Payload is not a valid compressed binary"sv); - } - } - else - { - RawHash = IoHash::HashBuffer(SharedBuffer(Body)); - } - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}); - - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); - } - - Body.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - - CbObjectView CacheRecord(Body.Data()); - std::vector<IoHash> ValidAttachments; - int32_t TotalCount = 0; - - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - TotalCount, - ValidAttachments.size(), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - - CachePolicy Policy = PolicyFromUrl; - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbPackage) - { - CbPackage Package; - - if (!Package.TryLoad(Body)) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); - } - CachePolicy Policy = PolicyFromUrl; - - CbObject CacheRecord = Package.GetObject(); - - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); - - CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - Count.Total++; - }); - - if (Count.Invalid > 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); - } - - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.GetSize()), - ToString(ContentType), - Count.New, - Count.Valid, - Count.Total, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = Count.Valid != Count.Total; - - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); - } -} - -void -HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - HandleGetCacheChunk(Request, Ref, PolicyFromUrl); - break; - case HttpVerb::kPut: - HandlePutCacheChunk(Request, Ref, PolicyFromUrl); - break; - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - Stopwatch Timer; - - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); - const UpstreamEndpointInfo* Source = nullptr; - CachePolicy Policy = PolicyFromUrl; - { - const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); - - if (QueryUpstream) - { - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); - UpstreamResult.Status.Success) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) - { - if (RawHash == Ref.ValueContentId) - { - if (AreDiskWritesAllowed()) - { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); - } - Source = UpstreamResult.Source; - } - else - { - ZEN_WARN("got missmatching upstream cache value"); - } - } - else - { - ZEN_WARN("got uncompressed upstream cache value"); - } - } - } - } - - if (!Value) - { - ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - ToString(Request.AcceptContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Value.Size()), - ToString(Value.GetContentType()), - Source ? Source->Url : "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (Source) - { - m_CacheStats.UpstreamHitCount++; - } - - if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) - { - Request.WriteResponse(HttpResponseCode::OK); - } - else - { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); - } -} - -void -HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(PolicyFromUrl); - - Stopwatch Timer; - - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - if (!AreDiskWritesAllowed()) - { - return Request.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - Body.SetContentType(Request.RequestContentType()); - - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); - } - - if (RawHash != Ref.ValueContentId) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "ValueContentId does not match attachment hash"sv); - } - - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); - - ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Body.Size()), - ToString(Body.GetContentType()), - Result.New ? "NEW" : "OLD", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; - - Request.WriteResponse(ResponseCode); -} - -HttpResponseCode -HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId, - CbPackage& OutResultPackage) -{ - CbPackage Package; - CbObjectView Object; - CbObject ObjectBuffer; - if (ContentType == ZenContentType::kCbObject) - { - ObjectBuffer = LoadCompactBinaryObject(std::move(Body)); - Object = ObjectBuffer; - } - else - { - Package = ParsePackageMessage(Body); - Object = Package.GetObject(); - } - OutAcceptMagic = Object["Accept"sv].AsUInt32(); - OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u)); - OutTargetProcessId = Object["Pid"sv].AsInt32(0); - - const std::string_view Method = Object["Method"sv].AsString(); - - if (Method == "PutCacheRecords"sv) - { - if (!AreDiskWritesAllowed()) - { - return HttpResponseCode::InsufficientStorage; - } - OutResultPackage = HandleRpcPutCacheRecords(Package); - } - else if (Method == "GetCacheRecords"sv) - { - OutResultPackage = HandleRpcGetCacheRecords(Object); - } - else if (Method == "PutCacheValues"sv) - { - if (!AreDiskWritesAllowed()) - { - return HttpResponseCode::InsufficientStorage; - } - OutResultPackage = HandleRpcPutCacheValues(Package); - } - else if (Method == "GetCacheValues"sv) - { - OutResultPackage = HandleRpcGetCacheValues(Object); - } - else if (Method == "GetCacheChunks"sv) - { - OutResultPackage = HandleRpcGetCacheChunks(Object); - } - else - { - return HttpResponseCode::BadRequest; - } - return HttpResponseCode::OK; -} - -void -HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) -{ - WorkerThreadPool WorkerPool(ThreadCount); - uint64_t RequestCount = Replayer.GetRequestCount(); - Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); - ZEN_INFO("Replaying {} requests", RequestCount); - for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) - { - WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); - if (Body) - { - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetPid = 0; - CbPackage RpcResult; - if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) - { - if (AcceptMagic == kCbPkgMagic) - { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); - } - } - } - JobLatch.CountDown(); - }); - } - while (!JobLatch.Wait(10000)) - { - ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), - RequestCount, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } -} - -void -HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kPost: - { - const HttpContentType ContentType = Request.RequestContentType(); - const HttpContentType AcceptType = Request.AcceptContentType(); - - if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || - AcceptType != HttpContentType::kCbPackage) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - Request.WriteResponseAsync( - [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; - - HttpResponseCode ResultCode = - HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); - if (!IsHttpSuccessCode(ResultCode)) - { - AsyncRequest.WriteResponse(ResultCode); - return; - } - if (AcceptMagic == kCbPkgMagic) - { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - - if (RequestIndex != ~0ull) - { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }); - } - break; - default: - Request.WriteResponse(HttpResponseCode::BadRequest); - break; - } -} - -CbPackage -HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) -{ - ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); - - CbObjectView BatchObject = BatchRequest.GetObject(); - ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); - CachePolicy DefaultPolicy; - - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::vector<bool> Results; - for (CbFieldView RequestField : Params["Requests"sv]) - { - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); - CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); - - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } - CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; - - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); - - if (Result == PutResult::Invalid) - { - return CbPackage{}; - } - Results.push_back(Result == PutResult::Success); - } - if (Results.empty()) - { - return CbPackage{}; - } - - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) - { - ResponseObject.AddBool(Value); - } - ResponseObject.EndArray(); - - CbPackage RpcResponse; - RpcResponse.SetObject(ResponseObject.Save()); - return RpcResponse; -} - -HttpStructuredCacheService::PutResult -HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) -{ - CbObjectView Record = Request.RecordObject; - uint64_t RecordObjectSize = Record.GetSize(); - uint64_t TransferredSize = RecordObjectSize; - - AttachmentCount Count; - size_t NumAttachments = Package->GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<const CbAttachment*> AttachmentsToStoreLocally; - ValidAttachments.reserve(NumAttachments); - AttachmentsToStoreLocally.reserve(NumAttachments); - - Stopwatch Timer; - - Request.RecordObject.IterateAttachments( - [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) - { - if (Attachment->IsCompressedBinary()) - { - AttachmentsToStoreLocally.emplace_back(Attachment); - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ToString(HttpContentType::kCbPackage), - ValueHash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(ValueHash)) - { - ValidAttachments.emplace_back(ValueHash); - Count.Valid++; - } - Count.Total++; - }); - - if (Count.Invalid > 0) - { - return PutResult::Invalid; - } - - ZenCacheValue CacheValue; - CacheValue.Value = IoBuffer(Record.GetSize()); - Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); - - for (const CbAttachment* Attachment : AttachmentsToStoreLocally) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - if (InsertResult.New) - { - Count.New++; - } - TransferredSize += Chunk.GetCompressedSize(); - } - - ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}", - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - NiceBytes(TransferredSize), - Count.New, - Count.Valid, - Count.Total, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = Count.Valid != Count.Total; - - if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); - } - return PutResult::Success; -} - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) -{ - ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - - struct ValueRequestData - { - Oid ValueId; - IoHash ContentId; - CompressedBuffer Payload; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool ReadFromUpstream = false; - }; - struct RecordRequestData - { - CacheKeyRequest Upstream; - CbObjectView RecordObject; - IoBuffer RecordCacheValue; - CacheRecordPolicy DownstreamPolicy; - std::vector<ValueRequestData> Values; - bool Complete = false; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t ElapsedTimeUs; - }; - - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - std::vector<RecordRequestData> Requests; - std::vector<size_t> UpstreamIndexes; - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - Requests.reserve(RequestsArray.Num()); - - auto ParseValues = [](RecordRequestData& Request) { - CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); - Request.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) - { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - Request.Values.push_back({ValueId, RawHash}); - Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId); - } - } - }; - - for (CbFieldView RequestField : RequestsArray) - { - Stopwatch Timer; - RecordRequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - CacheKey& Key = Request.Upstream.Key; - if (!GetRpcRequestCacheKey(KeyObject, Key)) - { - return CbPackage{}; - } - - Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - const CacheRecordPolicy& Policy = Request.DownstreamPolicy; - - ZenCacheValue CacheValue; - bool NeedUpstreamAttachment = false; - bool FoundLocalInvalid = false; - ZenCacheValue RecordCacheValue; - - if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) - { - Request.RecordCacheValue = std::move(RecordCacheValue.Value); - if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) - { - FoundLocalInvalid = true; - } - else - { - Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); - ParseValues(Request); - - Request.Complete = true; - for (ValueRequestData& Value : Request.Values) - { - CachePolicy ValuePolicy = Value.DownstreamPolicy; - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) - { - // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we - // didn't ask for it and thus the record is complete in its absence. - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - Value.Exists = true; - } - else - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - Request.Complete = false; - } - } - else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - if (m_CidStore.ContainsChunk(Value.ContentId)) - { - Value.Exists = true; - } - else - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - } - Request.Complete = false; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) - { - ZEN_ASSERT(Chunk.GetSize() > 0); - Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - Value.Exists = true; - } - else - { - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - NeedUpstreamAttachment = true; - Value.ReadFromUpstream = true; - } - Request.Complete = false; - } - } - } - } - } - if (!Request.Complete) - { - bool NeedUpstreamRecord = - !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); - if (NeedUpstreamRecord || NeedUpstreamAttachment) - { - UpstreamIndexes.push_back(Requests.size() - 1); - } - } - Request.ElapsedTimeUs = Timer.GetElapsedTimeUs(); - } - if (Requests.empty()) - { - return CbPackage{}; - } - - if (!UpstreamIndexes.empty()) - { - std::vector<CacheKeyRequest*> UpstreamRequests; - UpstreamRequests.reserve(UpstreamIndexes.size()); - for (size_t Index : UpstreamIndexes) - { - RecordRequestData& Request = Requests[Index]; - UpstreamRequests.push_back(&Request.Upstream); - - if (Request.Values.size()) - { - // We will be returning the local object and know all the value Ids that exist in it - // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have. - CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; - CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); - for (ValueRequestData& Value : Request.Values) - { - CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy); - UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; - Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); - } - Request.Upstream.Policy = Builder.Build(); - } - else - { - // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, - // and convert the CacheRecordPolicy to an upstream policy - Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); - } - } - - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - - RecordRequestData& Request = - *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); - Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - const CacheKey& Key = Request.Upstream.Key; - Stopwatch Timer; - auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); }); - if (!Request.RecordObject) - { - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); - Request.RecordObject = ObjectBuffer; - bool StoreLocal = - EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); - } - ParseValues(Request); - Request.Source = Params.Source; - } - - Request.Complete = true; - for (ValueRequestData& Value : Request.Values) - { - if (Value.Exists) - { - continue; - } - CachePolicy ValuePolicy = Value.DownstreamPolicy; - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) - { - Request.Complete = false; - continue; - } - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) - { - bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) - { - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) - { - Request.Source = Params.Source; - Value.Exists = true; - if (StoreLocal) - { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); - } - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - Value.Payload = Compressed; - } - } - else - { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", - Value.ContentId, - *Namespace, - Key.Bucket, - Key.Hash); - } - } - if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - Request.Complete = false; - } - // Request.Complete does not need to be set to false for upstream SkipData attachments. - // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment - // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of - // any missing SkipData attachments. - } - Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } - }; - - m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); - } - - CbPackage ResponsePackage; - CbObjectWriter ResponseObject; - - ResponseObject.BeginArray("Result"sv); - for (RecordRequestData& Request : Requests) - { - const CacheKey& Key = Request.Upstream.Key; - if (Request.Complete || - (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) - { - ResponseObject << Request.RecordObject; - for (ValueRequestData& Value : Request.Values) - { - if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) - { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId)); - } - } - - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(Request.RecordCacheValue.Size()), - Request.Complete ? ""sv : " (PARTIAL)"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0; - } - else - { - ResponseObject.AddNull(); - - if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) - { - // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - } - else - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - Request.RecordObject ? ""sv : " (PARTIAL)"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.MissCount++; - } - } - } - ResponseObject.EndArray(); - ResponsePackage.SetObject(ResponseObject.Save()); - return ResponsePackage; -} - -CbPackage -HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest) -{ - CbObjectView BatchObject = BatchRequest.GetObject(); - CbObjectView Params = BatchObject["Params"sv].AsObjectView(); - - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - std::vector<bool> Results; - for (CbFieldView RequestField : Params["Requests"sv]) - { - Stopwatch Timer; - - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); - - CacheKey Key; - if (!GetRpcRequestCacheKey(KeyView, Key)) - { - return CbPackage{}; - } - - PolicyText = RequestObject["Policy"sv].AsString(); - CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); - uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - bool Succeeded = false; - uint64_t TransferredSize = 0; - - if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) - { - if (Attachment->IsCompressedBinary()) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - // TODO: Implement upstream puts of CacheValues with StoreLocal == false. - // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. - Policy |= CachePolicy::StoreLocal; - } - - if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); - Value.SetContentType(ZenContentType::kCompressedBinary); - if (RawSize == 0) - { - RawSize = Chunk.DecodeRawSize(); - } - m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}); - TransferredSize = Chunk.GetCompressedSize(); - } - Succeeded = true; - } - else - { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); - return CbPackage{}; - } - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - ZenCacheValue ExistingValue; - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && - IsCompressedBinary(ExistingValue.Value.GetContentType())) - { - Succeeded = true; - } - } - // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. - // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. - - if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); - } - Results.push_back(Succeeded); - ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(TransferredSize), - Succeeded ? "Added"sv : "Invalid", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } - if (Results.empty()) - { - return CbPackage{}; - } - - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) - { - ResponseObject.AddBool(Value); - } - ResponseObject.EndArray(); - - CbPackage RpcResponse; - RpcResponse.SetObject(ResponseObject.Save()); - - return RpcResponse; -} - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) -{ - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; - std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); - if (!Namespace) - { - return CbPackage{}; - } - - struct RequestData - { - CacheKey Key; - CachePolicy Policy; - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - CompressedBuffer Result; - }; - std::vector<RequestData> Requests; - - std::vector<size_t> RemoteRequestIndexes; - - for (CbFieldView RequestField : Params["Requests"sv]) - { - Stopwatch Timer; - - RequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) - { - return CbPackage{}; - } - - PolicyText = RequestObject["Policy"sv].AsString(); - Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; - - ZenCacheValue CacheValue; - if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) - { - if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) - { - Request.RawHash = CacheValue.RawHash; - Request.RawSize = CacheValue.RawSize; - Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } - } - if (Request.Result) - { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - NiceBytes(Request.Result.GetCompressed().GetSize()), - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.HitCount++; - } - else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) - { - RemoteRequestIndexes.push_back(Requests.size() - 1); - } - else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) - { - // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); - } - else - { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - } - } - - if (!RemoteRequestIndexes.empty()) - { - std::vector<CacheValueRequest> RequestedRecordsData; - std::vector<CacheValueRequest*> CacheValueRequests; - RequestedRecordsData.reserve(RemoteRequestIndexes.size()); - CacheValueRequests.reserve(RemoteRequestIndexes.size()); - for (size_t Index : RemoteRequestIndexes) - { - RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); - CacheValueRequests.push_back(&RequestedRecordsData.back()); - } - Stopwatch Timer; - m_UpstreamCache.GetCacheValues( - *Namespace, - CacheValueRequests, - [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { - CacheValueRequest& ChunkRequest = Params.Request; - if (Params.RawHash != IoHash::Zero) - { - size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); - size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; - RequestData& Request = Requests[RequestIndex]; - Request.RawHash = Params.RawHash; - Request.RawSize = Params.RawSize; - const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); - const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - const bool IsHit = SkipData || HasData; - if (IsHit) - { - if (HasData && !SkipData) - { - Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); - } - - if (HasData && StoreData) - { - m_CacheStore.Put(*Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}); - } - - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - *Namespace, - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - NiceBytes(Request.Result.GetCompressed().GetSize()), - Params.Source ? Params.Source->Url : "UPSTREAM", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - return; - } - } - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - Params.Source ? Params.Source->Url : "UPSTREAM", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - }); - } - - if (Requests.empty()) - { - return CbPackage{}; - } - - CbPackage RpcResponse; - CbObjectWriter ResponseObject; - ResponseObject.BeginArray("Result"sv); - for (const RequestData& Request : Requests) - { - ResponseObject.BeginObject(); - { - const CompressedBuffer& Result = Request.Result; - if (Result) - { - ResponseObject.AddHash("RawHash"sv, Request.RawHash); - if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash)); - } - else - { - ResponseObject.AddInteger("RawSize"sv, Request.RawSize); - } - } - else if (Request.RawHash != IoHash::Zero) - { - ResponseObject.AddHash("RawHash"sv, Request.RawHash); - ResponseObject.AddInteger("RawSize"sv, Request.RawSize); - } - } - ResponseObject.EndObject(); - } - ResponseObject.EndArray(); - - RpcResponse.SetObject(ResponseObject.Save()); - return RpcResponse; -} - -namespace cache::detail { - - struct RecordValue - { - Oid ValueId; - IoHash ContentId; - uint64_t RawSize; - }; - struct RecordBody - { - IoBuffer CacheValue; - std::vector<RecordValue> Values; - const UpstreamEndpointInfo* Source = nullptr; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool HasRequest = false; - bool ValuesRead = false; - }; - struct ChunkRequest - { - CacheChunkRequest* Key = nullptr; - RecordBody* Record = nullptr; - CompressedBuffer Value; - const UpstreamEndpointInfo* Source = nullptr; - uint64_t RawSize = 0; - uint64_t RequestedSize = 0; - uint64_t RequestedOffset = 0; - CachePolicy DownstreamPolicy; - bool Exists = false; - bool RawSizeKnown = false; - bool IsRecordRequest = false; - uint64_t ElapsedTimeUs = 0; - }; - -} // namespace cache::detail - -CbPackage -HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest) -{ - using namespace cache::detail; - - std::string Namespace; - std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream - std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests - std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream - std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest - std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key - std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key - std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream - - // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) - { - return CbPackage{}; - } - - // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we - // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); - - // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); - - // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); - - // Send the payload and descriptive data about each chunk to the client - return WriteGetCacheChunksResponse(Namespace, Requests); -} - -bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - CbObjectView RpcRequest) -{ - using namespace cache::detail; - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; - - std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); - if (!NamespaceText) - { - ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); - return false; - } - Namespace = *NamespaceText; - - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); - - // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, - // we will need to change the pointers to indexes to handle reallocations. - RecordKeys.reserve(NumRequests); - Records.reserve(NumRequests); - RequestKeys.reserve(NumRequests); - Requests.reserve(NumRequests); - RecordRequests.reserve(NumRequests); - ValueRequests.reserve(NumRequests); - - CacheKeyRequest* PreviousRecordKey = nullptr; - RecordBody* PreviousRecord = nullptr; - - for (CbFieldView RequestView : ChunkRequestsArray) - { - CbObjectView RequestObject = RequestView.AsObjectView(); - CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); - ChunkRequest& Request = Requests.emplace_back(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - - Request.Key = &RequestKey; - if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key)) - { - ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); - return false; - } - - RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); - RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); - RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); - Request.RequestedSize = RequestKey.RawSize; - Request.RequestedOffset = RequestKey.RawOffset; - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - Request.IsRecordRequest = (bool)RequestKey.ValueId; - - if (!Request.IsRecordRequest) - { - ValueRequests.push_back(&Request); - } - else - { - RecordRequests.push_back(&Request); - CacheKeyRequest* RecordKey = nullptr; - RecordBody* Record = nullptr; - - if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) - { - RecordKey = &RecordKeys.emplace_back(); - PreviousRecordKey = RecordKey; - Record = &Records.emplace_back(); - PreviousRecord = Record; - RecordKey->Key = RequestKey.Key; - } - else if (RequestKey.Key == PreviousRecordKey->Key) - { - RecordKey = PreviousRecordKey; - Record = PreviousRecord; - } - else - { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", - RequestKey.Key.Bucket, - RequestKey.Key.Hash, - PreviousRecordKey->Key.Bucket, - PreviousRecordKey->Key.Hash); - return false; - } - Request.Record = Record; - if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) - { - Record->DownstreamPolicy = - Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; - Record->HasRequest = true; - } - } - } - if (Requests.empty()) - { - return false; - } - return true; -} - -void -HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks) -{ - using namespace cache::detail; - - std::vector<CacheKeyRequest*> UpstreamRecordRequests; - for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) - { - Stopwatch Timer; - CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; - RecordBody& Record = Records[RecordIndex]; - if (Record.HasRequest) - { - Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; - - if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) - { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) - { - Record.Exists = true; - Record.CacheValue = std::move(CacheValue.Value); - } - } - if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) - { - RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); - UpstreamRecordRequests.push_back(&RecordKey); - } - RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } - } - - if (!UpstreamRecordRequests.empty()) - { - const auto OnCacheRecordGetComplete = - [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); - } - }; - m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); - } - - std::vector<CacheChunkRequest*> UpstreamPayloadRequests; - for (ChunkRequest* Request : RecordRequests) - { - Stopwatch Timer; - if (Request->Key->ChunkId == IoHash::Zero) - { - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, parse the cache record and try to find the raw hash from the ValueId. - RecordBody& Record = *Request->Record; - if (!Record.ValuesRead) - { - Record.ValuesRead = true; - if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); - CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); - Record.Values.reserve(ValuesArray.Num()); - for (CbFieldView ValueField : ValuesArray) - { - CbObjectView ValueObject = ValueField.AsObjectView(); - Oid ValueId = ValueObject["Id"sv].AsObjectId(); - CbFieldView RawHashField = ValueObject["RawHash"sv]; - IoHash RawHash = RawHashField.AsBinaryAttachment(); - if (ValueId && !RawHashField.HasError()) - { - Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); - } - } - } - } - - for (const RecordValue& Value : Record.Values) - { - if (Value.ValueId == Request->Key->ValueId) - { - Request->Key->ChunkId = Value.ContentId; - Request->RawSize = Value.RawSize; - Request->RawSizeKnown = true; - break; - } - } - } - - // Now load the ContentId from the local ContentIdStore or from the upstream - if (Request->Key->ChunkId != IoHash::Zero) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) - { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) - { - Request->Exists = true; - } - } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) - { - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); - if (Request->Value) - { - Request->Exists = true; - Request->RawSizeKnown = false; - } - } - else - { - IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize)) - { - Request->Exists = true; - Request->RawSizeKnown = true; - } - } - } - } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) - { - Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); - OutUpstreamChunks.push_back(Request->Key); - } - } - Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } -} - -void -HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks) -{ - using namespace cache::detail; - - for (ChunkRequest* Request : ValueRequests) - { - Stopwatch Timer; - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) - { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) - { - if (IsCompressedBinary(CacheValue.Value.GetContentType())) - { - Request->Key->ChunkId = CacheValue.RawHash; - Request->Exists = true; - Request->RawSize = CacheValue.RawSize; - Request->RawSizeKnown = true; - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } - } - } - } - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) - { - if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) - { - // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally - Request->Key->RawOffset = 0; - Request->Key->RawSize = UINT64_MAX; - } - OutUpstreamChunks.push_back(Request->Key); - } - Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); - } -} - -void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, - std::vector<CacheChunkRequest*>& UpstreamChunks, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests) -{ - using namespace cache::detail; - - if (!UpstreamChunks.empty()) - { - const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { - if (Params.RawHash == Params.RawHash.Zero) - { - return; - } - - CacheChunkRequest& Key = Params.Request; - size_t RequestIndex = std::distance(RequestKeys.data(), &Key); - ChunkRequest& Request = Requests[RequestIndex]; - Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0); - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || - !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value)); - if (!Compressed) - { - return; - } - - bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - if (Request.IsRecordRequest) - { - m_CidStore.AddChunk(Params.Value, Params.RawHash); - } - else - { - m_CacheStore.Put(Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}); - } - } - if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - Request.Value = std::move(Compressed); - } - } - Key.ChunkId = Params.RawHash; - Request.Exists = true; - Request.RawSize = Params.RawSize; - Request.RawSizeKnown = true; - Request.Source = Params.Source; - - m_CacheStats.UpstreamHitCount++; - }; - - m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); - } -} - -CbPackage -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests) -{ - using namespace cache::detail; - - CbPackage RpcResponse; - CbObjectWriter Writer; - - Writer.BeginArray("Result"sv); - for (ChunkRequest& Request : Requests) - { - Writer.BeginObject(); - { - if (Request.Exists) - { - Writer.AddHash("RawHash"sv, Request.Key->ChunkId); - if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId)); - } - else - { - Writer.AddInteger("RawSize"sv, Request.RawSize); - } - - ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceBytes(Request.RawSize), - Request.IsRecordRequest ? "Record"sv : "Value"sv, - Request.Source ? Request.Source->Url : "LOCAL"sv, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.HitCount++; - } - else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) - { - ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - } - else - { - ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}", - Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId, - NiceLatencyNs(Request.ElapsedTimeUs * 1000)); - m_CacheStats.MissCount++; - } - } - Writer.EndObject(); - } - Writer.EndArray(); - - RpcResponse.SetObject(Writer.Save()); - return RpcResponse; -} - -void -HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - - EmitSnapshot("requests", m_HttpRequests, Cbo); - EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); - - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t TotalCount = HitCount + MissCount; - - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const GcStorageSize CacheSize = m_CacheStore.StorageSize(); - - Cbo.BeginObject("cache"); - { - Cbo.BeginObject("size"); - { - Cbo << "disk" << CacheSize.DiskSize; - Cbo << "memory" << CacheSize.MemorySize; - } - Cbo.EndObject(); - - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - Cbo << "hits" << HitCount << "misses" << MissCount; - Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); - Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - } - Cbo.EndObject(); - - Cbo.BeginObject("upstream"); - { - m_UpstreamCache.GetStatus(Cbo); - } - Cbo.EndObject(); - - Cbo.BeginObject("cid"); - { - Cbo.BeginObject("size"); - { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -void -HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - Cbo << "ok" << true; - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -bool -HttpStructuredCacheService::AreDiskWritesAllowed() const -{ - return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); -} - -#if ZEN_WITH_TESTS - -TEST_CASE("z$service.parse.relative.Uri") -{ - HttpRequestData RootRequest; - CHECK(HttpRequestParseRelativeUri("", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - RootRequest = {}; - CHECK(HttpRequestParseRelativeUri("/", RootRequest)); - CHECK(!RootRequest.Namespace.has_value()); - CHECK(!RootRequest.Bucket.has_value()); - CHECK(!RootRequest.HashKey.has_value()); - CHECK(!RootRequest.ValueContentId.has_value()); - - HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); - CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); - CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData LegacyHashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); - CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyHashKeyRequest.Bucket == "test"sv); - CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); - - HttpRequestData LegacyValueContentIdRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - LegacyValueContentIdRequest)); - CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); - CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData V2DefaultNamespaceRequest; - CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); - CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); - CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); - CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); - CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2NamespaceRequest; - CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); - CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); - CHECK(!V2NamespaceRequest.Bucket.has_value()); - CHECK(!V2NamespaceRequest.HashKey.has_value()); - CHECK(!V2NamespaceRequest.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithDefaultNamespace; - CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); - CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); - CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); - - HttpRequestData V2BucketRequestWithNamespace; - CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); - CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); - CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); - CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); - CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); - - HttpRequestData V2HashKeyRequest; - CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); - CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); - CHECK(V2HashKeyRequest.Bucket == "test"); - CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(!V2HashKeyRequest.ValueContentId.has_value()); - - HttpRequestData V2ValueContentIdRequest; - CHECK( - HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", - V2ValueContentIdRequest)); - CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); - CHECK(V2ValueContentIdRequest.Bucket == "test"sv); - CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); - CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); - - HttpRequestData Invalid; - CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); - CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); - CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", - Invalid)); -} - -#endif - -void -z$service_forcelink() -{ -} - -} // namespace zen |