aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-10-14 11:32:16 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 11:32:16 +0200
commitca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch)
tree005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/cache/httpstructuredcache.cpp
parentmake asiohttp work without IPv6 (#562) (diff)
downloadzen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz
zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree * move config into config/ * also move admin service into storage since it mostly has storage related functionality * header consolidation
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp2052
1 files changed, 0 insertions, 2052 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
deleted file mode 100644
index 9f87c208c..000000000
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ /dev/null
@@ -1,2052 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "httpstructuredcache.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryutil.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compress.h>
-#include <zencore/enumflags.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/memory/llm.h>
-#include <zencore/parallelwork.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpserver.h>
-#include <zenhttp/httpstats.h>
-#include <zenhttp/packageformat.h>
-#include <zenremotestore/jupiter/jupiterclient.h>
-#include <zenstore/cache/cache.h>
-#include <zenstore/cache/structuredcachestore.h>
-#include <zenstore/gc.h>
-#include <zenutil/rpcrecording.h>
-#include <zenutil/workerpools.h>
-
-#include "upstream/upstreamcache.h"
-#include "upstream/zen.h"
-#include "zenstore/cidstore.h"
-#include "zenstore/scrubcontext.h"
-
-#include <algorithm>
-#include <atomic>
-#include <filesystem>
-#include <queue>
-#include <thread>
-
-#include <gsl/gsl-lite.hpp>
-
-namespace zen {
-
-const FLLMTag&
-GetCacheHttpTag()
-{
- static FLLMTag CacheHttpTag("http", FLLMTag("cache"));
-
- return CacheHttpTag;
-}
-
-extern const FLLMTag& GetCacheRpcTag();
-
-using namespace std::literals;
-
-//////////////////////////////////////////////////////////////////////////
-
-CachePolicy
-ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
-{
- std::string_view PolicyText = QueryParams.GetValue("Policy"sv);
- return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
-}
-
-namespace {
- static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
- static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
- static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
- static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
- static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache,
- const DiskWriteBlocker* InDiskWriteBlocker,
- OpenProcessCache& InOpenProcessCache)
-: m_Log(logging::Get("cache"))
-, m_CacheStore(InCacheStore)
-, m_StatsService(StatsService)
-, m_StatusService(StatusService)
-, m_CidStore(InCidStore)
-, m_UpstreamCache(UpstreamCache)
-, m_DiskWriteBlocker(InDiskWriteBlocker)
-, m_OpenProcessCache(InOpenProcessCache)
-, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
-{
- m_StatsService.RegisterHandler("z$", *this);
- m_StatusService.RegisterHandler("z$", *this);
-}
-
-HttpStructuredCacheService::~HttpStructuredCacheService()
-{
- ZEN_INFO("closing structured cache");
- {
- RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
- m_RequestRecordingEnabled.store(false);
- m_RequestRecorder.reset();
- }
-
- m_StatsService.UnregisterHandler("z$", *this);
- m_StatusService.UnregisterHandler("z$", *this);
-}
-
-const char*
-HttpStructuredCacheService::BaseUri() const
-{
- return "/z$/";
-}
-
-void
-HttpStructuredCacheService::Flush()
-{
- m_CacheStore.Flush();
-}
-
-void
-HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
-{
- std::string_view Key = Request.RelativeUri();
- std::vector<std::string> Tokens;
- uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) {
- Tokens.push_back(std::string(Token));
- return true;
- });
- std::string FilterNamespace;
- std::string FilterBucket;
- std::string FilterValue;
- switch (TokenCount)
- {
- case 1:
- break;
- case 2:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 3:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- case 4:
- {
- FilterNamespace = Tokens[1];
- if (FilterNamespace.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterBucket = Tokens[2];
- if (FilterBucket.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- FilterValue = Tokens[3];
- if (FilterValue.empty())
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
- }
- break;
- default:
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch());
- CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue);
-
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- if (AttachmentDetails)
- {
- CSVWriter << "Namespace, Bucket, Key, Cid, Size";
- }
- else if (Details)
- {
- CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize";
- }
- else
- {
- CSVWriter << "Namespace, Bucket, Key";
- }
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- if (AttachmentDetails)
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
- << ", " << gsl::narrow<uint64_t>(Payload.GetSize());
- }
- }
- else if (Details)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- CSVWriter << "\r\n"
- << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << ","
- << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", "
- << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count())
- << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size());
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
- }
- else
- {
- CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString();
- }
- }
- }
- }
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("namespaces");
- {
- for (const auto& NamespaceIt : ValueDetails.Namespaces)
- {
- const std::string& Namespace = NamespaceIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Namespace);
- Cbo.BeginArray("buckets");
- {
- for (const auto& BucketIt : NamespaceIt.second.Buckets)
- {
- const std::string& Bucket = BucketIt.first;
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Bucket);
- Cbo.BeginArray("values");
- {
- for (const auto& ValueIt : BucketIt.second.Values)
- {
- std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>(
- GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch());
- Cbo.BeginObject();
- {
- Cbo.AddHash("key", ValueIt.first);
- if (Details)
- {
- Cbo.AddInteger("size", ValueIt.second.Size);
- if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 &&
- ValueIt.second.RawSize != ValueIt.second.Size)
- {
- Cbo.AddInteger("rawsize", ValueIt.second.RawSize);
- Cbo.AddHash("rawhash", ValueIt.second.RawHash);
- }
- Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType));
- Cbo.AddInteger("age",
- gsl::narrow<uint64_t>(NowSeconds.count() - LastAccessedSeconds.count()));
- if (ValueIt.second.Attachments.size() > 0)
- {
- if (AttachmentDetails)
- {
- Cbo.BeginArray("attachments");
- {
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- Cbo.BeginObject();
- Cbo.AddHash("cid", Hash);
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- else
- {
- Cbo.AddInteger("attachmentcount",
- gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()));
- size_t AttachmentsSize = 0;
- for (const IoHash& Hash : ValueIt.second.Attachments)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
- AttachmentsSize += Payload.GetSize();
- }
- Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
- }
- }
- }
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- }
- Cbo.EndArray();
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
-}
-
-void
-HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
-{
- ZEN_TRACE_CPU("z$::Http::HandleRequest");
-
- ZEN_MEMSCOPE(GetCacheHttpTag());
-
- metrics::OperationTiming::Scope $(m_HttpRequests);
-
- const std::string_view Key = Request.RelativeUri();
-
- std::string_view UriNamespace;
-
- if (Key.ends_with(HttpZCacheRPCPrefix))
- {
- const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length();
-
- if (RpcOffset)
- {
- std::string_view KeyPrefix = Key.substr(0, RpcOffset);
-
- if (KeyPrefix.back() == '/')
- {
- KeyPrefix.remove_suffix(1);
-
- UriNamespace = KeyPrefix;
- }
- }
-
- return HandleRpcRequest(Request, UriNamespace);
- }
-
- if (Key == HttpZCacheUtilStartRecording)
- {
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
-
- std::string RecordPath = UrlDecode(Params.GetValue("path"));
-
- {
- RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
- m_RequestRecordingEnabled.store(false);
- m_RequestRecorder.reset();
-
- m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
- m_RequestRecordingEnabled.store(true);
- }
- ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath);
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
-
- if (Key == HttpZCacheUtilStopRecording)
- {
- {
- RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
- m_RequestRecordingEnabled.store(false);
- m_RequestRecorder.reset();
- }
- ZEN_INFO("cache RPC recording STOPPED");
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
-
- if (Key == HttpZCacheUtilReplayRecording)
- {
- CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
-
- {
- RwLock::ExclusiveLockScope _(m_RequestRecordingLock);
- m_RequestRecordingEnabled.store(false);
- m_RequestRecorder.reset();
- }
-
- HttpServerRequest::QueryParams Params = Request.GetQueryParams();
-
- std::string RecordPath = UrlDecode(Params.GetValue("path"));
-
- uint32_t ThreadCount = GetHardwareConcurrency();
- if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
- {
- if (auto Value = ParseInt<uint64_t>(Param))
- {
- ThreadCount = gsl::narrow<uint32_t>(Value.value());
- }
- }
-
- ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath);
-
- std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
- ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount);
-
- ZEN_INFO("cache RPC replay COMPLETED");
-
- Request.WriteResponse(HttpResponseCode::OK);
- return;
- }
-
- if (Key.starts_with(HttpZCacheDetailsPrefix))
- {
- HandleDetailsRequest(Request);
- return;
- }
-
- HttpCacheRequestData RequestData;
- if (!HttpCacheRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData))
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
- }
-
- if (RequestData.ValueContentId.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- ZEN_ASSERT(RequestData.HashKey.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = RequestData.ValueContentId.value()};
- return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.HashKey.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- ZEN_ASSERT(RequestData.Bucket.has_value());
- CacheRef Ref = {.Namespace = RequestData.Namespace.value(),
- .BucketSegment = RequestData.Bucket.value(),
- .HashKey = RequestData.HashKey.value(),
- .ValueContentId = IoHash::Zero};
- return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams()));
- }
-
- if (RequestData.Bucket.has_value())
- {
- ZEN_ASSERT(RequestData.Namespace.has_value());
- return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value());
- }
-
- if (RequestData.Namespace.has_value())
- {
- return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value());
- }
- return HandleCacheRequest(Request);
-}
-
-void
-HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info.BasePath.u8string();
- ResponseWriter.AddString("BasePath"sv, BasePathString.ToView());
- ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces);
- ResponseWriter.BeginObject("Logging");
- {
- ResponseWriter.AddBool("EnableWriteLog", Info.Config.Logging.EnableWriteLog);
- ResponseWriter.AddBool("EnableAccessLog", Info.Config.Logging.EnableAccessLog);
- }
- ResponseWriter.EndObject();
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
- ResponseWriter.BeginArray("Namespaces");
- for (const std::string& NamespaceName : Info.NamespaceNames)
- {
- ResponseWriter.AddString(NamespaceName);
- }
- ResponseWriter.EndArray();
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize);
- ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize);
- }
-
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount);
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
- default:
- m_CacheStats.BadRequestCount++;
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("Configuration");
- {
- ExtendableStringBuilder<128> BasePathString;
- BasePathString << Info->RootDir.u8string();
- ResponseWriter.AddString("RootDir"sv, BasePathString.ToView());
- ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize);
- ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment);
- ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold);
- ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold);
- ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes);
- ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds);
- ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds);
- }
- ResponseWriter.EndObject();
-
- std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) {
- return L.compare(R) < 0;
- });
-
- ResponseWriter.BeginArray("Buckets"sv);
- for (const std::string& BucketName : Info->BucketNames)
- {
- ResponseWriter.AddString(BucketName);
- }
- ResponseWriter.EndArray();
-
- ResponseWriter.BeginObject("StorageSize"sv);
- {
- ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize);
- ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
-
- if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty())
- {
- ResponseWriter.BeginObject("BucketSizes");
-
- ResponseWriter.BeginArray("Buckets");
-
- std::vector<std::string> BucketNames;
- if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields
- {
- BucketNames = Info.value().BucketNames;
- }
- else
- {
- ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) {
- BucketNames.push_back(std::string(BucketName));
- return true;
- });
- }
- WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
- std::vector<IoHash> AllAttachments;
- for (const std::string& BucketName : BucketNames)
- {
- ResponseWriter.BeginObject();
- ResponseWriter << "Name" << BucketName;
- CacheContentStats ContentStats;
- bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
- if (Success)
- {
- size_t ValuesSize = 0;
- for (const uint64_t Size : ContentStats.ValueSizes)
- {
- ValuesSize += Size;
- }
-
- std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
- auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
- ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
-
- ResponseWriter << "Count" << ContentStats.ValueSizes.size();
- ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
- ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
- ResponseWriter << "Size" << ValuesSize;
- ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
-
- AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end());
- }
- ResponseWriter.EndObject();
- }
-
- ResponseWriter.EndArray();
-
- ResponseWriter.BeginObject("Attachments");
- std::sort(AllAttachments.begin(), AllAttachments.end());
- auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end());
- AllAttachments.erase(NewEnd, AllAttachments.end());
-
- uint64_t AttachmentsSize = 0;
-
- m_CidStore.IterateChunks(
- AllAttachments,
- [&](size_t Index, const IoBuffer& Payload) {
- ZEN_UNUSED(Index);
- AttachmentsSize += Payload.GetSize();
- return true;
- },
- &WorkerPool,
- 8u * 1024u);
-
- ResponseWriter << "Count" << AllAttachments.size();
- ResponseWriter << "Size" << AttachmentsSize;
-
- ResponseWriter.EndObject();
-
- ResponseWriter.EndObject();
- }
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop namespace
- {
- if (m_CacheStore.DropNamespace(NamespaceName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
- std::string_view NamespaceName,
- std::string_view BucketName)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName);
- if (!Info.has_value())
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter ResponseWriter;
-
- ResponseWriter.BeginObject("StorageSize");
- {
- ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize);
- ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize);
- }
- ResponseWriter.EndObject();
-
- ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
-
- if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true")
- {
- CacheContentStats ContentStats;
- bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
- if (Success)
- {
- size_t ValuesSize = 0;
- for (const uint64_t Size : ContentStats.ValueSizes)
- {
- ValuesSize += Size;
- }
-
- std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
- auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
- ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
-
- ResponseWriter << "Count" << ContentStats.ValueSizes.size();
- ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
- ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
- ResponseWriter << "Size" << ValuesSize;
- ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
-
- uint64_t AttachmentsSize = 0;
-
- WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
-
- m_CidStore.IterateChunks(
- ContentStats.Attachments,
- [&](size_t Index, const IoBuffer& Payload) {
- ZEN_UNUSED(Index);
- AttachmentsSize += Payload.GetSize();
- return true;
- },
- &WorkerPool,
- 8u * 1024u);
-
- ResponseWriter << "AttachmentsSize" << AttachmentsSize;
- }
- }
-
- return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- // Drop bucket
- {
- if (m_CacheStore.DropBucket(NamespaceName, BucketName))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
- break;
-
- case HttpVerb::kPut:
- HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
- break;
-
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- const ZenContentType AcceptType = Request.AcceptContentType();
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
-
- bool Success = false;
- uint32_t MissingCount = 0;
- ZenCacheValue ClientResultValue;
- if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query))
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
- Stopwatch Timer;
-
- if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
- m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
- {
- Success = true;
- ZenContentType ContentType = ClientResultValue.Value.GetContentType();
-
- if (AcceptType == ZenContentType::kCbPackage)
- {
- if (ContentType == ZenContentType::kCbObject)
- {
- CbPackage Package;
- CbValidateError ValidateError = CbValidateError::None;
- if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError);
- ValidateError == CbValidateError::None)
- {
- CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
- if (SkipData)
- {
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
- {
- MissingCount++;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- if (Compressed)
- {
- Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash()));
- }
- else
- {
- ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash());
- MissingCount++;
- }
- }
- else
- {
- MissingCount++;
- }
- }
- });
-
- Success = MissingCount == 0 || PartialRecord;
- }
- else
- {
- ZEN_WARN("Invalid compact binary payload returned for {}/{}/{} ({}). Reason: '{}'",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- ToString(ValidateError));
- Success = false;
- }
-
- if (Success)
- {
- CbObject PackageObject = LoadCompactBinaryObject(std::move(ClientResultValue.Value));
-
- Package.SetObject(std::move(PackageObject));
-
- BinaryWriter MemStream;
- Package.Save(MemStream);
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
- }
- }
- else
- {
- Success = false;
- }
- }
- else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
- AcceptType != ZenContentType::kBinary)
- {
- Success = false;
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
- return Request.WriteResponse((MissingCount == 0) ? HttpResponseCode::OK : HttpResponseCode::PartialContent,
- ClientResultValue.Value.GetContentType(),
- ClientResultValue.Value);
- }
- }
- else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote))
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- // Issue upstream query asynchronously in order to keep requests flowing without
- // hogging I/O servicing threads with blocking work
-
- uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs();
-
- Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) {
- Stopwatch Timer;
- bool Success = false;
- const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
- const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
- ZenCacheValue ClientResultValue;
-
- metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
-
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
- UpstreamResult.Status.Success)
- {
- Success = true;
-
- ClientResultValue.Value = UpstreamResult.Value;
- ClientResultValue.Value.SetContentType(AcceptType);
-
- if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
- {
- if (AcceptType == ZenContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
- if (ValidationResult != CbValidateError::None)
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
-
- // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
- }
-
- if (Success && StoreLocal)
- {
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- ZenCacheStore::PutResult PutResult =
- m_CacheStore
- .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
- if (PutResult.Status == zen::PutStatus::Success)
- {
- m_CacheStats.WriteCount++;
- }
- }
- }
- else if (AcceptType == ZenContentType::kCbPackage)
- {
- CbPackage Package;
- if (Package.TryLoad(ClientResultValue.Value))
- {
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ReferencedAttachments;
- std::vector<IoBuffer> WriteAttachmentBuffers;
- WriteAttachmentBuffers.reserve(NumAttachments);
- std::vector<IoHash> WriteRawHashes;
- WriteRawHashes.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments([this,
- &Package,
- &Ref,
- &WriteAttachmentBuffers,
- &WriteRawHashes,
- &ReferencedAttachments,
- &Count,
- QueryLocal,
- StoreLocal,
- SkipData](CbFieldView HashView) {
- IoHash Hash = HashView.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- if (StoreLocal)
- {
- const CompressedBuffer& Chunk = Attachment->AsCompressedBinary();
- WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(Attachment->GetHash());
- }
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- Hash,
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
- }
- }
- else if (QueryLocal)
- {
- if (SkipData)
- {
- if (m_CidStore.ContainsChunk(Hash))
- {
- Count.Valid++;
- }
- }
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- if (Compressed)
- {
- Package.AddAttachment(CbAttachment(Compressed, Hash));
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey);
- Count.Invalid++;
- }
- }
- }
- Count.Total++;
- });
-
- if ((Count.Valid == Count.Total) || PartialRecord)
- {
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
-
- if (StoreLocal)
- {
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- Overwrite,
- nullptr);
- if (PutResult.Status == zen::PutStatus::Success)
- {
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
- {
- std::vector<CidStore::InsertResult> InsertResults =
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& Result : InsertResults)
- {
- if (Result.New)
- {
- Count.New++;
- }
- }
- }
-
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
- }
- }
-
- BinaryWriter MemStream;
- if (SkipData)
- {
- // Save a package containing only the object.
- CbPackage(Package.GetObject()).Save(MemStream);
- }
- else
- {
- Package.Save(MemStream);
- }
-
- ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType));
- }
- }
- }
-
- if (Success)
- {
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(ClientResultValue.Value.Size()),
- ToString(ClientResultValue.Value.GetContentType()),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
-
- if (SkipData && AcceptType == ZenContentType::kBinary)
- {
- AsyncRequest.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
- // metadata.
- AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
- }
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(AcceptType),
- NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000));
- m_CacheStats.MissCount++;
- AsyncRequest.WriteResponse(HttpResponseCode::NotFound);
- }
- });
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
- if (!AreDiskWritesAllowed())
- {
- return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
- }
-
- auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
- ZEN_UNUSED(PutResult);
-
- HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
- switch (PutResult.Status)
- {
- case zen::PutStatus::Conflict:
- ResponseCode = HttpResponseCode::Conflict;
- break;
- case zen::PutStatus::Invalid:
- ResponseCode = HttpResponseCode::BadRequest;
- break;
- }
-
- if (PutResult.Details)
- {
- Request.WriteResponse(ResponseCode, PutResult.Details);
- }
- return Request.WriteResponse(ResponseCode);
- };
-
- const HttpContentType ContentType = Request.RequestContentType();
-
- Body.SetContentType(ContentType);
-
- CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- Stopwatch Timer;
-
- if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary)
- {
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = Body.GetSize();
- if (ContentType == HttpContentType::kCompressedBinary)
- {
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Payload is not a valid compressed binary"sv);
- }
- }
- else
- {
- RawHash = IoHash::HashBuffer(SharedBuffer(Body));
- }
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- // TODO: Propagation for rejected PUTs
- ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Overwrite,
- nullptr);
- if (PutResult.Status != zen::PutStatus::Success)
- {
- return WriteFailureResponse(PutResult);
- }
- m_CacheStats.WriteCount++;
-
- if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
-
- if (ValidationResult != CbValidateError::None)
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
- }
-
- Body.SetContentType(ZenContentType::kCbObject);
-
- CbObjectView CacheRecord(Body.Data());
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- int32_t TotalCount = 0;
-
- CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
- const IoHash Hash = AttachmentHash.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- }
- TotalCount++;
- });
-
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
-
- // TODO: Propagation for rejected PUTs
- ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body},
- ReferencedAttachments,
- Overwrite,
- nullptr);
- if (PutResult.Status != zen::PutStatus::Success)
- {
- return WriteFailureResponse(PutResult);
- }
- m_CacheStats.WriteCount++;
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- TotalCount,
- ValidAttachments.size(),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
-
- CachePolicy Policy = PolicyFromUrl;
- if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbPackage)
- {
- CbPackage Package;
-
- if (!Package.TryLoad(Body))
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType));
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
- }
- CachePolicy Policy = PolicyFromUrl;
-
- CbObject CacheRecord = Package.GetObject();
-
- AttachmentCount Count;
- size_t NumAttachments = Package.GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- ValidAttachments.reserve(NumAttachments);
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(NumAttachments);
- WriteRawHashes.reserve(NumAttachments);
-
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
- CbFieldView HashView) {
- const IoHash Hash = HashView.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(Hash);
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- Hash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
- }
-
- const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
-
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- // TODO: Propagation for rejected PUTs
- ZenCacheStore::PutResult PutResult =
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
- if (PutResult.Status != zen::PutStatus::Success)
- {
- return WriteFailureResponse(PutResult);
- }
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
- {
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& InsertResult : InsertResults)
- {
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.GetSize()),
- ToString(ContentType),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
- }
-
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv);
- }
-}
-
-void
-HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- switch (Request.RequestVerb())
- {
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- HandleGetCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- case HttpVerb::kPut:
- HandlePutCacheChunk(Request, Ref, PolicyFromUrl);
- break;
- default:
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- Stopwatch Timer;
-
- IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy Policy = PolicyFromUrl;
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
- {
- const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
-
- if (QueryUpstream)
- {
- if (GetUpstreamCacheSingleResult UpstreamResult =
- m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
- UpstreamResult.Status.Success)
- {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
- {
- if (RawHash == Ref.ValueContentId)
- {
- if (AreDiskWritesAllowed())
- {
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
- }
- Source = UpstreamResult.Source;
- }
- else
- {
- ZEN_WARN("got missmatching upstream cache value");
- }
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache value");
- }
- }
- }
- }
-
- if (!Value)
- {
- ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- ToString(Request.AcceptContentType()),
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Value.Size()),
- ToString(Value.GetContentType()),
- Source ? Source->Url : "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- m_CacheStats.HitCount++;
- if (Source)
- {
- m_CacheStats.UpstreamHitCount++;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::SkipData))
- {
- Request.WriteResponse(HttpResponseCode::OK);
- }
- else
- {
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
- }
-}
-
-void
-HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl)
-{
- // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
- ZEN_UNUSED(PolicyFromUrl);
-
- Stopwatch Timer;
-
- IoBuffer Body = Request.ReadPayload();
-
- if (!Body || Body.Size() == 0)
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
- if (!AreDiskWritesAllowed())
- {
- return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
- }
-
- Body.SetContentType(Request.RequestContentType());
-
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
- }
-
- if (RawHash != Ref.ValueContentId)
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "ValueContentId does not match attachment hash"sv);
- }
-
- CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
-
- ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.ValueContentId,
- NiceBytes(Body.Size()),
- ToString(Body.GetContentType()),
- Result.New ? "NEW" : "OLD",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
-
- Request.WriteResponse(ResponseCode);
-}
-
-void
-HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context,
- cache::IRpcRequestReplayer& Replayer,
- uint32_t ThreadCount)
-{
- WorkerThreadPool WorkerPool(ThreadCount);
- uint64_t RequestCount = Replayer.GetRequestCount();
- Stopwatch Timer;
- auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- ZEN_INFO("Replaying {} requests", RequestCount);
- for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
- {
- if (AbortFlag)
- {
- break;
- }
- Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) {
- IoBuffer Body;
- zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
-
- if (AbortFlag)
- {
- return;
- }
-
- if (Body)
- {
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetPid = 0;
- CbPackage RpcResult;
- if (m_RpcHandler.HandleRpcRequest(Context,
- /* UriNamespace */ {},
- RequestInfo.ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetPid,
- RpcResult) == CacheRpcHandler::RpcResponseCode::OK)
- {
- if (AcceptMagic == kCbPkgMagic)
- {
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(Context.SessionId, TargetPid);
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
- }
- }
- }
- });
- }
- Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, IsPaused);
- ZEN_INFO("Replayed {} of {} requests, elapsed {}",
- RequestCount - PendingWork,
- RequestCount,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- });
-}
-
-void
-HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace)
-{
- ZEN_MEMSCOPE(GetCacheRpcTag());
-
- ZEN_TRACE_CPU("z$::Http::HandleRpcRequest");
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- switch (Request.RequestVerb())
- {
- case HttpVerb::kPost:
- {
- CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
-
- const HttpContentType ContentType = Request.RequestContentType();
- const HttpContentType AcceptType = Request.AcceptContentType();
-
- if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
- AcceptType != HttpContentType::kCbPackage)
- {
- m_CacheStats.BadRequestCount++;
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- auto HandleRpc = [this,
- RequestContext,
- Body = Request.ReadPayload(),
- ContentType,
- AcceptType,
- UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable {
- if (m_RequestRecordingEnabled)
- {
- RwLock::SharedLockScope _(m_RequestRecordingLock);
- if (m_RequestRecorder)
- {
- m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
- }
- }
-
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult;
-
- CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
- UriNamespaceString,
- ContentType,
- std::move(Body),
- /* out */ AcceptMagic,
- /* out */ AcceptFlags,
- /* out */ TargetProcessId,
- /* out */ RpcResult);
-
- HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
-
- if (!IsHttpSuccessCode(HttpResultCode))
- {
- return AsyncRequest.WriteResponse(HttpResultCode);
- }
-
- if (AcceptMagic == kCbPkgMagic)
- {
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
- {
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId);
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
-
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- };
-
- if (HasUpstream)
- {
- ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync");
- Request.WriteResponseAsync(std::move(HandleRpc));
- }
- else
- {
- ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponse");
- HandleRpc(Request);
- }
- }
- break;
-
- default:
- m_CacheStats.BadRequestCount++;
- Request.WriteResponse(HttpResponseCode::BadRequest);
- break;
- }
-}
-
-void
-HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
-{
- ZEN_MEMSCOPE(GetCacheHttpTag());
-
- CbObjectWriter Cbo;
-
- EmitSnapshot("requests", m_HttpRequests, Cbo);
-
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t WriteCount = m_CacheStats.WriteCount;
- const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
- struct CidStoreStats StoreStats = m_CidStore.Stats();
- const uint64_t ChunkHitCount = StoreStats.HitCount;
- const uint64_t ChunkMissCount = StoreStats.MissCount;
- const uint64_t ChunkWriteCount = StoreStats.WriteCount;
- const uint64_t TotalCount = HitCount + MissCount;
-
- const uint64_t RpcRequests = m_CacheStats.RpcRequests;
- const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
- const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests;
- const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests;
- const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests;
- const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
- const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
-
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
-
- bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true";
- bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true";
-
- CidStoreStats CidStoreStats = {};
- if (ShowCidStoreStats)
- {
- CidStoreStats = m_CidStore.Stats();
- }
- ZenCacheStore::CacheStoreStats CacheStoreStats = {};
- if (ShowCacheStoreStats)
- {
- CacheStoreStats = m_CacheStore.Stats();
- }
-
- Cbo.BeginObject("cache");
- {
- Cbo << "badrequestcount" << BadRequestCount;
- Cbo.BeginObject("rpc");
- Cbo << "count" << RpcRequests;
- Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests;
- Cbo.BeginObject("records");
- Cbo << "count" << RpcRecordRequests;
- Cbo << "ops" << RpcRecordBatchRequests;
- Cbo.EndObject();
- Cbo.BeginObject("values");
- Cbo << "count" << RpcValueRequests;
- Cbo << "ops" << RpcValueBatchRequests;
- Cbo.EndObject();
- Cbo.BeginObject("chunks");
- Cbo << "count" << RpcChunkRequests;
- Cbo << "ops" << RpcChunkBatchRequests;
- Cbo.EndObject();
- Cbo.EndObject();
-
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << CacheSize.DiskSize;
- Cbo << "memory" << CacheSize.MemorySize;
- }
- Cbo.EndObject();
-
- Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
- Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
-
- if (m_UpstreamCache.IsActive())
- {
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount;
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
- }
-
- Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
-
- if (ShowCacheStoreStats)
- {
- Cbo.BeginObject("store");
- Cbo << "hits" << CacheStoreStats.HitCount << "misses" << CacheStoreStats.MissCount << "writes" << CacheStoreStats.WriteCount
- << "rejected_writes" << CacheStoreStats.RejectedWriteCount << "rejected_reads" << CacheStoreStats.RejectedReadCount;
- const uint64_t StoreTotal = CacheStoreStats.HitCount + CacheStoreStats.MissCount;
- Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(CacheStoreStats.HitCount) / double(StoreTotal)) : 0.0);
- EmitSnapshot("read", CacheStoreStats.GetOps, Cbo);
- EmitSnapshot("write", CacheStoreStats.PutOps, Cbo);
- if (!CacheStoreStats.NamespaceStats.empty())
- {
- Cbo.BeginArray("namespaces");
- for (const ZenCacheStore::NamedNamespaceStats& NamespaceStats : CacheStoreStats.NamespaceStats)
- {
- Cbo.BeginObject();
- Cbo.AddString("namespace", NamespaceStats.NamespaceName);
- Cbo << "hits" << NamespaceStats.Stats.HitCount << "misses" << NamespaceStats.Stats.MissCount << "writes"
- << NamespaceStats.Stats.WriteCount;
- const uint64_t NamespaceTotal = NamespaceStats.Stats.HitCount + NamespaceStats.Stats.MissCount;
- Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0);
- EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo);
- EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo);
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize;
- Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize;
- }
- Cbo.EndObject();
- if (!NamespaceStats.Stats.DiskStats.BucketStats.empty())
- {
- Cbo.BeginArray("buckets");
- for (const ZenCacheDiskLayer::NamedBucketStats& BucketStats : NamespaceStats.Stats.DiskStats.BucketStats)
- {
- Cbo.BeginObject();
- Cbo.AddString("bucket", BucketStats.BucketName);
- if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0)
- {
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << BucketStats.Stats.DiskSize;
- Cbo << "memory" << BucketStats.Stats.MemorySize;
- }
- Cbo.EndObject();
- }
-
- if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 &&
- BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 &&
- BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 &&
- BucketStats.Stats.MemoryWriteCount == 0)
- {
- Cbo.EndObject();
- continue;
- }
-
- const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount;
- if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0)
- {
- Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes"
- << BucketStats.Stats.DiskWriteCount;
- Cbo << "hit_ratio"
- << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0);
- }
-
- const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount;
- if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0)
- {
- Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount
- << "mem_writes" << BucketStats.Stats.MemoryWriteCount;
- Cbo << "mem_hit_ratio"
- << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal))
- : 0.0);
- }
-
- if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 ||
- BucketStats.Stats.MemoryWriteCount != 0)
- {
- EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo);
- EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo);
- }
-
- Cbo.EndObject();
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- Cbo.EndArray();
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
- }
-
- if (m_UpstreamCache.IsActive())
- {
- EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo);
- Cbo.BeginObject("upstream");
- {
- m_UpstreamCache.GetStatus(Cbo);
- }
- Cbo.EndObject();
- }
-
- Cbo.BeginObject("cid");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
- }
- Cbo.EndObject();
-
- if (ShowCidStoreStats)
- {
- Cbo.BeginObject("store");
- Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount;
- EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo);
- EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo);
- // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo);
- Cbo.EndObject();
- }
- }
- Cbo.EndObject();
-
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-void
-HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
- Cbo << "ok" << true;
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-bool
-HttpStructuredCacheService::AreDiskWritesAllowed() const
-{
- return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
-}
-
-} // namespace zen