aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/structuredcache.cpp')
-rw-r--r--src/zenserver/cache/structuredcache.cpp3209
1 files changed, 0 insertions, 3209 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp
deleted file mode 100644
index 566df73c1..000000000
--- a/src/zenserver/cache/structuredcache.cpp
+++ /dev/null
@@ -1,3209 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "structuredcache.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/enumflags.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpserver.h>
-#include <zenhttp/httpshared.h>
-#include <zenhttp/httpstats.h>
-#include <zenstore/gc.h>
-#include <zenutil/cache/cache.h>
-#include <zenutil/cache/rpcrecording.h>
-
-#include "structuredcachestore.h"
-#include "upstream/jupiter.h"
-#include "upstream/upstreamcache.h"
-#include "upstream/zen.h"
-#include "zenstore/cidstore.h"
-#include "zenstore/scrubcontext.h"
-
-#include <algorithm>
-#include <atomic>
-#include <filesystem>
-#include <queue>
-#include <thread>
-
-#include <cpr/cpr.h>
-#include <gsl/gsl-lite.hpp>
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-#endif
-
-namespace zen {
-
-using namespace std::literals;
-
-//////////////////////////////////////////////////////////////////////////
-
-CachePolicy
-ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
-{
- std::string_view PolicyText = QueryParams.GetValue("Policy"sv);
- return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
-}
-
-CacheRecordPolicy
-LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
-{
- OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
- return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
-}
-
-struct AttachmentCount
-{
- uint32_t New = 0;
- uint32_t Valid = 0;
- uint32_t Invalid = 0;
- uint32_t Total = 0;
-};
-
-struct PutRequestData
-{
- std::string Namespace;
- CacheKey Key;
- CbObjectView RecordObject;
- CacheRecordPolicy Policy;
-};
-
-namespace {
- static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
- static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
- static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
- static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
- static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
-
- struct HttpRequestData
- {
- std::optional<std::string> Namespace;
- std::optional<std::string> Bucket;
- std::optional<IoHash> HashKey;
- std::optional<IoHash> ValueContentId;
- };
-
- constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
- constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
-
- std::optional<std::string> GetValidNamespaceName(std::string_view Name)
- {
- if (Name.empty())
- {
- ZEN_WARN("Namespace is invalid, empty namespace is not allowed");
- return {};
- }
-
- if (Name.length() > 64)
- {
- ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name);
- return {};
- }
-
- if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet))
- {
- ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name);
- return {};
- }
-
- return ToLower(Name);
- }
-
- std::optional<std::string> GetValidBucketName(std::string_view Name)
- {
- if (Name.empty())
- {
- ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed");
- return {};
- }
-
- if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet))
- {
- ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name);
- return {};
- }
-
- return ToLower(Name);
- }
-
- std::optional<IoHash> GetValidIoHash(std::string_view Hash)
- {
- if (Hash.length() != IoHash::StringLength)
- {
- return {};
- }
-
- IoHash KeyHash;
- if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash))
- {
- return {};
- }
- return KeyHash;
- }
-
- bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data)
- {
- std::vector<std::string_view> Tokens;
- uint32_t TokenCount = ForEachStrTok(Key, '/', [&](const std::string_view& Token) {
- Tokens.push_back(Token);
- return true;
- });
-
- switch (TokenCount)
- {
- case 0:
- return true;
- case 1:
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- return Data.Namespace.has_value();
- case 2:
- {
- std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
- if (PossibleHashKey.has_value())
- {
- // Legacy bucket/key request
- Data.Bucket = GetValidBucketName(Tokens[0]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = PossibleHashKey;
- Data.Namespace = ZenCacheStore::DefaultNamespace;
- return true;
- }
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- return true;
- }
- case 3:
- {
- std::optional<IoHash> PossibleHashKey = GetValidIoHash(Tokens[1]);
- if (PossibleHashKey.has_value())
- {
- // Legacy bucket/key/valueid request
- Data.Bucket = GetValidBucketName(Tokens[0]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = PossibleHashKey;
- Data.ValueContentId = GetValidIoHash(Tokens[2]);
- if (!Data.ValueContentId.has_value())
- {
- return false;
- }
- Data.Namespace = ZenCacheStore::DefaultNamespace;
- return true;
- }
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
- Data.HashKey = GetValidIoHash(Tokens[2]);
- if (!Data.HashKey)
- {
- return false;
- }
- return true;
- }
- case 4:
- {
- Data.Namespace = GetValidNamespaceName(Tokens[0]);
- if (!Data.Namespace.has_value())
- {
- return false;
- }
-
- Data.Bucket = GetValidBucketName(Tokens[1]);
- if (!Data.Bucket.has_value())
- {
- return false;
- }
-
- Data.HashKey = GetValidIoHash(Tokens[2]);
- if (!Data.HashKey.has_value())
- {
- return false;
- }
-
- Data.ValueContentId = GetValidIoHash(Tokens[3]);
- if (!Data.ValueContentId.has_value())
- {
- return false;
- }
- return true;
- }
- default:
- return false;
- }
- }
-
- std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
- {
- CbFieldView NamespaceField = Params["Namespace"sv];
- if (!NamespaceField)
- {
- return std::string(ZenCacheStore::DefaultNamespace);
- }
-
- if (NamespaceField.HasError())
- {
- return {};
- }
- if (!NamespaceField.IsString())
- {
- return {};
- }
- return GetValidNamespaceName(NamespaceField.AsString());
- }
-
- bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
- {
- CbFieldView BucketField = KeyView["Bucket"sv];
- if (BucketField.HasError())
- {
- return false;
- }
- if (!BucketField.IsString())
- {
- return false;
- }
- std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
- if (!Bucket.has_value())
- {
- return false;
- }
- CbFieldView HashField = KeyView["Hash"sv];
- if (HashField.HasError())
- {
- return false;
- }
- if (!HashField.IsHash())
- {
- return false;
- }
- IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
- return true;
- }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache,
- const DiskWriteBlocker* InDiskWriteBlocker)
-: m_Log(logging::Get("cache"))
-, m_CacheStore(InCacheStore)
-, m_StatsService(StatsService)
-, m_StatusService(StatusService)
-, m_CidStore(InCidStore)
-, m_UpstreamCache(UpstreamCache)
-, m_DiskWriteBlocker(InDiskWriteBlocker)
-{
- m_StatsService.RegisterHandler("z$", *this);
- m_StatusService.RegisterHandler("z$", *this);
-}
-
-HttpStructuredCacheService::~HttpStructuredCacheService()
-{
- ZEN_INFO("closing structured cache");
- m_RequestRecorder.reset();
-
- m_StatsService.UnregisterHandler("z$", *this);
- m_StatusService.UnregisterHandler("z$", *this);
-}
-
-const char*
-HttpStructuredCacheService::BaseUri() const
-{
- return "/z$/";
-}
-
-void
-HttpStructuredCacheService::Flush()
-{
- m_CacheStore.Flush();
-}
-
-void
-HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- ZEN_INFO("scrubbing '{}'", Info.Config.BasePath);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CidStore.ScrubStorage(Ctx);
- m_CacheStore.ScrubStorage(Ctx);
-}
-
-void
-HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
-{
- std::string_view Key = Request.RelativeUri();
- std::vector<std::string> Tokens;
- uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) {
- Tokens.push_back(std::string(Token));
- return true;
- });
- std::string FilterNamespace;
- std::string FilterBucket;
- std::string FilterValue;
- switch (TokenCount)
- {
- case 1:
- break;
- case 2:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 3:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 4:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterValue = Tokens[3];
- if (FilterValue.empty())
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- default:
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch());
- CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue);
-
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- if (AttachmentDetails)
- {
- CSVWriter << "Namespace, Bucket, Key, Cid, Size";
- }
- else if (Details)
- {
- CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize";
- }
- else
- {
- CSVWriter << "Namespace, Bucket, Key";
- }
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- if (AttachmentDetails)
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
- << ", " << gsl::narrow<uint64_t>(Payload.GetSize());
- }
- }
- else if (Details)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << ","
- << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", "
- << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count())
- << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size());
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
- }
- else
- {
- CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString();
- }
- }
- }
- }
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("namespaces");
- {
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Namespace);
- Cbo.BeginArray("buckets");
- {
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Bucket);
- Cbo.BeginArray("values");
- {
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- Cbo.BeginObject();
- {
- Cbo.AddHash("key", ValueIt.first);
- if (Details)
- {
- Cbo.AddInteger("size", ValueIt.second.Size);
- if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 &&
- ValueIt.second.RawSize != ValueIt.second.Size)
- {
- Cbo.AddInteger("rawsize", ValueIt.second.RawSize);
- Cbo.AddHash("rawhash", ValueIt.second.RawHash);
- }
- Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType));
- Cbo.AddInteger("age", NowSeconds.count() - LastAccessedSeconds.count());
- if (ValueIt.second.Attachments.size() > 0)
- {
- if (AttachmentDetails)
- {
- Cbo.BeginArray("attachments");
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- Cbo.BeginObject();
- Cbo.AddHash("cid", Hash);
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- else
- {
- Cbo.AddInteger("attachmentcount",
- gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()));
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
- }
- }
- }
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
-}
-
-void
-HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
-{
- metrics::OperationTiming::Scope $(m_HttpRequests);
-
- std::string_view Key = Request.RelativeUri();
- if (Key == HttpZCacheRPCPrefix)
- {
- return HandleRpcRequest(Request);
- }
-
- if (Key == HttpZCacheUtilStartRecording)
- {
- m_RequestRecorder.reset();
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key == HttpZCacheUtilStopRecording)
- {
- m_RequestRecorder.reset();
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key == HttpZCacheUtilReplayRecording)
- {
- m_RequestRecorder.reset();
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
- uint32_t ThreadCount = std::thread::hardware_concurrency();
- if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
- {
- if (auto Value = ParseInt<uint64_t>(Param))
- {
- ThreadCount = gsl::narrow<uint32_t>(Value.value());
- }
- }
- std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
- ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount);
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
- if (Key.starts_with(HttpZCacheDetailsPrefix))
- {
- HandleDetailsRequest(Request);
- return;
- }
-
- HttpRequestData RequestData;
- if (!HttpRequestParseRelativeUri(Key, RequestData))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- if (RequestData.ValueContentId.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- ZEN_ASSERT(RequestData.HashKey.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = RequestData.ValueContentId.value()};
- return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.HashKey.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = IoHash::Zero};
- return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.Bucket.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value());
- }
-
- if (RequestData.Namespace.has_value())
- {
- return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value());
- }
- return HandleCacheRequest(Request);
-}
-
-void
-HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info.Config.BasePath.u8string();
- ResponseWriter.AddString("BasePath"sv, BasePathString.ToView());
- ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces);
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
- ResponseWriter.BeginArray("Namespaces");
- for (const std::string& NamespaceName : Info.NamespaceNames)
- {
- ResponseWriter.AddString(NamespaceName);
- }
- ResponseWriter.EndArray();
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize);
- ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize);
- }
-
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info.MemoryEntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info->Config.RootDir.u8string();
- ResponseWriter.AddString("RootDir"sv, BasePathString.ToView());
- ResponseWriter.AddInteger("DiskLayerThreshold"sv, Info->Config.DiskLayerThreshold);
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
-
- ResponseWriter.BeginArray("Buckets"sv);
- for (const std::string& BucketName : Info->BucketNames)
- {
- ResponseWriter.AddString(BucketName);
- }
- ResponseWriter.EndArray();
-
- ResponseWriter.BeginObject("StorageSize"sv);
- {
- ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize"sv, Info->MemoryLayerInfo.TotalSize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop namespace
- {
- if (m_CacheStore.DropNamespace(NamespaceName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
- std::string_view NamespaceName,
- std::string_view BucketName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.TotalSize);
- ResponseWriter.AddInteger("MemorySize", Info->MemoryLayerInfo.TotalSize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
- ResponseWriter.AddInteger("MemoryEntryCount", Info->MemoryLayerInfo.EntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop bucket
- {
- if (m_CacheStore.DropBucket(NamespaceName, BucketName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
- }
- break;
-
- case HttpVerb::kPut:
- HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
- break;
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- const ZenContentType AcceptType = Request.AcceptContentType();
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
-
- bool Success = false;
- ZenCacheValue ClientResultValue;
- if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
-
- Stopwatch Timer;
-
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
- {
- Success = true;
- ZenContentType ContentType = ClientResultValue.Value.GetContentType();
-
- if (AcceptType == ZenContentType::kCbPackage)
- {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbPackage Package;
- uint32_t MissingCount = 0;
-
- CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
- if (SkipData)
- {
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
- {
- MissingCount++;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
- }
- else
- {
- MissingCount++;
- }
- }
- });
-
- Success = MissingCount == 0 || PartialRecord;
-
- if (Success)
- {
- Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
-
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
- }
- }
- else
- {
- Success = false;
- }
- }
- else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
- AcceptType != ZenContentType::kBinary)
- {
- Success = false;
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
- return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
- }
- }
- else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- // Issue upstream query asynchronously in order to keep requests flowing without
- // hogging I/O servicing threads with blocking work
-
- uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs();
-
- Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs](HttpServerRequest& AsyncRequest) {
- Stopwatch Timer;
- bool Success = false;
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
- const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- ZenCacheValue ClientResultValue;
-
- metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
-
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
- UpstreamResult.Status.Success)
- {
- Success = true;
-
- ClientResultValue.Value = UpstreamResult.Value;
- ClientResultValue.Value.SetContentType(AcceptType);
-
- if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
- {
- if (AcceptType == ZenContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
- if (ValidationResult != CbValidateError::None)
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
-
- // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
- }
-
- if (Success && StoreLocal)
- {
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue);
- }
- }
- else if (AcceptType == ZenContentType::kCbPackage)
- {
- CbPackage Package;
- if (Package.TryLoad(ClientResultValue.Value))
- {
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments(
- [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) {
- IoHash Hash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- if (StoreLocal)
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- }
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- Hash,
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
- }
- }
- else if (QueryLocal)
- {
- if (SkipData)
- {
- if (m_CidStore.ContainsChunk(Hash))
- {
- Count.Valid++;
- }
- }
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- if (Compressed)
- {
- Package.AddAttachment(CbAttachment(Compressed, Hash));
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'",
- Hash,
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
- }
- }
- }
- Count.Total++;
- });
-
- if ((Count.Valid == Count.Total) || PartialRecord)
- {
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
-
- if (StoreLocal)
- {
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
- }
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- ZEN_ASSERT_SLOW(StoreLocal);
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
-
- BinaryWriter MemStream;
- if (SkipData)
- {
- // Save a package containing only the object.
- CbPackage(Package.GetObject()).Save(MemStream);
- }
- else
- {
- Package.Save(MemStream);
- }
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
-
- if (SkipData && AcceptType == ZenContentType::kBinary)
- {
- AsyncRequest.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
- // metadata.
- AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
- }
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
- m_CacheStats.MissCount++;
- AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
- }
- });
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
- if (!AreDiskWritesAllowed())
- {
- return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
- }
-
- const HttpContentType ContentType = Request.RequestContentType();
-
- Body.SetContentType(ContentType);
-
- Stopwatch Timer;
-
- if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
- {
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = Body.GetSize();
- if (ContentType == HttpContentType::kCompressedBinary)
- {
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Payload is not a valid compressed binary"sv);
- }
- }
- else
- {
- RawHash = IoHash::HashBuffer(SharedBuffer(Body));
- }
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body, .RawSize = RawSize, .RawHash = RawHash});
-
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
- }
-
- Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
-
- CbObjectView CacheRecord(Body.Data());
- std::vector<IoHash> ValidAttachments;
- int32_t TotalCount = 0;
-
- CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) {
- const IoHash Hash = AttachmentHash.AsHash();
- if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- }
- TotalCount++;
- });
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- TotalCount,
- ValidAttachments.size(),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
-
- CachePolicy Policy = PolicyFromUrl;
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbPackage)
- {
- CbPackage Package;
-
- if (!Package.TryLoad(Body))
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
- }
- CachePolicy Policy = PolicyFromUrl;
-
- CbObject CacheRecord = Package.GetObject();
-
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) {
- const IoHash Hash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- Hash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.GetSize()),
- ToString(ContentType),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv);
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- HandleGetCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- case HttpVerb::kPut:
- HandlePutCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- Stopwatch Timer;
-
- IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy Policy = PolicyFromUrl;
- {
- const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
-
- if (QueryUpstream)
- {
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
- UpstreamResult.Status.Success)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
- {
- if (RawHash == Ref.ValueContentId)
- {
- if (AreDiskWritesAllowed())
- {
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
- }
- Source = UpstreamResult.Source;
- }
- else
- {
- ZEN_WARN("got missmatching upstream cache value");
- }
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache value");
- }
- }
- }
- }
-
- if (!Value)
- {
- ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- ToString(Request.AcceptContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Value.Size()),
- ToString(Value.GetContentType()),
- Source ? Source->Url : "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (Source)
- {
- m_CacheStats.UpstreamHitCount++;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
- {
- Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
- }
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
- ZEN_UNUSED(PolicyFromUrl);
-
- Stopwatch Timer;
-
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
- if (!AreDiskWritesAllowed())
- {
- return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
- }
-
- Body.SetContentType(Request.RequestContentType());
-
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
- }
-
- if (RawHash != Ref.ValueContentId)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "ValueContentId does not match attachment hash"sv);
- }
-
- CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
-
- ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Body.Size()),
- ToString(Body.GetContentType()),
- Result.New ? "NEW" : "OLD",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
-
- Request.WriteResponse(ResponseCode);
-}
-
-HttpResponseCode
-HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutResultPackage)
-{
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == ZenContentType::kCbObject)
- {
- ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- OutAcceptMagic = Object["Accept"sv].AsUInt32();
- OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
- OutTargetProcessId = Object["Pid"sv].AsInt32(0);
-
- const std::string_view Method = Object["Method"sv].AsString();
-
- if (Method == "PutCacheRecords"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheRecords(Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- OutResultPackage = HandleRpcGetCacheRecords(Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheValues(Package);
- }
- else if (Method == "GetCacheValues"sv)
- {
- OutResultPackage = HandleRpcGetCacheValues(Object);
- }
- else if (Method == "GetCacheChunks"sv)
- {
- OutResultPackage = HandleRpcGetCacheChunks(Object);
- }
- else
- {
- return HttpResponseCode::BadRequest;
- }
- return HttpResponseCode::OK;
-}
-
-void
-HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount)
-{
- WorkerThreadPool WorkerPool(ThreadCount);
- uint64_t RequestCount = Replayer.GetRequestCount();
- Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- Latch JobLatch(RequestCount);
- ZEN_INFO("Replaying {} requests", RequestCount);
- for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
- {
- WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
- IoBuffer Body;
- std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
- if (Body)
- {
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetPid = 0;
- CbPackage RpcResult;
- if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
- {
- if (AcceptMagic == kCbPkgMagic)
- {
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
- }
- }
- }
- JobLatch.CountDown();
- });
- }
- while (!JobLatch.Wait(10000))
- {
- ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - JobLatch.Remaining(),
- RequestCount,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
-}
-
-void
-HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kPost:
- {
- const HttpContentType ContentType = Request.RequestContentType();
- const HttpContentType AcceptType = Request.AcceptContentType();
-
- if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
- AcceptType != HttpContentType::kCbPackage)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- Request.WriteResponseAsync(
- [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult;
-
- HttpResponseCode ResultCode =
- HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult);
- if (!IsHttpSuccessCode(ResultCode))
- {
- AsyncRequest.WriteResponse(ResultCode);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
- {
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
-
- if (RequestIndex != ~0ull)
- {
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- });
- }
- break;
- default:
- Request.WriteResponse(HttpResponseCode::BadRequest);
- break;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
-
- CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
- CachePolicy DefaultPolicy;
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::vector<bool> Results;
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
- CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
- CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
-
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
-
- if (Result == PutResult::Invalid)
- {
- return CbPackage{};
- }
- Results.push_back(Result == PutResult::Success);
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-HttpStructuredCacheService::PutResult
-HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
-{
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
-
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- Stopwatch Timer;
-
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return PutResult::Invalid;
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- TransferredSize += Chunk.GetCompressedSize();
- }
-
- ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- NiceBytes(TransferredSize),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
- }
- return PutResult::Success;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
-
- struct ValueRequestData
- {
- Oid ValueId;
- IoHash ContentId;
- CompressedBuffer Payload;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool ReadFromUpstream = false;
- };
- struct RecordRequestData
- {
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
- };
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- Requests.reserve(RequestsArray.Num());
-
- auto ParseValues = [](RecordRequestData& Request) {
- CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
- Request.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Request.Values.push_back({ValueId, RawHash});
- Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
- }
- }
- };
-
- for (CbFieldView RequestField : RequestsArray)
- {
- Stopwatch Timer;
- RecordRequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- CacheKey& Key = Request.Upstream.Key;
- if (!GetRpcRequestCacheKey(KeyObject, Key))
- {
- return CbPackage{};
- }
-
- Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
-
- ZenCacheValue CacheValue;
- bool NeedUpstreamAttachment = false;
- bool FoundLocalInvalid = false;
- ZenCacheValue RecordCacheValue;
-
- if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
- {
- Request.RecordCacheValue = std::move(RecordCacheValue.Value);
- if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
- {
- FoundLocalInvalid = true;
- }
- else
- {
- Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
- ParseValues(Request);
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
- {
- // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
- // didn't ask for it and thus the record is complete in its absence.
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Value.Exists = true;
- }
- else
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- Request.Complete = false;
- }
- }
- else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- if (m_CidStore.ContainsChunk(Value.ContentId))
- {
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- }
- }
- }
- if (!Request.Complete)
- {
- bool NeedUpstreamRecord =
- !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
- if (NeedUpstreamRecord || NeedUpstreamAttachment)
- {
- UpstreamIndexes.push_back(Requests.size() - 1);
- }
- }
- Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
- }
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- if (!UpstreamIndexes.empty())
- {
- std::vector<CacheKeyRequest*> UpstreamRequests;
- UpstreamRequests.reserve(UpstreamIndexes.size());
- for (size_t Index : UpstreamIndexes)
- {
- RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
-
- if (Request.Values.size())
- {
- // We will be returning the local object and know all the value Ids that exist in it
- // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
- CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
- CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
- UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
- Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
- }
- Request.Upstream.Policy = Builder.Build();
- }
- else
- {
- // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
- // and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
- }
- }
-
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
-
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
- Stopwatch Timer;
- auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
- if (!Request.RecordObject)
- {
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
- Request.RecordObject = ObjectBuffer;
- bool StoreLocal =
- EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
- }
- ParseValues(Request);
- Request.Source = Params.Source;
- }
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- if (Value.Exists)
- {
- continue;
- }
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Request.Complete = false;
- continue;
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
- {
- Request.Source = Params.Source;
- Value.Exists = true;
- if (StoreLocal)
- {
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Value.Payload = Compressed;
- }
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
- Value.ContentId,
- *Namespace,
- Key.Bucket,
- Key.Hash);
- }
- }
- if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Request.Complete = false;
- }
- // Request.Complete does not need to be set to false for upstream SkipData attachments.
- // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
- // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
- // any missing SkipData attachments.
- }
- Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- };
-
- m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
- }
-
- CbPackage ResponsePackage;
- CbObjectWriter ResponseObject;
-
- ResponseObject.BeginArray("Result"sv);
- for (RecordRequestData& Request : Requests)
- {
- const CacheKey& Key = Request.Upstream.Key;
- if (Request.Complete ||
- (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
- {
- ResponseObject << Request.RecordObject;
- for (ValueRequestData& Value : Request.Values)
- {
- if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
- {
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
- }
- }
-
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.RecordCacheValue.Size()),
- Request.Complete ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
- }
- else
- {
- ResponseObject.AddNull();
-
- if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- Request.RecordObject ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- }
- ResponseObject.EndArray();
- ResponsePackage.SetObject(ResponseObject.Save());
- return ResponsePackage;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest)
-{
- CbObjectView BatchObject = BatchRequest.GetObject();
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- std::vector<bool> Results;
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- Stopwatch Timer;
-
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
-
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
- {
- RawSize = Chunk.DecodeRawSize();
- }
- m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash});
- TransferredSize = Chunk.GetCompressedSize();
- }
- Succeeded = true;
- }
- else
- {
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
- }
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
- {
- Succeeded = true;
- }
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
-
- if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
- }
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
-
- return RpcResponse;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
-{
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- struct RequestData
- {
- CacheKey Key;
- CachePolicy Policy;
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0;
- CompressedBuffer Result;
- };
- std::vector<RequestData> Requests;
-
- std::vector<size_t> RemoteRequestIndexes;
-
- for (CbFieldView RequestField : Params["Requests"sv])
- {
- Stopwatch Timer;
-
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
-
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
-
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- if (Request.Result)
- {
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- RemoteRequestIndexes.push_back(Requests.size() - 1);
- }
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
- }
- else
- {
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- }
- }
-
- if (!RemoteRequestIndexes.empty())
- {
- std::vector<CacheValueRequest> RequestedRecordsData;
- std::vector<CacheValueRequest*> CacheValueRequests;
- RequestedRecordsData.reserve(RemoteRequestIndexes.size());
- CacheValueRequests.reserve(RemoteRequestIndexes.size());
- for (size_t Index : RemoteRequestIndexes)
- {
- RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
- CacheValueRequests.push_back(&RequestedRecordsData.back());
- }
- Stopwatch Timer;
- m_UpstreamCache.GetCacheValues(
- *Namespace,
- CacheValueRequests,
- [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
- CacheValueRequest& ChunkRequest = Params.Request;
- if (Params.RawHash != IoHash::Zero)
- {
- size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
- size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
- RequestData& Request = Requests[RequestIndex];
- Request.RawHash = Params.RawHash;
- Request.RawSize = Params.RawSize;
- const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
- const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- const bool IsHit = SkipData || HasData;
- if (IsHit)
- {
- if (HasData && !SkipData)
- {
- Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- }
-
- if (HasData && StoreData)
- {
- m_CacheStore.Put(*Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash});
- }
-
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- return;
- }
- }
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- });
- }
-
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (const RequestData& Request : Requests)
- {
- ResponseObject.BeginObject();
- {
- const CompressedBuffer& Result = Request.Result;
- if (Result)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
- }
- else
- {
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- else if (Request.RawHash != IoHash::Zero)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- ResponseObject.EndObject();
- }
- ResponseObject.EndArray();
-
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-namespace cache::detail {
-
- struct RecordValue
- {
- Oid ValueId;
- IoHash ContentId;
- uint64_t RawSize;
- };
- struct RecordBody
- {
- IoBuffer CacheValue;
- std::vector<RecordValue> Values;
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool HasRequest = false;
- bool ValuesRead = false;
- };
- struct ChunkRequest
- {
- CacheChunkRequest* Key = nullptr;
- RecordBody* Record = nullptr;
- CompressedBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t RawSize = 0;
- uint64_t RequestedSize = 0;
- uint64_t RequestedOffset = 0;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool RawSizeKnown = false;
- bool IsRecordRequest = false;
- uint64_t ElapsedTimeUs = 0;
- };
-
-} // namespace cache::detail
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
-{
- using namespace cache::detail;
-
- std::string Namespace;
- std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
- std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
- std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
- std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
- std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
- std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
- std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
-
- // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
- {
- return CbPackage{};
- }
-
- // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
- // have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
-
- // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
-
- // Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
-
- // Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Namespace, Requests);
-}
-
-bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest)
-{
- using namespace cache::detail;
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
-
- std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
- if (!NamespaceText)
- {
- ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
- return false;
- }
- Namespace = *NamespaceText;
-
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
-
- // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
- // we will need to change the pointers to indexes to handle reallocations.
- RecordKeys.reserve(NumRequests);
- Records.reserve(NumRequests);
- RequestKeys.reserve(NumRequests);
- Requests.reserve(NumRequests);
- RecordRequests.reserve(NumRequests);
- ValueRequests.reserve(NumRequests);
-
- CacheKeyRequest* PreviousRecordKey = nullptr;
- RecordBody* PreviousRecord = nullptr;
-
- for (CbFieldView RequestView : ChunkRequestsArray)
- {
- CbObjectView RequestObject = RequestView.AsObjectView();
- CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
- ChunkRequest& Request = Requests.emplace_back();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- Request.Key = &RequestKey;
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
- {
- ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
- return false;
- }
-
- RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
- RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
- RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- Request.RequestedSize = RequestKey.RawSize;
- Request.RequestedOffset = RequestKey.RawOffset;
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Request.IsRecordRequest = (bool)RequestKey.ValueId;
-
- if (!Request.IsRecordRequest)
- {
- ValueRequests.push_back(&Request);
- }
- else
- {
- RecordRequests.push_back(&Request);
- CacheKeyRequest* RecordKey = nullptr;
- RecordBody* Record = nullptr;
-
- if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
- {
- RecordKey = &RecordKeys.emplace_back();
- PreviousRecordKey = RecordKey;
- Record = &Records.emplace_back();
- PreviousRecord = Record;
- RecordKey->Key = RequestKey.Key;
- }
- else if (RequestKey.Key == PreviousRecordKey->Key)
- {
- RecordKey = PreviousRecordKey;
- Record = PreviousRecord;
- }
- else
- {
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- RequestKey.Key.Bucket,
- RequestKey.Key.Hash,
- PreviousRecordKey->Key.Bucket,
- PreviousRecordKey->Key.Hash);
- return false;
- }
- Request.Record = Record;
- if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
- {
- Record->DownstreamPolicy =
- Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
- Record->HasRequest = true;
- }
- }
- }
- if (Requests.empty())
- {
- return false;
- }
- return true;
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- using namespace cache::detail;
-
- std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
- {
- Stopwatch Timer;
- CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
- RecordBody& Record = Records[RecordIndex];
- if (Record.HasRequest)
- {
- Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
-
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
- {
- Record.Exists = true;
- Record.CacheValue = std::move(CacheValue.Value);
- }
- }
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
- UpstreamRecordRequests.push_back(&RecordKey);
- }
- RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- }
-
- if (!UpstreamRecordRequests.empty())
- {
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
- }
- };
- m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
- }
-
- std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
- for (ChunkRequest* Request : RecordRequests)
- {
- Stopwatch Timer;
- if (Request->Key->ChunkId == IoHash::Zero)
- {
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- RecordBody& Record = *Request->Record;
- if (!Record.ValuesRead)
- {
- Record.ValuesRead = true;
- if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- Record.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
- }
- }
- }
-
- for (const RecordValue& Value : Record.Values)
- {
- if (Value.ValueId == Request->Key->ValueId)
- {
- Request->Key->ChunkId = Value.ContentId;
- Request->RawSize = Value.RawSize;
- Request->RawSizeKnown = true;
- break;
- }
- }
- }
-
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Request->Key->ChunkId != IoHash::Zero)
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
- {
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
- {
- Request->Exists = true;
- }
- }
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
- {
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
- if (Request->Value)
- {
- Request->Exists = true;
- Request->RawSizeKnown = false;
- }
- }
- else
- {
- IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
- {
- Request->Exists = true;
- Request->RawSizeKnown = true;
- }
- }
- }
- }
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
- OutUpstreamChunks.push_back(Request->Key);
- }
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- using namespace cache::detail;
-
- for (ChunkRequest* Request : ValueRequests)
- {
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
- {
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- }
- }
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
- {
- // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
- Request->Key->RawOffset = 0;
- Request->Key->RawSize = UINT64_MAX;
- }
- OutUpstreamChunks.push_back(Request->Key);
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- using namespace cache::detail;
-
- if (!UpstreamChunks.empty())
- {
- const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) {
- if (Params.RawHash == Params.RawHash.Zero)
- {
- return;
- }
-
- CacheChunkRequest& Key = Params.Request;
- size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
- ChunkRequest& Request = Requests[RequestIndex];
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- if (!Compressed)
- {
- return;
- }
-
- bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- if (Request.IsRecordRequest)
- {
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
- }
- else
- {
- m_CacheStore.Put(Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash});
- }
- }
- if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- Request.Value = std::move(Compressed);
- }
- }
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.RawSize = Params.RawSize;
- Request.RawSizeKnown = true;
- Request.Source = Params.Source;
-
- m_CacheStats.UpstreamHitCount++;
- };
-
- m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
- }
-}
-
-CbPackage
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests)
-{
- using namespace cache::detail;
-
- CbPackage RpcResponse;
- CbObjectWriter Writer;
-
- Writer.BeginArray("Result"sv);
- for (ChunkRequest& Request : Requests)
- {
- Writer.BeginObject();
- {
- if (Request.Exists)
- {
- Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
- if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
- }
- else
- {
- Writer.AddInteger("RawSize"sv, Request.RawSize);
- }
-
- ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceBytes(Request.RawSize),
- Request.IsRecordRequest ? "Record"sv : "Value"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- }
- else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
- {
- ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- Writer.EndObject();
- }
- Writer.EndArray();
-
- RpcResponse.SetObject(Writer.Save());
- return RpcResponse;
-}
-
-void
-HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
-
- EmitSnapshot("requests", m_HttpRequests, Cbo);
- EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
-
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t TotalCount = HitCount + MissCount;
-
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const GcStorageSize CacheSize = m_CacheStore.StorageSize();
-
- Cbo.BeginObject("cache");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << CacheSize.DiskSize;
- Cbo << "memory" << CacheSize.MemorySize;
- }
- Cbo.EndObject();
-
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- Cbo << "hits" << HitCount << "misses" << MissCount;
- Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
- Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- }
- Cbo.EndObject();
-
- Cbo.BeginObject("upstream");
- {
- m_UpstreamCache.GetStatus(Cbo);
- }
- Cbo.EndObject();
-
- Cbo.BeginObject("cid");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
-
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-void
-HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-bool
-HttpStructuredCacheService::AreDiskWritesAllowed() const
-{
- return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
-}
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("z$service.parse.relative.Uri")
-{
- HttpRequestData RootRequest;
- CHECK(HttpRequestParseRelativeUri("", RootRequest));
- CHECK(!RootRequest.Namespace.has_value());
- CHECK(!RootRequest.Bucket.has_value());
- CHECK(!RootRequest.HashKey.has_value());
- CHECK(!RootRequest.ValueContentId.has_value());
-
- RootRequest = {};
- CHECK(HttpRequestParseRelativeUri("/", RootRequest));
- CHECK(!RootRequest.Namespace.has_value());
- CHECK(!RootRequest.Bucket.has_value());
- CHECK(!RootRequest.HashKey.has_value());
- CHECK(!RootRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyBucketRequestBecomesNamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest));
- CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv);
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value());
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value());
- CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyHashKeyRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest));
- CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(LegacyHashKeyRequest.Bucket == "test"sv);
- CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(!LegacyHashKeyRequest.ValueContentId.has_value());
-
- HttpRequestData LegacyValueContentIdRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
- LegacyValueContentIdRequest));
- CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(LegacyValueContentIdRequest.Bucket == "test"sv);
- CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
-
- HttpRequestData V2DefaultNamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest));
- CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc");
- CHECK(!V2DefaultNamespaceRequest.Bucket.has_value());
- CHECK(!V2DefaultNamespaceRequest.HashKey.has_value());
- CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData V2NamespaceRequest;
- CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest));
- CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv);
- CHECK(!V2NamespaceRequest.Bucket.has_value());
- CHECK(!V2NamespaceRequest.HashKey.has_value());
- CHECK(!V2NamespaceRequest.ValueContentId.has_value());
-
- HttpRequestData V2BucketRequestWithDefaultNamespace;
- CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace));
- CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc");
- CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv);
- CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value());
- CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value());
-
- HttpRequestData V2BucketRequestWithNamespace;
- CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace));
- CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv);
- CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv);
- CHECK(!V2BucketRequestWithNamespace.HashKey.has_value());
- CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value());
-
- HttpRequestData V2HashKeyRequest;
- CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest));
- CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace);
- CHECK(V2HashKeyRequest.Bucket == "test");
- CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(!V2HashKeyRequest.ValueContentId.has_value());
-
- HttpRequestData V2ValueContentIdRequest;
- CHECK(
- HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789",
- V2ValueContentIdRequest));
- CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv);
- CHECK(V2ValueContentIdRequest.Bucket == "test"sv);
- CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv));
- CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv));
-
- HttpRequestData Invalid;
- CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid));
- CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid));
- CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789",
- Invalid));
-}
-
-#endif
-
-void
-z$service_forcelink()
-{
-}
-
-} // namespace zen