aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/storage/cache/httpstructuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/storage/cache/httpstructuredcache.cpp')
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.cpp2052
1 files changed, 2052 insertions, 0 deletions
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp
new file mode 100644
index 000000000..ece1d7a46
--- /dev/null
+++ b/src/zenserver/storage/cache/httpstructuredcache.cpp
@@ -0,0 +1,2052 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpstructuredcache.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/enumflags.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpserver.h>
+#include <zenhttp/httpstats.h>
+#include <zenhttp/packageformat.h>
+#include <zenremotestore/jupiter/jupiterclient.h>
+#include <zenstore/cache/cache.h>
+#include <zenstore/cache/structuredcachestore.h>
+#include <zenstore/gc.h>
+#include <zenutil/rpcrecording.h>
+#include <zenutil/workerpools.h>
+
+#include "storage/upstream/upstreamcache.h"
+#include "storage/upstream/zen.h"
+#include "zenstore/cidstore.h"
+#include "zenstore/scrubcontext.h"
+
+#include <algorithm>
+#include <atomic>
+#include <filesystem>
+#include <queue>
+#include <thread>
+
+#include <gsl/gsl-lite.hpp>
+
+namespace zen {
+
+const FLLMTag&
+GetCacheHttpTag()
+{
+ static FLLMTag CacheHttpTag("http", FLLMTag("cache"));
+
+ return CacheHttpTag;
+}
+
+extern const FLLMTag& GetCacheRpcTag();
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+CachePolicy
+ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
+{
+ std::string_view PolicyText = QueryParams.GetValue("Policy"sv);
+ return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
+}
+
+namespace {
+ static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+ static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
+ static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
+ static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
+ static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker,
+ OpenProcessCache& InOpenProcessCache)
+: m_Log(logging::Get("cache"))
+, m_CacheStore(InCacheStore)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
+, m_CidStore(InCidStore)
+, m_UpstreamCache(UpstreamCache)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_OpenProcessCache(InOpenProcessCache)
+, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
+{
+ m_StatsService.RegisterHandler("z$", *this);
+ m_StatusService.RegisterHandler("z$", *this);
+}
+
+HttpStructuredCacheService::~HttpStructuredCacheService()
+{
+ ZEN_INFO("closing structured cache");
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
+ m_StatsService.UnregisterHandler("z$", *this);
+ m_StatusService.UnregisterHandler("z$", *this);
+}
+
+const char*
+HttpStructuredCacheService::BaseUri() const
+{
+ return "/z$/";
+}
+
+void
+HttpStructuredCacheService::Flush()
+{
+ m_CacheStore.Flush();
+}
+
+void
+HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
+{
+ std::string_view Key = Request.RelativeUri();
+ std::vector<std::string> Tokens;
+ uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) {
+ Tokens.push_back(std::string(Token));
+ return true;
+ });
+ std::string FilterNamespace;
+ std::string FilterBucket;
+ std::string FilterValue;
+ switch (TokenCount)
+ {
+ case 1:
+ break;
+ case 2:
+ {
+ FilterNamespace = Tokens[1];
+ if (FilterNamespace.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ }
+ break;
+ case 3:
+ {
+ FilterNamespace = Tokens[1];
+ if (FilterNamespace.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ FilterBucket = Tokens[2];
+ if (FilterBucket.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ }
+ break;
+ case 4:
+ {
+ FilterNamespace = Tokens[1];
+ if (FilterNamespace.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ FilterBucket = Tokens[2];
+ if (FilterBucket.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ FilterValue = Tokens[3];
+ if (FilterValue.empty())
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+ }
+ break;
+ default:
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+ bool CSV = Params.GetValue("csv") == "true";
+ bool Details = Params.GetValue("details") == "true";
+ bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
+
+ std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch());
+ CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue);
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ if (AttachmentDetails)
+ {
+ CSVWriter << "Namespace, Bucket, Key, Cid, Size";
+ }
+ else if (Details)
+ {
+ CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize";
+ }
+ else
+ {
+ CSVWriter << "Namespace, Bucket, Key";
+ }
+ for (const auto& NamespaceIt : ValueDetails.Namespaces)
+ {
+ const std::string& Namespace = NamespaceIt.first;
+ for (const auto& BucketIt : NamespaceIt.second.Buckets)
+ {
+ const std::string& Bucket = BucketIt.first;
+ for (const auto& ValueIt : BucketIt.second.Values)
+ {
+ if (AttachmentDetails)
+ {
+ for (const IoHash& Hash : ValueIt.second.Attachments)
+ {
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ CSVWriter << "\r\n"
+ << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
+ << ", " << gsl::narrow<uint64_t>(Payload.GetSize());
+ }
+ }
+ else if (Details)
+ {
+ std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
+ GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
+ CSVWriter << "\r\n"
+ << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << ","
+ << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", "
+ << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count())
+ << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size());
+ size_t AttachmentsSize = 0;
+ for (const IoHash& Hash : ValueIt.second.Attachments)
+ {
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ AttachmentsSize += Payload.GetSize();
+ }
+ CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
+ }
+ else
+ {
+ CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString();
+ }
+ }
+ }
+ }
+ return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("namespaces");
+ {
+ for (const auto& NamespaceIt : ValueDetails.Namespaces)
+ {
+ const std::string& Namespace = NamespaceIt.first;
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Namespace);
+ Cbo.BeginArray("buckets");
+ {
+ for (const auto& BucketIt : NamespaceIt.second.Buckets)
+ {
+ const std::string& Bucket = BucketIt.first;
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Bucket);
+ Cbo.BeginArray("values");
+ {
+ for (const auto& ValueIt : BucketIt.second.Values)
+ {
+ std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
+ GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
+ Cbo.BeginObject();
+ {
+ Cbo.AddHash("key", ValueIt.first);
+ if (Details)
+ {
+ Cbo.AddInteger("size", ValueIt.second.Size);
+ if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 &&
+ ValueIt.second.RawSize != ValueIt.second.Size)
+ {
+ Cbo.AddInteger("rawsize", ValueIt.second.RawSize);
+ Cbo.AddHash("rawhash", ValueIt.second.RawHash);
+ }
+ Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType));
+ Cbo.AddInteger("age",
+ gsl::narrow<uint64_t>(NowSeconds.count() - LastAccessedSeconds.count()));
+ if (ValueIt.second.Attachments.size() > 0)
+ {
+ if (AttachmentDetails)
+ {
+ Cbo.BeginArray("attachments");
+ {
+ for (const IoHash& Hash : ValueIt.second.Attachments)
+ {
+ Cbo.BeginObject();
+ Cbo.AddHash("cid", Hash);
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndArray();
+ }
+ else
+ {
+ Cbo.AddInteger("attachmentcount",
+ gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()));
+ size_t AttachmentsSize = 0;
+ for (const IoHash& Hash : ValueIt.second.Attachments)
+ {
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ AttachmentsSize += Payload.GetSize();
+ }
+ Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
+ }
+ }
+ }
+ }
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndArray();
+ }
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndArray();
+ }
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndArray();
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+}
+
+void
+HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("z$::Http::HandleRequest");
+
+ ZEN_MEMSCOPE(GetCacheHttpTag());
+
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
+ const std::string_view Key = Request.RelativeUri();
+
+ std::string_view UriNamespace;
+
+ if (Key.ends_with(HttpZCacheRPCPrefix))
+ {
+ const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length();
+
+ if (RpcOffset)
+ {
+ std::string_view KeyPrefix = Key.substr(0, RpcOffset);
+
+ if (KeyPrefix.back() == '/')
+ {
+ KeyPrefix.remove_suffix(1);
+
+ UriNamespace = KeyPrefix;
+ }
+ }
+
+ return HandleRpcRequest(Request, UriNamespace);
+ }
+
+ if (Key == HttpZCacheUtilStartRecording)
+ {
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+
+ std::string RecordPath = UrlDecode(Params.GetValue("path"));
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
+ m_RequestRecordingEnabled.store(true);
+ }
+ ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+
+ if (Key == HttpZCacheUtilStopRecording)
+ {
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+ ZEN_INFO("cache RPC recording STOPPED");
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+
+ if (Key == HttpZCacheUtilReplayRecording)
+ {
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
+ m_RequestRecordingEnabled.store(false);
+ m_RequestRecorder.reset();
+ }
+
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+
+ std::string RecordPath = UrlDecode(Params.GetValue("path"));
+
+ uint32_t ThreadCount = GetHardwareConcurrency();
+ if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<uint64_t>(Param))
+ {
+ ThreadCount = gsl::narrow<uint32_t>(Value.value());
+ }
+ }
+
+ ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
+
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
+ ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+
+ ZEN_INFO("cache RPC replay COMPLETED");
+
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+
+ if (Key.starts_with(HttpZCacheDetailsPrefix))
+ {
+ HandleDetailsRequest(Request);
+ return;
+ }
+
+ HttpCacheRequestData RequestData;
+ if (!HttpCacheRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData))
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
+ }
+
+ if (RequestData.ValueContentId.has_value())
+ {
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ ZEN_ASSERT(RequestData.Bucket.has_value());
+ ZEN_ASSERT(RequestData.HashKey.has_value());
+ CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
+ .BucketSegment = RequestData.Bucket.value(),
+ .HashKey = RequestData.HashKey.value(),
+ .ValueContentId = RequestData.ValueContentId.value()};
+ return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
+ }
+
+ if (RequestData.HashKey.has_value())
+ {
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ ZEN_ASSERT(RequestData.Bucket.has_value());
+ CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
+ .BucketSegment = RequestData.Bucket.value(),
+ .HashKey = RequestData.HashKey.value(),
+ .ValueContentId = IoHash::Zero};
+ return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
+ }
+
+ if (RequestData.Bucket.has_value())
+ {
+ ZEN_ASSERT(RequestData.Namespace.has_value());
+ return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value());
+ }
+
+ if (RequestData.Namespace.has_value())
+ {
+ return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value());
+ }
+ return HandleCacheRequest(Request);
+}
+
+void
+HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ {
+ ZenCacheStore::Info Info = m_CacheStore.GetInfo();
+
+ CbObjectWriter ResponseWriter;
+
+ ResponseWriter.BeginObject("Configuration");
+ {
+ ExtendableStringBuilder<128> BasePathString;
+ BasePathString << Info.BasePath.u8string();
+ ResponseWriter.AddString("BasePath"sv, BasePathString.ToView());
+ ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces);
+ ResponseWriter.BeginObject("Logging");
+ {
+ ResponseWriter.AddBool("EnableWriteLog", Info.Config.Logging.EnableWriteLog);
+ ResponseWriter.AddBool("EnableAccessLog", Info.Config.Logging.EnableAccessLog);
+ }
+ ResponseWriter.EndObject();
+ }
+ ResponseWriter.EndObject();
+
+ std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) {
+ return L.compare(R) < 0;
+ });
+ ResponseWriter.BeginArray("Namespaces");
+ for (const std::string& NamespaceName : Info.NamespaceNames)
+ {
+ ResponseWriter.AddString(NamespaceName);
+ }
+ ResponseWriter.EndArray();
+ ResponseWriter.BeginObject("StorageSize");
+ {
+ ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize);
+ ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize);
+ }
+
+ ResponseWriter.EndObject();
+
+ ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount);
+
+ return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
+ }
+ break;
+ default:
+ m_CacheStats.BadRequestCount++;
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ {
+ std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName);
+ if (!Info.has_value())
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ CbObjectWriter ResponseWriter;
+
+ ResponseWriter.BeginObject("Configuration");
+ {
+ ExtendableStringBuilder<128> BasePathString;
+ BasePathString << Info->RootDir.u8string();
+ ResponseWriter.AddString("RootDir"sv, BasePathString.ToView());
+ ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize);
+ ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment);
+ ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold);
+ ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold);
+ ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes);
+ ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds);
+ ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds);
+ }
+ ResponseWriter.EndObject();
+
+ std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) {
+ return L.compare(R) < 0;
+ });
+
+ ResponseWriter.BeginArray("Buckets"sv);
+ for (const std::string& BucketName : Info->BucketNames)
+ {
+ ResponseWriter.AddString(BucketName);
+ }
+ ResponseWriter.EndArray();
+
+ ResponseWriter.BeginObject("StorageSize"sv);
+ {
+ ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize);
+ ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize);
+ }
+ ResponseWriter.EndObject();
+
+ ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
+
+ if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty())
+ {
+ ResponseWriter.BeginObject("BucketSizes");
+
+ ResponseWriter.BeginArray("Buckets");
+
+ std::vector<std::string> BucketNames;
+ if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields
+ {
+ BucketNames = Info.value().BucketNames;
+ }
+ else
+ {
+ ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) {
+ BucketNames.push_back(std::string(BucketName));
+ return true;
+ });
+ }
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+ std::vector<IoHash> AllAttachments;
+ for (const std::string& BucketName : BucketNames)
+ {
+ ResponseWriter.BeginObject();
+ ResponseWriter << "Name" << BucketName;
+ CacheContentStats ContentStats;
+ bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
+ if (Success)
+ {
+ size_t ValuesSize = 0;
+ for (const uint64_t Size : ContentStats.ValueSizes)
+ {
+ ValuesSize += Size;
+ }
+
+ std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
+
+ ResponseWriter << "Count" << ContentStats.ValueSizes.size();
+ ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
+ ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
+ ResponseWriter << "Size" << ValuesSize;
+ ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
+
+ AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ }
+ ResponseWriter.EndObject();
+ }
+
+ ResponseWriter.EndArray();
+
+ ResponseWriter.BeginObject("Attachments");
+ std::sort(AllAttachments.begin(), AllAttachments.end());
+ auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end());
+ AllAttachments.erase(NewEnd, AllAttachments.end());
+
+ uint64_t AttachmentsSize = 0;
+
+ m_CidStore.IterateChunks(
+ AllAttachments,
+ [&](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index);
+ AttachmentsSize += Payload.GetSize();
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024u);
+
+ ResponseWriter << "Count" << AllAttachments.size();
+ ResponseWriter << "Size" << AttachmentsSize;
+
+ ResponseWriter.EndObject();
+
+ ResponseWriter.EndObject();
+ }
+
+ return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ // Drop namespace
+ {
+ if (m_CacheStore.DropNamespace(NamespaceName))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
+ std::string_view NamespaceName,
+ std::string_view BucketName)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ {
+ std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName);
+ if (!Info.has_value())
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ CbObjectWriter ResponseWriter;
+
+ ResponseWriter.BeginObject("StorageSize");
+ {
+ ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize);
+ ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize);
+ }
+ ResponseWriter.EndObject();
+
+ ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
+
+ if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true")
+ {
+ CacheContentStats ContentStats;
+ bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
+ if (Success)
+ {
+ size_t ValuesSize = 0;
+ for (const uint64_t Size : ContentStats.ValueSizes)
+ {
+ ValuesSize += Size;
+ }
+
+ std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
+
+ ResponseWriter << "Count" << ContentStats.ValueSizes.size();
+ ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
+ ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
+ ResponseWriter << "Size" << ValuesSize;
+ ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
+
+ uint64_t AttachmentsSize = 0;
+
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+
+ m_CidStore.IterateChunks(
+ ContentStats.Attachments,
+ [&](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index);
+ AttachmentsSize += Payload.GetSize();
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024u);
+
+ ResponseWriter << "AttachmentsSize" << AttachmentsSize;
+ }
+ }
+
+ return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ // Drop bucket
+ {
+ if (m_CacheStore.DropBucket(NamespaceName, BucketName))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
+ break;
+
+ case HttpVerb::kPut:
+ HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
+ break;
+
+ default:
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ const ZenContentType AcceptType = Request.AcceptContentType();
+ const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
+
+ bool Success = false;
+ uint32_t MissingCount = 0;
+ ZenCacheValue ClientResultValue;
+ if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+ Stopwatch Timer;
+
+ if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
+ m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
+ {
+ Success = true;
+ ZenContentType ContentType = ClientResultValue.Value.GetContentType();
+
+ if (AcceptType == ZenContentType::kCbPackage)
+ {
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbPackage Package;
+ CbValidateError ValidateError = CbValidateError::None;
+ if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError);
+ ValidateError == CbValidateError::None)
+ {
+ CbObjectView CacheRecord(ClientResultValue.Value.Data());
+ CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ if (SkipData)
+ {
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ MissingCount++;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ if (Compressed)
+ {
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
+ }
+ else
+ {
+ ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash());
+ MissingCount++;
+ }
+ }
+ else
+ {
+ MissingCount++;
+ }
+ }
+ });
+
+ Success = MissingCount == 0 || PartialRecord;
+ }
+ else
+ {
+ ZEN_WARN("Invalid compact binary payload returned for {}/{}/{} ({}). Reason: '{}'",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ ToString(ValidateError));
+ Success = false;
+ }
+
+ if (Success)
+ {
+ CbObject PackageObject = LoadCompactBinaryObject(std::move(ClientResultValue.Value));
+
+ Package.SetObject(std::move(PackageObject));
+
+ BinaryWriter MemStream;
+ Package.Save(MemStream);
+
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
+ }
+ }
+ else
+ {
+ Success = false;
+ }
+ }
+ else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
+ AcceptType != ZenContentType::kBinary)
+ {
+ Success = false;
+ }
+ }
+
+ if (Success)
+ {
+ ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ m_CacheStats.HitCount++;
+ if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
+ return Request.WriteResponse((MissingCount == 0) ? HttpResponseCode::OK : HttpResponseCode::PartialContent,
+ ClientResultValue.Value.GetContentType(),
+ ClientResultValue.Value);
+ }
+ }
+ else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
+ {
+ ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ // Issue upstream query asynchronously in order to keep requests flowing without
+ // hogging I/O servicing threads with blocking work
+
+ uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs();
+
+ Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) {
+ Stopwatch Timer;
+ bool Success = false;
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
+ const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
+ ZenCacheValue ClientResultValue;
+
+ metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
+
+ if (GetUpstreamCacheSingleResult UpstreamResult =
+ m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
+ UpstreamResult.Status.Success)
+ {
+ Success = true;
+
+ ClientResultValue.Value = UpstreamResult.Value;
+ ClientResultValue.Value.SetContentType(AcceptType);
+
+ if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
+ {
+ if (AcceptType == ZenContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
+ if (ValidationResult != CbValidateError::None)
+ {
+ Success = false;
+ ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType));
+ }
+
+ // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
+ }
+
+ if (Success && StoreLocal)
+ {
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
+ }
+ }
+ else if (AcceptType == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+ if (Package.TryLoad(ClientResultValue.Value))
+ {
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ std::vector<IoHash> WriteRawHashes;
+ WriteRawHashes.reserve(NumAttachments);
+
+ CacheRecord.IterateAttachments([this,
+ &Package,
+ &Ref,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &ReferencedAttachments,
+ &Count,
+ QueryLocal,
+ StoreLocal,
+ SkipData](CbFieldView HashView) {
+ IoHash Hash = HashView.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ if (StoreLocal)
+ {
+ const CompressedBuffer& Chunk = Attachment->AsCompressedBinary();
+ WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Attachment->GetHash());
+ }
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ Hash,
+ Ref.BucketSegment,
+ Ref.HashKey);
+ Count.Invalid++;
+ }
+ }
+ else if (QueryLocal)
+ {
+ if (SkipData)
+ {
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ Count.Valid++;
+ }
+ }
+ else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ if (Compressed)
+ {
+ Package.AddAttachment(CbAttachment(Compressed, Hash));
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey);
+ Count.Invalid++;
+ }
+ }
+ }
+ Count.Total++;
+ });
+
+ if ((Count.Valid == Count.Total) || PartialRecord)
+ {
+ ZenCacheValue CacheValue;
+ CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+
+ if (StoreLocal)
+ {
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> InsertResults =
+ m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
+ {
+ if (Result.New)
+ {
+ Count.New++;
+ }
+ }
+ }
+
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
+ }
+
+ BinaryWriter MemStream;
+ if (SkipData)
+ {
+ // Save a package containing only the object.
+ CbPackage(Package.GetObject()).Save(MemStream);
+ }
+ else
+ {
+ Package.Save(MemStream);
+ }
+
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType));
+ }
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType));
+ }
+ }
+ }
+
+ if (Success)
+ {
+ ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()),
+ NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
+
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+
+ if (SkipData && AcceptType == ZenContentType::kBinary)
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
+ // metadata.
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(AcceptType),
+ NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
+ m_CacheStats.MissCount++;
+ AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
+ }
+ });
+}
+
+void
+HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ IoBuffer Body = Request.ReadPayload();
+
+ if (!Body || Body.Size() == 0)
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
+ ZEN_UNUSED(PutResult);
+
+ HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
+ switch (PutResult.Status)
+ {
+ case zen::PutStatus::Conflict:
+ ResponseCode = HttpResponseCode::Conflict;
+ break;
+ case zen::PutStatus::Invalid:
+ ResponseCode = HttpResponseCode::BadRequest;
+ break;
+ }
+
+ if (PutResult.Details)
+ {
+ Request.WriteResponse(ResponseCode, PutResult.Details);
+ }
+ return Request.WriteResponse(ResponseCode);
+ };
+
+ const HttpContentType ContentType = Request.RequestContentType();
+
+ Body.SetContentType(ContentType);
+
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ Stopwatch Timer;
+
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
+ {
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = Body.GetSize();
+ if (ContentType == HttpContentType::kCompressedBinary)
+ {
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Payload is not a valid compressed binary"sv);
+ }
+ }
+ else
+ {
+ RawHash = IoHash::HashBuffer(SharedBuffer(Body));
+ }
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
+ m_CacheStats.WriteCount++;
+
+ if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ }
+
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.Size()),
+ ToString(ContentType),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else if (ContentType == HttpContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
+
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(ContentType));
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
+ }
+
+ Body.SetContentType(ZenContentType::kCbObject);
+
+ CbObjectView CacheRecord(Body.Data());
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ int32_t TotalCount = 0;
+
+ CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
+ const IoHash Hash = AttachmentHash.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ }
+ TotalCount++;
+ });
+
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
+ m_CacheStats.WriteCount++;
+
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.Size()),
+ ToString(ContentType),
+ TotalCount,
+ ValidAttachments.size(),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
+
+ CachePolicy Policy = PolicyFromUrl;
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
+ }
+
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
+ {
+ CbPackage Package;
+
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(ContentType));
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
+ }
+ CachePolicy Policy = PolicyFromUrl;
+
+ CbObject CacheRecord = Package.GetObject();
+
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ ValidAttachments.reserve(NumAttachments);
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(NumAttachments);
+ WriteRawHashes.reserve(NumAttachments);
+
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
+ CbFieldView HashView) {
+ const IoHash Hash = HashView.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Hash);
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(HttpContentType::kCbPackage),
+ Hash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
+ }
+
+ const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+
+ ZenCacheValue CacheValue;
+ CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
+ {
+ std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& InsertResult : InsertResults)
+ {
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
+
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.GetSize()),
+ ToString(ContentType),
+ Count.New,
+ Count.Valid,
+ Count.Total,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
+ }
+
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv);
+ }
+}
+
+void
+HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ HandleGetCacheChunk(Request, Ref, PolicyFromUrl);
+ break;
+ case HttpVerb::kPut:
+ HandlePutCacheChunk(Request, Ref, PolicyFromUrl);
+ break;
+ default:
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ Stopwatch Timer;
+
+ IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
+ const UpstreamEndpointInfo* Source = nullptr;
+ CachePolicy Policy = PolicyFromUrl;
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+ {
+ const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+
+ if (QueryUpstream)
+ {
+ if (GetUpstreamCacheSingleResult UpstreamResult =
+ m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Status.Success)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
+ {
+ if (RawHash == Ref.ValueContentId)
+ {
+ if (AreDiskWritesAllowed())
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ }
+ Source = UpstreamResult.Source;
+ }
+ else
+ {
+ ZEN_WARN("got missmatching upstream cache value");
+ }
+ }
+ else
+ {
+ ZEN_WARN("got uncompressed upstream cache value");
+ }
+ }
+ }
+ }
+
+ if (!Value)
+ {
+ ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ ToString(Request.AcceptContentType()),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ NiceBytes(Value.Size()),
+ ToString(Value.GetContentType()),
+ Source ? Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ m_CacheStats.HitCount++;
+ if (Source)
+ {
+ m_CacheStats.UpstreamHitCount++;
+ }
+
+ if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
+ {
+ Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
+ }
+}
+
+void
+HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
+{
+ // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
+ ZEN_UNUSED(PolicyFromUrl);
+
+ Stopwatch Timer;
+
+ IoBuffer Body = Request.ReadPayload();
+
+ if (!Body || Body.Size() == 0)
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ Body.SetContentType(Request.RequestContentType());
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
+ }
+
+ if (RawHash != Ref.ValueContentId)
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "ValueContentId does not match attachment hash"sv);
+ }
+
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
+
+ ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ NiceBytes(Body.Size()),
+ ToString(Body.GetContentType()),
+ Result.New ? "NEW" : "OLD",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
+
+ Request.WriteResponse(ResponseCode);
+}
+
+void
+HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context,
+ cache::IRpcRequestReplayer& Replayer,
+ uint32_t ThreadCount)
+{
+ WorkerThreadPool WorkerPool(ThreadCount);
+ uint64_t RequestCount = Replayer.GetRequestCount();
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ ZEN_INFO("Replaying {} requests", RequestCount);
+ for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) {
+ IoBuffer Body;
+ zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
+
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ if (Body)
+ {
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetPid = 0;
+ CbPackage RpcResult;
+ if (m_RpcHandler.HandleRpcRequest(Context,
+ /* UriNamespace */ {},
+ RequestInfo.ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetPid,
+ RpcResult) == CacheRpcHandler::RpcResponseCode::OK)
+ {
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(Context.SessionId, TargetPid);
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
+ }
+ }
+ }
+ });
+ }
+ Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ ZEN_INFO("Replayed {} of {} requests, elapsed {}",
+ RequestCount - PendingWork,
+ RequestCount,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ });
+}
+
+void
+HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace)
+{
+ ZEN_MEMSCOPE(GetCacheRpcTag());
+
+ ZEN_TRACE_CPU("z$::Http::HandleRpcRequest");
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ switch (Request.RequestVerb())
+ {
+ case HttpVerb::kPost:
+ {
+ CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
+
+ const HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType AcceptType = Request.AcceptContentType();
+
+ if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
+ AcceptType != HttpContentType::kCbPackage)
+ {
+ m_CacheStats.BadRequestCount++;
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ auto HandleRpc = [this,
+ RequestContext,
+ Body = Request.ReadPayload(),
+ ContentType,
+ AcceptType,
+ UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable {
+ if (m_RequestRecordingEnabled)
+ {
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
+ {
+ m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
+ }
+ }
+
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetProcessId = 0;
+ CbPackage RpcResult;
+
+ CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
+ UriNamespaceString,
+ ContentType,
+ std::move(Body),
+ /* out */ AcceptMagic,
+ /* out */ AcceptFlags,
+ /* out */ TargetProcessId,
+ /* out */ RpcResult);
+
+ HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
+
+ if (!IsHttpSuccessCode(HttpResultCode))
+ {
+ return AsyncRequest.WriteResponse(HttpResultCode);
+ }
+
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId);
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ };
+
+ if (HasUpstream)
+ {
+ ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync");
+ Request.WriteResponseAsync(std::move(HandleRpc));
+ }
+ else
+ {
+ ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponse");
+ HandleRpc(Request);
+ }
+ }
+ break;
+
+ default:
+ m_CacheStats.BadRequestCount++;
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ ZEN_MEMSCOPE(GetCacheHttpTag());
+
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t WriteCount = m_CacheStats.WriteCount;
+ const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
+ struct CidStoreStats StoreStats = m_CidStore.Stats();
+ const uint64_t ChunkHitCount = StoreStats.HitCount;
+ const uint64_t ChunkMissCount = StoreStats.MissCount;
+ const uint64_t ChunkWriteCount = StoreStats.WriteCount;
+ const uint64_t TotalCount = HitCount + MissCount;
+
+ const uint64_t RpcRequests = m_CacheStats.RpcRequests;
+ const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
+ const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests;
+ const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests;
+ const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests;
+ const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
+ const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
+
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
+ const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
+
+ bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true";
+ bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true";
+
+ CidStoreStats CidStoreStats = {};
+ if (ShowCidStoreStats)
+ {
+ CidStoreStats = m_CidStore.Stats();
+ }
+ ZenCacheStore::CacheStoreStats CacheStoreStats = {};
+ if (ShowCacheStoreStats)
+ {
+ CacheStoreStats = m_CacheStore.Stats();
+ }
+
+ Cbo.BeginObject("cache");
+ {
+ Cbo << "badrequestcount" << BadRequestCount;
+ Cbo.BeginObject("rpc");
+ Cbo << "count" << RpcRequests;
+ Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests;
+ Cbo.BeginObject("records");
+ Cbo << "count" << RpcRecordRequests;
+ Cbo << "ops" << RpcRecordBatchRequests;
+ Cbo.EndObject();
+ Cbo.BeginObject("values");
+ Cbo << "count" << RpcValueRequests;
+ Cbo << "ops" << RpcValueBatchRequests;
+ Cbo.EndObject();
+ Cbo.BeginObject("chunks");
+ Cbo << "count" << RpcChunkRequests;
+ Cbo << "ops" << RpcChunkBatchRequests;
+ Cbo.EndObject();
+ Cbo.EndObject();
+
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << CacheSize.DiskSize;
+ Cbo << "memory" << CacheSize.MemorySize;
+ }
+ Cbo.EndObject();
+
+ Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
+ Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
+
+ if (m_UpstreamCache.IsActive())
+ {
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
+ Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
+ Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
+ }
+
+ Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
+
+ if (ShowCacheStoreStats)
+ {
+ Cbo.BeginObject("store");
+ Cbo << "hits" << CacheStoreStats.HitCount << "misses" << CacheStoreStats.MissCount << "writes" << CacheStoreStats.WriteCount
+ << "rejected_writes" << CacheStoreStats.RejectedWriteCount << "rejected_reads" << CacheStoreStats.RejectedReadCount;
+ const uint64_t StoreTotal = CacheStoreStats.HitCount + CacheStoreStats.MissCount;
+ Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(CacheStoreStats.HitCount) / double(StoreTotal)) : 0.0);
+ EmitSnapshot("read", CacheStoreStats.GetOps, Cbo);
+ EmitSnapshot("write", CacheStoreStats.PutOps, Cbo);
+ if (!CacheStoreStats.NamespaceStats.empty())
+ {
+ Cbo.BeginArray("namespaces");
+ for (const ZenCacheStore::NamedNamespaceStats& NamespaceStats : CacheStoreStats.NamespaceStats)
+ {
+ Cbo.BeginObject();
+ Cbo.AddString("namespace", NamespaceStats.NamespaceName);
+ Cbo << "hits" << NamespaceStats.Stats.HitCount << "misses" << NamespaceStats.Stats.MissCount << "writes"
+ << NamespaceStats.Stats.WriteCount;
+ const uint64_t NamespaceTotal = NamespaceStats.Stats.HitCount + NamespaceStats.Stats.MissCount;
+ Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0);
+ EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo);
+ EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo);
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize;
+ Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize;
+ }
+ Cbo.EndObject();
+ if (!NamespaceStats.Stats.DiskStats.BucketStats.empty())
+ {
+ Cbo.BeginArray("buckets");
+ for (const ZenCacheDiskLayer::NamedBucketStats& BucketStats : NamespaceStats.Stats.DiskStats.BucketStats)
+ {
+ Cbo.BeginObject();
+ Cbo.AddString("bucket", BucketStats.BucketName);
+ if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0)
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << BucketStats.Stats.DiskSize;
+ Cbo << "memory" << BucketStats.Stats.MemorySize;
+ }
+ Cbo.EndObject();
+ }
+
+ if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 &&
+ BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 &&
+ BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 &&
+ BucketStats.Stats.MemoryWriteCount == 0)
+ {
+ Cbo.EndObject();
+ continue;
+ }
+
+ const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount;
+ if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0)
+ {
+ Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes"
+ << BucketStats.Stats.DiskWriteCount;
+ Cbo << "hit_ratio"
+ << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0);
+ }
+
+ const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount;
+ if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0)
+ {
+ Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount
+ << "mem_writes" << BucketStats.Stats.MemoryWriteCount;
+ Cbo << "mem_hit_ratio"
+ << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal))
+ : 0.0);
+ }
+
+ if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 ||
+ BucketStats.Stats.MemoryWriteCount != 0)
+ {
+ EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo);
+ EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo);
+ }
+
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+ }
+
+ if (m_UpstreamCache.IsActive())
+ {
+ EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
+ Cbo.BeginObject("upstream");
+ {
+ m_UpstreamCache.GetStatus(Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+ Cbo.BeginObject("cid");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
+ }
+ Cbo.EndObject();
+
+ if (ShowCidStoreStats)
+ {
+ Cbo.BeginObject("store");
+ Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount;
+ EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo);
+ EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo);
+ // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo);
+ Cbo.EndObject();
+ }
+ }
+ Cbo.EndObject();
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+bool
+HttpStructuredCacheService::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
+} // namespace zen