diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/cache/httpstructuredcache.cpp | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/storage/cache/httpstructuredcache.cpp')
| -rw-r--r-- | src/zenserver/storage/cache/httpstructuredcache.cpp | 2052 |
1 files changed, 2052 insertions, 0 deletions
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp new file mode 100644 index 000000000..ece1d7a46 --- /dev/null +++ b/src/zenserver/storage/cache/httpstructuredcache.cpp @@ -0,0 +1,2052 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpstructuredcache.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/enumflags.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/httpstats.h> +#include <zenhttp/packageformat.h> +#include <zenremotestore/jupiter/jupiterclient.h> +#include <zenstore/cache/cache.h> +#include <zenstore/cache/structuredcachestore.h> +#include <zenstore/gc.h> +#include <zenutil/rpcrecording.h> +#include <zenutil/workerpools.h> + +#include "storage/upstream/upstreamcache.h" +#include "storage/upstream/zen.h" +#include "zenstore/cidstore.h" +#include "zenstore/scrubcontext.h" + +#include <algorithm> +#include <atomic> +#include <filesystem> +#include <queue> +#include <thread> + +#include <gsl/gsl-lite.hpp> + +namespace zen { + +const FLLMTag& +GetCacheHttpTag() +{ + static FLLMTag CacheHttpTag("http", FLLMTag("cache")); + + return CacheHttpTag; +} + +extern const FLLMTag& GetCacheRpcTag(); + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +CachePolicy +ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) +{ + std::string_view PolicyText = QueryParams.GetValue("Policy"sv); + return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; +} + +namespace { + static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; + static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; + static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; + static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; + static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; +} // namespace + +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker, + OpenProcessCache& InOpenProcessCache) +: m_Log(logging::Get("cache")) +, m_CacheStore(InCacheStore) +, m_StatsService(StatsService) +, m_StatusService(StatusService) +, m_CidStore(InCidStore) +, m_UpstreamCache(UpstreamCache) +, m_DiskWriteBlocker(InDiskWriteBlocker) +, m_OpenProcessCache(InOpenProcessCache) +, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) +{ + m_StatsService.RegisterHandler("z$", *this); + m_StatusService.RegisterHandler("z$", *this); +} + +HttpStructuredCacheService::~HttpStructuredCacheService() +{ + ZEN_INFO("closing structured cache"); + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + + m_StatsService.UnregisterHandler("z$", *this); + m_StatusService.UnregisterHandler("z$", *this); +} + +const char* +HttpStructuredCacheService::BaseUri() const +{ + return "/z$/"; +} + +void +HttpStructuredCacheService::Flush() +{ + m_CacheStore.Flush(); +} + +void +HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) +{ + std::string_view Key = Request.RelativeUri(); + std::vector<std::string> Tokens; + uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { + Tokens.push_back(std::string(Token)); + return true; + }); + std::string FilterNamespace; + std::string FilterBucket; + std::string FilterValue; + switch (TokenCount) + { + case 1: + break; + case 2: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + case 3: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterBucket = Tokens[2]; + if (FilterBucket.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + case 4: + { + FilterNamespace = Tokens[1]; + if (FilterNamespace.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterBucket = Tokens[2]; + if (FilterBucket.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + FilterValue = Tokens[3]; + if (FilterValue.empty()) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + } + break; + default: + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch()); + CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + if (AttachmentDetails) + { + CSVWriter << "Namespace, Bucket, Key, Cid, Size"; + } + else if (Details) + { + CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; + } + else + { + CSVWriter << "Namespace, Bucket, Key"; + } + for (const auto& NamespaceIt : ValueDetails.Namespaces) + { + const std::string& Namespace = NamespaceIt.first; + for (const auto& BucketIt : NamespaceIt.second.Buckets) + { + const std::string& Bucket = BucketIt.first; + for (const auto& ValueIt : BucketIt.second.Values) + { + if (AttachmentDetails) + { + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + CSVWriter << "\r\n" + << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() + << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); + } + } + else if (Details) + { + std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( + GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); + CSVWriter << "\r\n" + << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," + << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " + << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) + << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()); + size_t AttachmentsSize = 0; + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + AttachmentsSize += Payload.GetSize(); + } + CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); + } + else + { + CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); + } + } + } + } + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("namespaces"); + { + for (const auto& NamespaceIt : ValueDetails.Namespaces) + { + const std::string& Namespace = NamespaceIt.first; + Cbo.BeginObject(); + { + Cbo.AddString("name", Namespace); + Cbo.BeginArray("buckets"); + { + for (const auto& BucketIt : NamespaceIt.second.Buckets) + { + const std::string& Bucket = BucketIt.first; + Cbo.BeginObject(); + { + Cbo.AddString("name", Bucket); + Cbo.BeginArray("values"); + { + for (const auto& ValueIt : BucketIt.second.Values) + { + std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( + GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); + Cbo.BeginObject(); + { + Cbo.AddHash("key", ValueIt.first); + if (Details) + { + Cbo.AddInteger("size", ValueIt.second.Size); + if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && + ValueIt.second.RawSize != ValueIt.second.Size) + { + Cbo.AddInteger("rawsize", ValueIt.second.RawSize); + Cbo.AddHash("rawhash", ValueIt.second.RawHash); + } + Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); + Cbo.AddInteger("age", + gsl::narrow<uint64_t>(NowSeconds.count() - LastAccessedSeconds.count())); + if (ValueIt.second.Attachments.size() > 0) + { + if (AttachmentDetails) + { + Cbo.BeginArray("attachments"); + { + for (const IoHash& Hash : ValueIt.second.Attachments) + { + Cbo.BeginObject(); + Cbo.AddHash("cid", Hash); + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + else + { + Cbo.AddInteger("attachmentcount", + gsl::narrow<uint64_t>(ValueIt.second.Attachments.size())); + size_t AttachmentsSize = 0; + for (const IoHash& Hash : ValueIt.second.Attachments) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + AttachmentsSize += Payload.GetSize(); + } + Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); + } + } + } + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + } + Cbo.EndArray(); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +void +HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("z$::Http::HandleRequest"); + + ZEN_MEMSCOPE(GetCacheHttpTag()); + + metrics::OperationTiming::Scope $(m_HttpRequests); + + const std::string_view Key = Request.RelativeUri(); + + std::string_view UriNamespace; + + if (Key.ends_with(HttpZCacheRPCPrefix)) + { + const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length(); + + if (RpcOffset) + { + std::string_view KeyPrefix = Key.substr(0, RpcOffset); + + if (KeyPrefix.back() == '/') + { + KeyPrefix.remove_suffix(1); + + UriNamespace = KeyPrefix; + } + } + + return HandleRpcRequest(Request, UriNamespace); + } + + if (Key == HttpZCacheUtilStartRecording) + { + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + + std::string RecordPath = UrlDecode(Params.GetValue("path")); + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); + m_RequestRecordingEnabled.store(true); + } + ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + + if (Key == HttpZCacheUtilStopRecording) + { + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + ZEN_INFO("cache RPC recording STOPPED"); + Request.WriteResponse(HttpResponseCode::OK); + return; + } + + if (Key == HttpZCacheUtilReplayRecording) + { + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + + { + RwLock::ExclusiveLockScope _(m_RequestRecordingLock); + m_RequestRecordingEnabled.store(false); + m_RequestRecorder.reset(); + } + + HttpServerRequest::QueryParams Params = Request.GetQueryParams(); + + std::string RecordPath = UrlDecode(Params.GetValue("path")); + + uint32_t ThreadCount = GetHardwareConcurrency(); + if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) + { + if (auto Value = ParseInt<uint64_t>(Param)) + { + ThreadCount = gsl::narrow<uint32_t>(Value.value()); + } + } + + ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); + + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); + ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); + + ZEN_INFO("cache RPC replay COMPLETED"); + + Request.WriteResponse(HttpResponseCode::OK); + return; + } + + if (Key.starts_with(HttpZCacheDetailsPrefix)) + { + HandleDetailsRequest(Request); + return; + } + + HttpCacheRequestData RequestData; + if (!HttpCacheRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL + } + + if (RequestData.ValueContentId.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + ZEN_ASSERT(RequestData.HashKey.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = RequestData.ValueContentId.value()}; + return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); + } + + if (RequestData.HashKey.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + ZEN_ASSERT(RequestData.Bucket.has_value()); + CacheRef Ref = {.Namespace = RequestData.Namespace.value(), + .BucketSegment = RequestData.Bucket.value(), + .HashKey = RequestData.HashKey.value(), + .ValueContentId = IoHash::Zero}; + return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); + } + + if (RequestData.Bucket.has_value()) + { + ZEN_ASSERT(RequestData.Namespace.has_value()); + return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); + } + + if (RequestData.Namespace.has_value()) + { + return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); + } + return HandleCacheRequest(Request); +} + +void +HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + ZenCacheStore::Info Info = m_CacheStore.GetInfo(); + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("Configuration"); + { + ExtendableStringBuilder<128> BasePathString; + BasePathString << Info.BasePath.u8string(); + ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); + ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); + ResponseWriter.BeginObject("Logging"); + { + ResponseWriter.AddBool("EnableWriteLog", Info.Config.Logging.EnableWriteLog); + ResponseWriter.AddBool("EnableAccessLog", Info.Config.Logging.EnableAccessLog); + } + ResponseWriter.EndObject(); + } + ResponseWriter.EndObject(); + + std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { + return L.compare(R) < 0; + }); + ResponseWriter.BeginArray("Namespaces"); + for (const std::string& NamespaceName : Info.NamespaceNames) + { + ResponseWriter.AddString(NamespaceName); + } + ResponseWriter.EndArray(); + ResponseWriter.BeginObject("StorageSize"); + { + ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); + } + + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + default: + m_CacheStats.BadRequestCount++; + break; + } +} + +void +HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName); + if (!Info.has_value()) + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("Configuration"); + { + ExtendableStringBuilder<128> BasePathString; + BasePathString << Info->RootDir.u8string(); + ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); + ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize); + ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment); + ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold); + ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold); + ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes); + ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds); + ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds); + } + ResponseWriter.EndObject(); + + std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { + return L.compare(R) < 0; + }); + + ResponseWriter.BeginArray("Buckets"sv); + for (const std::string& BucketName : Info->BucketNames) + { + ResponseWriter.AddString(BucketName); + } + ResponseWriter.EndArray(); + + ResponseWriter.BeginObject("StorageSize"sv); + { + ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize); + } + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) + { + ResponseWriter.BeginObject("BucketSizes"); + + ResponseWriter.BeginArray("Buckets"); + + std::vector<std::string> BucketNames; + if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields + { + BucketNames = Info.value().BucketNames; + } + else + { + ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) { + BucketNames.push_back(std::string(BucketName)); + return true; + }); + } + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + std::vector<IoHash> AllAttachments; + for (const std::string& BucketName : BucketNames) + { + ResponseWriter.BeginObject(); + ResponseWriter << "Name" << BucketName; + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + } + ResponseWriter.EndObject(); + } + + ResponseWriter.EndArray(); + + ResponseWriter.BeginObject("Attachments"); + std::sort(AllAttachments.begin(), AllAttachments.end()); + auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end()); + AllAttachments.erase(NewEnd, AllAttachments.end()); + + uint64_t AttachmentsSize = 0; + + m_CidStore.IterateChunks( + AllAttachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool, + 8u * 1024u); + + ResponseWriter << "Count" << AllAttachments.size(); + ResponseWriter << "Size" << AttachmentsSize; + + ResponseWriter.EndObject(); + + ResponseWriter.EndObject(); + } + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + + case HttpVerb::kDelete: + // Drop namespace + { + if (m_CacheStore.DropNamespace(NamespaceName)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + } + break; + + default: + break; + } +} + +void +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, + std::string_view NamespaceName, + std::string_view BucketName) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + { + std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); + if (!Info.has_value()) + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginObject("StorageSize"); + { + ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize); + ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize); + } + ResponseWriter.EndObject(); + + ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") + { + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + uint64_t AttachmentsSize = 0; + + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + + m_CidStore.IterateChunks( + ContentStats.Attachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool, + 8u * 1024u); + + ResponseWriter << "AttachmentsSize" << AttachmentsSize; + } + } + + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + } + break; + + case HttpVerb::kDelete: + // Drop bucket + { + if (m_CacheStore.DropBucket(NamespaceName, BucketName)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } + } + break; + + default: + break; + } +} + +void +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + HandleGetCacheRecord(Request, Ref, PolicyFromUrl); + break; + + case HttpVerb::kPut: + HandlePutCacheRecord(Request, Ref, PolicyFromUrl); + break; + + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); + + bool Success = false; + uint32_t MissingCount = 0; + ZenCacheValue ClientResultValue; + if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + + const bool HasUpstream = m_UpstreamCache.IsActive(); + + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + Stopwatch Timer; + + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && + m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) + { + Success = true; + ZenContentType ContentType = ClientResultValue.Value.GetContentType(); + + if (AcceptType == ZenContentType::kCbPackage) + { + if (ContentType == ZenContentType::kCbObject) + { + CbPackage Package; + CbValidateError ValidateError = CbValidateError::None; + if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError); + ValidateError == CbValidateError::None) + { + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) + { + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + MissingCount++; + } + } + else + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); + } + else + { + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); + MissingCount++; + } + } + else + { + MissingCount++; + } + } + }); + + Success = MissingCount == 0 || PartialRecord; + } + else + { + ZEN_WARN("Invalid compact binary payload returned for {}/{}/{} ({}). Reason: '{}'", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(ValidateError)); + Success = false; + } + + if (Success) + { + CbObject PackageObject = LoadCompactBinaryObject(std::move(ClientResultValue.Value)); + + Package.SetObject(std::move(PackageObject)); + + BinaryWriter MemStream; + Package.Save(MemStream); + + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + else + { + Success = false; + } + } + else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && + AcceptType != ZenContentType::kBinary) + { + Success = false; + } + } + + if (Success) + { + ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + m_CacheStats.HitCount++; + if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData + return Request.WriteResponse((MissingCount == 0) ? HttpResponseCode::OK : HttpResponseCode::PartialContent, + ClientResultValue.Value.GetContentType(), + ClientResultValue.Value); + } + } + else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) + { + ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + // Issue upstream query asynchronously in order to keep requests flowing without + // hogging I/O servicing threads with blocking work + + uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); + + Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) { + Stopwatch Timer; + bool Success = false; + const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); + ZenCacheValue ClientResultValue; + + metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); + + if (GetUpstreamCacheSingleResult UpstreamResult = + m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); + UpstreamResult.Status.Success) + { + Success = true; + + ClientResultValue.Value = UpstreamResult.Value; + ClientResultValue.Value.SetContentType(AcceptType); + + if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) + { + if (AcceptType == ZenContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); + if (ValidationResult != CbValidateError::None) + { + Success = false; + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + + // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data + } + + if (Success && StoreLocal) + { + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = + m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + } + } + } + else if (AcceptType == ZenContentType::kCbPackage) + { + CbPackage Package; + if (Package.TryLoad(ClientResultValue.Value)) + { + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ReferencedAttachments; + std::vector<IoBuffer> WriteAttachmentBuffers; + WriteAttachmentBuffers.reserve(NumAttachments); + std::vector<IoHash> WriteRawHashes; + WriteRawHashes.reserve(NumAttachments); + + CacheRecord.IterateAttachments([this, + &Package, + &Ref, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ReferencedAttachments, + &Count, + QueryLocal, + StoreLocal, + SkipData](CbFieldView HashView) { + IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) + { + if (StoreLocal) + { + const CompressedBuffer& Chunk = Attachment->AsCompressedBinary(); + WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Attachment->GetHash()); + } + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", + Hash, + Ref.BucketSegment, + Ref.HashKey); + Count.Invalid++; + } + } + else if (QueryLocal) + { + if (SkipData) + { + if (m_CidStore.ContainsChunk(Hash)) + { + Count.Valid++; + } + } + else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); + if (Compressed) + { + Package.AddAttachment(CbAttachment(Compressed, Hash)); + Count.Valid++; + } + else + { + ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey); + Count.Invalid++; + } + } + } + Count.Total++; + }); + + if ((Count.Valid == Count.Total) || PartialRecord) + { + ZenCacheValue CacheValue; + CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + + if (StoreLocal) + { + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) + { + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) + { + if (Result.New) + { + Count.New++; + } + } + } + + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } + } + + BinaryWriter MemStream; + if (SkipData) + { + // Save a package containing only the object. + CbPackage(Package.GetObject()).Save(MemStream); + } + else + { + Package.Save(MemStream); + } + + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); + } + else + { + Success = false; + ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + } + else + { + Success = false; + ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType)); + } + } + } + + if (Success) + { + ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType()), + NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + + if (SkipData && AcceptType == ZenContentType::kBinary) + { + AsyncRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally + // metadata. + AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); + } + } + else + { + ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); + m_CacheStats.MissCount++; + AsyncRequest.WriteResponse(HttpResponseCode::NotFound); + } + }); +} + +void +HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + IoBuffer Body = Request.ReadPayload(); + + if (!Body || Body.Size() == 0) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { + ZEN_UNUSED(PutResult); + + HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; + switch (PutResult.Status) + { + case zen::PutStatus::Conflict: + ResponseCode = HttpResponseCode::Conflict; + break; + case zen::PutStatus::Invalid: + ResponseCode = HttpResponseCode::BadRequest; + break; + } + + if (PutResult.Details) + { + Request.WriteResponse(ResponseCode, PutResult.Details); + } + return Request.WriteResponse(ResponseCode); + }; + + const HttpContentType ContentType = Request.RequestContentType(); + + Body.SetContentType(ContentType); + + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + + const bool HasUpstream = m_UpstreamCache.IsActive(); + + Stopwatch Timer; + + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) + { + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = Body.GetSize(); + if (ContentType == HttpContentType::kCompressedBinary) + { + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Payload is not a valid compressed binary"sv); + } + } + else + { + RawHash = IoHash::HashBuffer(SharedBuffer(Body)); + } + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return WriteFailureResponse(PutResult); + } + m_CacheStats.WriteCount++; + + if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) + { + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + } + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); + + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } + + Body.SetContentType(ZenContentType::kCbObject); + + CbObjectView CacheRecord(Body.Data()); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; + int32_t TotalCount = 0; + + CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + ReferencedAttachments.push_back(Hash); + if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); + + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + + // TODO: Propagation for rejected PUTs + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) + { + return WriteFailureResponse(PutResult); + } + m_CacheStats.WriteCount++; + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + TotalCount, + ValidAttachments.size(), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); + + CachePolicy Policy = PolicyFromUrl; + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbPackage) + { + CbPackage Package; + + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(ContentType)); + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); + } + CachePolicy Policy = PolicyFromUrl; + + CbObject CacheRecord = Package.GetObject(); + + AttachmentCount Count; + size_t NumAttachments = Package.GetAttachments().size(); + std::vector<IoHash> ValidAttachments; + std::vector<IoHash> ReferencedAttachments; + ValidAttachments.reserve(NumAttachments); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(NumAttachments); + WriteRawHashes.reserve(NumAttachments); + + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( + CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) + { + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); + } + + const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + + ZenCacheValue CacheValue; + CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + // TODO: Propagation for rejected PUTs + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); + if (PutResult.Status != zen::PutStatus::Success) + { + return WriteFailureResponse(PutResult); + } + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) + { + std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& InsertResult : InsertResults) + { + if (InsertResult.New) + { + Count.New++; + } + } + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } + + ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.GetSize()), + ToString(ContentType), + Count.New, + Count.Valid, + Count.Total, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); + } +} + +void +HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + HandleGetCacheChunk(Request, Ref, PolicyFromUrl); + break; + case HttpVerb::kPut: + HandlePutCacheChunk(Request, Ref, PolicyFromUrl); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + Stopwatch Timer; + + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + const UpstreamEndpointInfo* Source = nullptr; + CachePolicy Policy = PolicyFromUrl; + + const bool HasUpstream = m_UpstreamCache.IsActive(); + { + const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + + if (QueryUpstream) + { + if (GetUpstreamCacheSingleResult UpstreamResult = + m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Status.Success) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) + { + if (RawHash == Ref.ValueContentId) + { + if (AreDiskWritesAllowed()) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + } + Source = UpstreamResult.Source; + } + else + { + ZEN_WARN("got missmatching upstream cache value"); + } + } + else + { + ZEN_WARN("got uncompressed upstream cache value"); + } + } + } + } + + if (!Value) + { + ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(Request.AcceptContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), + Source ? Source->Url : "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + m_CacheStats.HitCount++; + if (Source) + { + m_CacheStats.UpstreamHitCount++; + } + + if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) + { + Request.WriteResponse(HttpResponseCode::OK); + } + else + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + } +} + +void +HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) +{ + // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(PolicyFromUrl); + + Stopwatch Timer; + + IoBuffer Body = Request.ReadPayload(); + + if (!Body || Body.Size() == 0) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + Body.SetContentType(Request.RequestContentType()); + + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); + } + + if (RawHash != Ref.ValueContentId) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); + } + + CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); + + ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + NiceBytes(Body.Size()), + ToString(Body.GetContentType()), + Result.New ? "NEW" : "OLD", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + + const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; + + Request.WriteResponse(ResponseCode); +} + +void +HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context, + cache::IRpcRequestReplayer& Replayer, + uint32_t ThreadCount) +{ + WorkerThreadPool WorkerPool(ThreadCount); + uint64_t RequestCount = Replayer.GetRequestCount(); + Stopwatch Timer; + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + ZEN_INFO("Replaying {} requests", RequestCount); + for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) + { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) { + IoBuffer Body; + zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + + if (AbortFlag) + { + return; + } + + if (Body) + { + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetPid = 0; + CbPackage RpcResult; + if (m_RpcHandler.HandleRpcRequest(Context, + /* UriNamespace */ {}, + RequestInfo.ContentType, + std::move(Body), + AcceptMagic, + AcceptFlags, + TargetPid, + RpcResult) == CacheRpcHandler::RpcResponseCode::OK) + { + if (AcceptMagic == kCbPkgMagic) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(Context.SessionId, TargetPid); + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); + } + } + } + }); + } + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + ZEN_INFO("Replayed {} of {} requests, elapsed {}", + RequestCount - PendingWork, + RequestCount, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + }); +} + +void +HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) +{ + ZEN_MEMSCOPE(GetCacheRpcTag()); + + ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); + + const bool HasUpstream = m_UpstreamCache.IsActive(); + + switch (Request.RequestVerb()) + { + case HttpVerb::kPost: + { + CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; + + const HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType AcceptType = Request.AcceptContentType(); + + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) + { + m_CacheStats.BadRequestCount++; + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + auto HandleRpc = [this, + RequestContext, + Body = Request.ReadPayload(), + ContentType, + AcceptType, + UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable { + if (m_RequestRecordingEnabled) + { + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) + { + m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); + } + } + + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult; + + CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, + UriNamespaceString, + ContentType, + std::move(Body), + /* out */ AcceptMagic, + /* out */ AcceptFlags, + /* out */ TargetProcessId, + /* out */ RpcResult); + + HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); + + if (!IsHttpSuccessCode(HttpResultCode)) + { + return AsyncRequest.WriteResponse(HttpResultCode); + } + + if (AcceptMagic == kCbPkgMagic) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }; + + if (HasUpstream) + { + ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync"); + Request.WriteResponseAsync(std::move(HandleRpc)); + } + else + { + ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponse"); + HandleRpc(Request); + } + } + break; + + default: + m_CacheStats.BadRequestCount++; + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} + +void +HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) +{ + ZEN_MEMSCOPE(GetCacheHttpTag()); + + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t WriteCount = m_CacheStats.WriteCount; + const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; + struct CidStoreStats StoreStats = m_CidStore.Stats(); + const uint64_t ChunkHitCount = StoreStats.HitCount; + const uint64_t ChunkMissCount = StoreStats.MissCount; + const uint64_t ChunkWriteCount = StoreStats.WriteCount; + const uint64_t TotalCount = HitCount + MissCount; + + const uint64_t RpcRequests = m_CacheStats.RpcRequests; + const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; + const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests; + const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests; + const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests; + const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; + const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; + + const CidStoreSize CidSize = m_CidStore.TotalSize(); + const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); + + bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; + bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; + + CidStoreStats CidStoreStats = {}; + if (ShowCidStoreStats) + { + CidStoreStats = m_CidStore.Stats(); + } + ZenCacheStore::CacheStoreStats CacheStoreStats = {}; + if (ShowCacheStoreStats) + { + CacheStoreStats = m_CacheStore.Stats(); + } + + Cbo.BeginObject("cache"); + { + Cbo << "badrequestcount" << BadRequestCount; + Cbo.BeginObject("rpc"); + Cbo << "count" << RpcRequests; + Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests; + Cbo.BeginObject("records"); + Cbo << "count" << RpcRecordRequests; + Cbo << "ops" << RpcRecordBatchRequests; + Cbo.EndObject(); + Cbo.BeginObject("values"); + Cbo << "count" << RpcValueRequests; + Cbo << "ops" << RpcValueBatchRequests; + Cbo.EndObject(); + Cbo.BeginObject("chunks"); + Cbo << "count" << RpcChunkRequests; + Cbo << "ops" << RpcChunkBatchRequests; + Cbo.EndObject(); + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + Cbo << "disk" << CacheSize.DiskSize; + Cbo << "memory" << CacheSize.MemorySize; + } + Cbo.EndObject(); + + Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; + Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); + + if (m_UpstreamCache.IsActive()) + { + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); + Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); + } + + Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; + + if (ShowCacheStoreStats) + { + Cbo.BeginObject("store"); + Cbo << "hits" << CacheStoreStats.HitCount << "misses" << CacheStoreStats.MissCount << "writes" << CacheStoreStats.WriteCount + << "rejected_writes" << CacheStoreStats.RejectedWriteCount << "rejected_reads" << CacheStoreStats.RejectedReadCount; + const uint64_t StoreTotal = CacheStoreStats.HitCount + CacheStoreStats.MissCount; + Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(CacheStoreStats.HitCount) / double(StoreTotal)) : 0.0); + EmitSnapshot("read", CacheStoreStats.GetOps, Cbo); + EmitSnapshot("write", CacheStoreStats.PutOps, Cbo); + if (!CacheStoreStats.NamespaceStats.empty()) + { + Cbo.BeginArray("namespaces"); + for (const ZenCacheStore::NamedNamespaceStats& NamespaceStats : CacheStoreStats.NamespaceStats) + { + Cbo.BeginObject(); + Cbo.AddString("namespace", NamespaceStats.NamespaceName); + Cbo << "hits" << NamespaceStats.Stats.HitCount << "misses" << NamespaceStats.Stats.MissCount << "writes" + << NamespaceStats.Stats.WriteCount; + const uint64_t NamespaceTotal = NamespaceStats.Stats.HitCount + NamespaceStats.Stats.MissCount; + Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0); + EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo); + EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo); + Cbo.BeginObject("size"); + { + Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize; + Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize; + } + Cbo.EndObject(); + if (!NamespaceStats.Stats.DiskStats.BucketStats.empty()) + { + Cbo.BeginArray("buckets"); + for (const ZenCacheDiskLayer::NamedBucketStats& BucketStats : NamespaceStats.Stats.DiskStats.BucketStats) + { + Cbo.BeginObject(); + Cbo.AddString("bucket", BucketStats.BucketName); + if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0) + { + Cbo.BeginObject("size"); + { + Cbo << "disk" << BucketStats.Stats.DiskSize; + Cbo << "memory" << BucketStats.Stats.MemorySize; + } + Cbo.EndObject(); + } + + if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 && + BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 && + BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 && + BucketStats.Stats.MemoryWriteCount == 0) + { + Cbo.EndObject(); + continue; + } + + const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount; + if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0) + { + Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes" + << BucketStats.Stats.DiskWriteCount; + Cbo << "hit_ratio" + << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0); + } + + const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount; + if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0) + { + Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount + << "mem_writes" << BucketStats.Stats.MemoryWriteCount; + Cbo << "mem_hit_ratio" + << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal)) + : 0.0); + } + + if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 || + BucketStats.Stats.MemoryWriteCount != 0) + { + EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo); + EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo); + } + + Cbo.EndObject(); + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); + } + Cbo.EndObject(); + } + Cbo.EndObject(); + } + + if (m_UpstreamCache.IsActive()) + { + EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); + Cbo.BeginObject("upstream"); + { + m_UpstreamCache.GetStatus(Cbo); + } + Cbo.EndObject(); + } + + Cbo.BeginObject("cid"); + { + Cbo.BeginObject("size"); + { + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" << CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; + } + Cbo.EndObject(); + + if (ShowCidStoreStats) + { + Cbo.BeginObject("store"); + Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; + EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); + // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); + Cbo.EndObject(); + } + } + Cbo.EndObject(); + + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +bool +HttpStructuredCacheService::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + +} // namespace zen |