diff options
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 2052 |
1 files changed, 0 insertions, 2052 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp deleted file mode 100644 index 9f87c208c..000000000 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ /dev/null @@ -1,2052 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "httpstructuredcache.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/enumflags.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory/llm.h> -#include <zencore/parallelwork.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpserver.h> -#include <zenhttp/httpstats.h> -#include <zenhttp/packageformat.h> -#include <zenremotestore/jupiter/jupiterclient.h> -#include <zenstore/cache/cache.h> -#include <zenstore/cache/structuredcachestore.h> -#include <zenstore/gc.h> -#include <zenutil/rpcrecording.h> -#include <zenutil/workerpools.h> - -#include "upstream/upstreamcache.h" -#include "upstream/zen.h" -#include "zenstore/cidstore.h" -#include "zenstore/scrubcontext.h" - -#include <algorithm> -#include <atomic> -#include <filesystem> -#include <queue> -#include <thread> - -#include <gsl/gsl-lite.hpp> - -namespace zen { - -const FLLMTag& -GetCacheHttpTag() -{ - static FLLMTag CacheHttpTag("http", FLLMTag("cache")); - - return CacheHttpTag; -} - -extern const FLLMTag& GetCacheRpcTag(); - -using namespace std::literals; - -////////////////////////////////////////////////////////////////////////// - -CachePolicy -ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) -{ - std::string_view PolicyText = QueryParams.GetValue("Policy"sv); - return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; -} - -namespace { - static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; - static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv; - static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv; - static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv; - static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv; -} // namespace - -////////////////////////////////////////////////////////////////////////// - -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker, - OpenProcessCache& InOpenProcessCache) -: m_Log(logging::Get("cache")) -, m_CacheStore(InCacheStore) -, m_StatsService(StatsService) -, m_StatusService(StatusService) -, m_CidStore(InCidStore) -, m_UpstreamCache(UpstreamCache) -, m_DiskWriteBlocker(InDiskWriteBlocker) -, m_OpenProcessCache(InOpenProcessCache) -, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) -{ - m_StatsService.RegisterHandler("z$", *this); - m_StatusService.RegisterHandler("z$", *this); -} - -HttpStructuredCacheService::~HttpStructuredCacheService() -{ - ZEN_INFO("closing structured cache"); - { - RwLock::ExclusiveLockScope _(m_RequestRecordingLock); - m_RequestRecordingEnabled.store(false); - m_RequestRecorder.reset(); - } - - m_StatsService.UnregisterHandler("z$", *this); - m_StatusService.UnregisterHandler("z$", *this); -} - -const char* -HttpStructuredCacheService::BaseUri() const -{ - return "/z$/"; -} - -void -HttpStructuredCacheService::Flush() -{ - m_CacheStore.Flush(); -} - -void -HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) -{ - std::string_view Key = Request.RelativeUri(); - std::vector<std::string> Tokens; - uint32_t TokenCount = ForEachStrTok(Key, '/', [&Tokens](std::string_view Token) { - Tokens.push_back(std::string(Token)); - return true; - }); - std::string FilterNamespace; - std::string FilterBucket; - std::string FilterValue; - switch (TokenCount) - { - case 1: - break; - case 2: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 3: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - case 4: - { - FilterNamespace = Tokens[1]; - if (FilterNamespace.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterBucket = Tokens[2]; - if (FilterBucket.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - FilterValue = Tokens[3]; - if (FilterValue.empty()) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - } - break; - default: - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - std::chrono::seconds NowSeconds = std::chrono::duration_cast<std::chrono::seconds>(GcClock::Now().time_since_epoch()); - CacheValueDetails ValueDetails = m_CacheStore.GetValueDetails(FilterNamespace, FilterBucket, FilterValue); - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - if (AttachmentDetails) - { - CSVWriter << "Namespace, Bucket, Key, Cid, Size"; - } - else if (Details) - { - CSVWriter << "Namespace, Bucket, Key, Size, RawSize, RawHash, ContentType, Age, AttachmentsCount, AttachmentsSize"; - } - else - { - CSVWriter << "Namespace, Bucket, Key"; - } - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - for (const auto& ValueIt : BucketIt.second.Values) - { - if (AttachmentDetails) - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() - << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); - } - } - else if (Details) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - CSVWriter << "\r\n" - << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << ValueIt.second.Size << "," - << ValueIt.second.RawSize << "," << ValueIt.second.RawHash.ToHexString() << ", " - << ToString(ValueIt.second.ContentType) << ", " << (NowSeconds.count() - LastAccessedSeconds.count()) - << ", " << gsl::narrow<uint64_t>(ValueIt.second.Attachments.size()); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); - } - else - { - CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString(); - } - } - } - } - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("namespaces"); - { - for (const auto& NamespaceIt : ValueDetails.Namespaces) - { - const std::string& Namespace = NamespaceIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Namespace); - Cbo.BeginArray("buckets"); - { - for (const auto& BucketIt : NamespaceIt.second.Buckets) - { - const std::string& Bucket = BucketIt.first; - Cbo.BeginObject(); - { - Cbo.AddString("name", Bucket); - Cbo.BeginArray("values"); - { - for (const auto& ValueIt : BucketIt.second.Values) - { - std::chrono::seconds LastAccessedSeconds = std::chrono::duration_cast<std::chrono::seconds>( - GcClock::TimePointFromTick(ValueIt.second.LastAccess).time_since_epoch()); - Cbo.BeginObject(); - { - Cbo.AddHash("key", ValueIt.first); - if (Details) - { - Cbo.AddInteger("size", ValueIt.second.Size); - if (ValueIt.second.Size > 0 && ValueIt.second.RawSize != 0 && - ValueIt.second.RawSize != ValueIt.second.Size) - { - Cbo.AddInteger("rawsize", ValueIt.second.RawSize); - Cbo.AddHash("rawhash", ValueIt.second.RawHash); - } - Cbo.AddString("contenttype", ToString(ValueIt.second.ContentType)); - Cbo.AddInteger("age", - gsl::narrow<uint64_t>(NowSeconds.count() - LastAccessedSeconds.count())); - if (ValueIt.second.Attachments.size() > 0) - { - if (AttachmentDetails) - { - Cbo.BeginArray("attachments"); - { - for (const IoHash& Hash : ValueIt.second.Attachments) - { - Cbo.BeginObject(); - Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - else - { - Cbo.AddInteger("attachmentcount", - gsl::narrow<uint64_t>(ValueIt.second.Attachments.size())); - size_t AttachmentsSize = 0; - for (const IoHash& Hash : ValueIt.second.Attachments) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); - AttachmentsSize += Payload.GetSize(); - } - Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); - } - } - } - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - } - Cbo.EndArray(); - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) -{ - ZEN_TRACE_CPU("z$::Http::HandleRequest"); - - ZEN_MEMSCOPE(GetCacheHttpTag()); - - metrics::OperationTiming::Scope $(m_HttpRequests); - - const std::string_view Key = Request.RelativeUri(); - - std::string_view UriNamespace; - - if (Key.ends_with(HttpZCacheRPCPrefix)) - { - const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length(); - - if (RpcOffset) - { - std::string_view KeyPrefix = Key.substr(0, RpcOffset); - - if (KeyPrefix.back() == '/') - { - KeyPrefix.remove_suffix(1); - - UriNamespace = KeyPrefix; - } - } - - return HandleRpcRequest(Request, UriNamespace); - } - - if (Key == HttpZCacheUtilStartRecording) - { - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - - std::string RecordPath = UrlDecode(Params.GetValue("path")); - - { - RwLock::ExclusiveLockScope _(m_RequestRecordingLock); - m_RequestRecordingEnabled.store(false); - m_RequestRecorder.reset(); - - m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); - m_RequestRecordingEnabled.store(true); - } - ZEN_INFO("cache RPC recording STARTED -> '{}'", RecordPath); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - - if (Key == HttpZCacheUtilStopRecording) - { - { - RwLock::ExclusiveLockScope _(m_RequestRecordingLock); - m_RequestRecordingEnabled.store(false); - m_RequestRecorder.reset(); - } - ZEN_INFO("cache RPC recording STOPPED"); - Request.WriteResponse(HttpResponseCode::OK); - return; - } - - if (Key == HttpZCacheUtilReplayRecording) - { - CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - - { - RwLock::ExclusiveLockScope _(m_RequestRecordingLock); - m_RequestRecordingEnabled.store(false); - m_RequestRecorder.reset(); - } - - HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - - std::string RecordPath = UrlDecode(Params.GetValue("path")); - - uint32_t ThreadCount = GetHardwareConcurrency(); - if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) - { - if (auto Value = ParseInt<uint64_t>(Param)) - { - ThreadCount = gsl::narrow<uint32_t>(Value.value()); - } - } - - ZEN_INFO("initiating cache RPC replay using {} threads, from '{}'", ThreadCount, RecordPath); - - std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); - ReplayRequestRecorder(RequestContext, *Replayer, ThreadCount < 1 ? 1 : ThreadCount); - - ZEN_INFO("cache RPC replay COMPLETED"); - - Request.WriteResponse(HttpResponseCode::OK); - return; - } - - if (Key.starts_with(HttpZCacheDetailsPrefix)) - { - HandleDetailsRequest(Request); - return; - } - - HttpCacheRequestData RequestData; - if (!HttpCacheRequestParseRelativeUri(Key, ZenCacheStore::DefaultNamespace, RequestData)) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL - } - - if (RequestData.ValueContentId.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - ZEN_ASSERT(RequestData.HashKey.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = RequestData.ValueContentId.value()}; - return HandleCacheChunkRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.HashKey.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - ZEN_ASSERT(RequestData.Bucket.has_value()); - CacheRef Ref = {.Namespace = RequestData.Namespace.value(), - .BucketSegment = RequestData.Bucket.value(), - .HashKey = RequestData.HashKey.value(), - .ValueContentId = IoHash::Zero}; - return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); - } - - if (RequestData.Bucket.has_value()) - { - ZEN_ASSERT(RequestData.Namespace.has_value()); - return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); - } - - if (RequestData.Namespace.has_value()) - { - return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); - } - return HandleCacheRequest(Request); -} - -void -HttpStructuredCacheService::HandleCacheRequest(HttpServerRequest& Request) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info.BasePath.u8string(); - ResponseWriter.AddString("BasePath"sv, BasePathString.ToView()); - ResponseWriter.AddBool("AllowAutomaticCreationOfNamespaces", Info.Config.AllowAutomaticCreationOfNamespaces); - ResponseWriter.BeginObject("Logging"); - { - ResponseWriter.AddBool("EnableWriteLog", Info.Config.Logging.EnableWriteLog); - ResponseWriter.AddBool("EnableAccessLog", Info.Config.Logging.EnableAccessLog); - } - ResponseWriter.EndObject(); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info.NamespaceNames), end(Info.NamespaceNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - ResponseWriter.BeginArray("Namespaces"); - for (const std::string& NamespaceName : Info.NamespaceNames) - { - ResponseWriter.AddString(NamespaceName); - } - ResponseWriter.EndArray(); - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info.StorageSize.DiskSize); - ResponseWriter.AddInteger("MemorySize", Info.StorageSize.MemorySize); - } - - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info.DiskEntryCount); - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - default: - m_CacheStats.BadRequestCount++; - break; - } -} - -void -HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view NamespaceName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::Info> Info = m_CacheStore.GetNamespaceInfo(NamespaceName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("Configuration"); - { - ExtendableStringBuilder<128> BasePathString; - BasePathString << Info->RootDir.u8string(); - ResponseWriter.AddString("RootDir"sv, BasePathString.ToView()); - ResponseWriter.AddInteger("MaxBlockSize"sv, Info->Config.DiskLayerConfig.BucketConfig.MaxBlockSize); - ResponseWriter.AddInteger("PayloadAlignment"sv, Info->Config.DiskLayerConfig.BucketConfig.PayloadAlignment); - ResponseWriter.AddInteger("MemCacheSizeThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold); - ResponseWriter.AddInteger("LargeObjectThreshold"sv, Info->Config.DiskLayerConfig.BucketConfig.LargeObjectThreshold); - ResponseWriter.AddInteger("MemCacheTargetFootprintBytes"sv, Info->Config.DiskLayerConfig.MemCacheTargetFootprintBytes); - ResponseWriter.AddInteger("MemCacheTrimIntervalSeconds"sv, Info->Config.DiskLayerConfig.MemCacheTrimIntervalSeconds); - ResponseWriter.AddInteger("MemCacheMaxAgeSeconds"sv, Info->Config.DiskLayerConfig.MemCacheMaxAgeSeconds); - } - ResponseWriter.EndObject(); - - std::sort(begin(Info->BucketNames), end(Info->BucketNames), [](std::string_view L, std::string_view R) { - return L.compare(R) < 0; - }); - - ResponseWriter.BeginArray("Buckets"sv); - for (const std::string& BucketName : Info->BucketNames) - { - ResponseWriter.AddString(BucketName); - } - ResponseWriter.EndArray(); - - ResponseWriter.BeginObject("StorageSize"sv); - { - ResponseWriter.AddInteger("DiskSize"sv, Info->DiskLayerInfo.StorageSize.DiskSize); - ResponseWriter.AddInteger("MemorySize"sv, Info->DiskLayerInfo.StorageSize.MemorySize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); - - if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) - { - ResponseWriter.BeginObject("BucketSizes"); - - ResponseWriter.BeginArray("Buckets"); - - std::vector<std::string> BucketNames; - if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields - { - BucketNames = Info.value().BucketNames; - } - else - { - ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) { - BucketNames.push_back(std::string(BucketName)); - return true; - }); - } - WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - std::vector<IoHash> AllAttachments; - for (const std::string& BucketName : BucketNames) - { - ResponseWriter.BeginObject(); - ResponseWriter << "Name" << BucketName; - CacheContentStats ContentStats; - bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); - if (Success) - { - size_t ValuesSize = 0; - for (const uint64_t Size : ContentStats.ValueSizes) - { - ValuesSize += Size; - } - - std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); - auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); - ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); - - ResponseWriter << "Count" << ContentStats.ValueSizes.size(); - ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; - ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; - ResponseWriter << "Size" << ValuesSize; - ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); - - AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end()); - } - ResponseWriter.EndObject(); - } - - ResponseWriter.EndArray(); - - ResponseWriter.BeginObject("Attachments"); - std::sort(AllAttachments.begin(), AllAttachments.end()); - auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end()); - AllAttachments.erase(NewEnd, AllAttachments.end()); - - uint64_t AttachmentsSize = 0; - - m_CidStore.IterateChunks( - AllAttachments, - [&](size_t Index, const IoBuffer& Payload) { - ZEN_UNUSED(Index); - AttachmentsSize += Payload.GetSize(); - return true; - }, - &WorkerPool, - 8u * 1024u); - - ResponseWriter << "Count" << AllAttachments.size(); - ResponseWriter << "Size" << AttachmentsSize; - - ResponseWriter.EndObject(); - - ResponseWriter.EndObject(); - } - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop namespace - { - if (m_CacheStore.DropNamespace(NamespaceName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, - std::string_view NamespaceName, - std::string_view BucketName) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - std::optional<ZenCacheNamespace::BucketInfo> Info = m_CacheStore.GetBucketInfo(NamespaceName, BucketName); - if (!Info.has_value()) - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter ResponseWriter; - - ResponseWriter.BeginObject("StorageSize"); - { - ResponseWriter.AddInteger("DiskSize", Info->DiskLayerInfo.StorageSize.DiskSize); - ResponseWriter.AddInteger("MemorySize", Info->DiskLayerInfo.StorageSize.MemorySize); - } - ResponseWriter.EndObject(); - - ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); - - if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") - { - CacheContentStats ContentStats; - bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); - if (Success) - { - size_t ValuesSize = 0; - for (const uint64_t Size : ContentStats.ValueSizes) - { - ValuesSize += Size; - } - - std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); - auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); - ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); - - ResponseWriter << "Count" << ContentStats.ValueSizes.size(); - ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; - ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; - ResponseWriter << "Size" << ValuesSize; - ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); - - uint64_t AttachmentsSize = 0; - - WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - - m_CidStore.IterateChunks( - ContentStats.Attachments, - [&](size_t Index, const IoBuffer& Payload) { - ZEN_UNUSED(Index); - AttachmentsSize += Payload.GetSize(); - return true; - }, - &WorkerPool, - 8u * 1024u); - - ResponseWriter << "AttachmentsSize" << AttachmentsSize; - } - } - - return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - } - break; - - case HttpVerb::kDelete: - // Drop bucket - { - if (m_CacheStore.DropBucket(NamespaceName, BucketName)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - return Request.WriteResponse(HttpResponseCode::NotFound); - } - } - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - HandleGetCacheRecord(Request, Ref, PolicyFromUrl); - break; - - case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, PolicyFromUrl); - break; - - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - - bool Success = false; - uint32_t MissingCount = 0; - ZenCacheValue ClientResultValue; - if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - - const bool HasUpstream = m_UpstreamCache.IsActive(); - - CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - Stopwatch Timer; - - if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && - m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) - { - Success = true; - ZenContentType ContentType = ClientResultValue.Value.GetContentType(); - - if (AcceptType == ZenContentType::kCbPackage) - { - if (ContentType == ZenContentType::kCbObject) - { - CbPackage Package; - CbValidateError ValidateError = CbValidateError::None; - if (CbObject PackageObject = ValidateAndReadCompactBinaryObject(std::move(ClientResultValue.Value), ValidateError); - ValidateError == CbValidateError::None) - { - CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { - if (SkipData) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - MissingCount++; - } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - if (Compressed) - { - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash.AsHash())); - } - else - { - ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash.AsHash()); - MissingCount++; - } - } - else - { - MissingCount++; - } - } - }); - - Success = MissingCount == 0 || PartialRecord; - } - else - { - ZEN_WARN("Invalid compact binary payload returned for {}/{}/{} ({}). Reason: '{}'", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - ToString(ValidateError)); - Success = false; - } - - if (Success) - { - CbObject PackageObject = LoadCompactBinaryObject(std::move(ClientResultValue.Value)); - - Package.SetObject(std::move(PackageObject)); - - BinaryWriter MemStream; - Package.Save(MemStream); - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); - } - } - else - { - Success = false; - } - } - else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && - AcceptType != ZenContentType::kBinary) - { - Success = false; - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (LOCAL) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) - { - return Request.WriteResponse(HttpResponseCode::OK); - } - else - { - // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData - return Request.WriteResponse((MissingCount == 0) ? HttpResponseCode::OK : HttpResponseCode::PartialContent, - ClientResultValue.Value.GetContentType(), - ClientResultValue.Value); - } - } - else if (!HasUpstream || !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - // Issue upstream query asynchronously in order to keep requests flowing without - // hogging I/O servicing threads with blocking work - - uint64_t LocalElapsedTimeUs = Timer.GetElapsedTimeUs(); - - Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref, LocalElapsedTimeUs, RequestContext](HttpServerRequest& AsyncRequest) { - Stopwatch Timer; - bool Success = false; - const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); - const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); - ZenCacheValue ClientResultValue; - - metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); - UpstreamResult.Status.Success) - { - Success = true; - - ClientResultValue.Value = UpstreamResult.Value; - ClientResultValue.Value.SetContentType(AcceptType); - - if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) - { - if (AcceptType == ZenContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - if (ValidationResult != CbValidateError::None) - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - - // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data - } - - if (Success && StoreLocal) - { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - ZenCacheStore::PutResult PutResult = - m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); - if (PutResult.Status == zen::PutStatus::Success) - { - m_CacheStats.WriteCount++; - } - } - } - else if (AcceptType == ZenContentType::kCbPackage) - { - CbPackage Package; - if (Package.TryLoad(ClientResultValue.Value)) - { - CbObject CacheRecord = Package.GetObject(); - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ReferencedAttachments; - std::vector<IoBuffer> WriteAttachmentBuffers; - WriteAttachmentBuffers.reserve(NumAttachments); - std::vector<IoHash> WriteRawHashes; - WriteRawHashes.reserve(NumAttachments); - - CacheRecord.IterateAttachments([this, - &Package, - &Ref, - &WriteAttachmentBuffers, - &WriteRawHashes, - &ReferencedAttachments, - &Count, - QueryLocal, - StoreLocal, - SkipData](CbFieldView HashView) { - IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - if (StoreLocal) - { - const CompressedBuffer& Chunk = Attachment->AsCompressedBinary(); - WriteAttachmentBuffers.push_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Attachment->GetHash()); - } - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", - Hash, - Ref.BucketSegment, - Ref.HashKey); - Count.Invalid++; - } - } - else if (QueryLocal) - { - if (SkipData) - { - if (m_CidStore.ContainsChunk(Hash)) - { - Count.Valid++; - } - } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); - if (Compressed) - { - Package.AddAttachment(CbAttachment(Compressed, Hash)); - Count.Valid++; - } - else - { - ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'", Hash, Ref.BucketSegment, Ref.HashKey); - Count.Invalid++; - } - } - } - Count.Total++; - }); - - if ((Count.Valid == Count.Total) || PartialRecord) - { - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - - if (StoreLocal) - { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr); - if (PutResult.Status == zen::PutStatus::Success) - { - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) - { - std::vector<CidStore::InsertResult> InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& Result : InsertResults) - { - if (Result.New) - { - Count.New++; - } - } - } - - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; - } - } - - BinaryWriter MemStream; - if (SkipData) - { - // Save a package containing only the object. - CbPackage(Package.GetObject()).Save(MemStream); - } - else - { - Package.Save(MemStream); - } - - ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - else - { - Success = false; - ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType)); - } - } - } - - if (Success) - { - ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {} '{}' (UPSTREAM) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(ClientResultValue.Value.Size()), - ToString(ClientResultValue.Value.GetContentType()), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - - if (SkipData && AcceptType == ZenContentType::kBinary) - { - AsyncRequest.WriteResponse(HttpResponseCode::OK); - } - else - { - // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally - // metadata. - AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); - } - } - else - { - ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - NiceLatencyNs((LocalElapsedTimeUs + Timer.GetElapsedTimeUs()) * 1000)); - m_CacheStats.MissCount++; - AsyncRequest.WriteResponse(HttpResponseCode::NotFound); - } - }); -} - -void -HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - if (!AreDiskWritesAllowed()) - { - return Request.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { - ZEN_UNUSED(PutResult); - - HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; - switch (PutResult.Status) - { - case zen::PutStatus::Conflict: - ResponseCode = HttpResponseCode::Conflict; - break; - case zen::PutStatus::Invalid: - ResponseCode = HttpResponseCode::BadRequest; - break; - } - - if (PutResult.Details) - { - Request.WriteResponse(ResponseCode, PutResult.Details); - } - return Request.WriteResponse(ResponseCode); - }; - - const HttpContentType ContentType = Request.RequestContentType(); - - Body.SetContentType(ContentType); - - CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - - const bool HasUpstream = m_UpstreamCache.IsActive(); - - Stopwatch Timer; - - if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) - { - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = Body.GetSize(); - if (ContentType == HttpContentType::kCompressedBinary) - { - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Payload is not a valid compressed binary"sv); - } - } - else - { - RawHash = IoHash::HashBuffer(SharedBuffer(Body)); - } - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - // TODO: Propagation for rejected PUTs - ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - nullptr); - if (PutResult.Status != zen::PutStatus::Success) - { - return WriteFailureResponse(PutResult); - } - m_CacheStats.WriteCount++; - - if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) - { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); - - if (ValidationResult != CbValidateError::None) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid compact binary", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); - } - - Body.SetContentType(ZenContentType::kCbObject); - - CbObjectView CacheRecord(Body.Data()); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - int32_t TotalCount = 0; - - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - ReferencedAttachments.push_back(Hash); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); - - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - - // TODO: Propagation for rejected PUTs - ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body}, - ReferencedAttachments, - Overwrite, - nullptr); - if (PutResult.Status != zen::PutStatus::Success) - { - return WriteFailureResponse(PutResult); - } - m_CacheStats.WriteCount++; - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - TotalCount, - ValidAttachments.size(), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - - CachePolicy Policy = PolicyFromUrl; - if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbPackage) - { - CbPackage Package; - - if (!Package.TryLoad(Body)) - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, invalid package", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType)); - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); - } - CachePolicy Policy = PolicyFromUrl; - - CbObject CacheRecord = Package.GetObject(); - - AttachmentCount Count; - size_t NumAttachments = Package.GetAttachments().size(); - std::vector<IoHash> ValidAttachments; - std::vector<IoHash> ReferencedAttachments; - ValidAttachments.reserve(NumAttachments); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(NumAttachments); - WriteRawHashes.reserve(NumAttachments); - - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( - CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Hash); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - Count.Total++; - }); - - if (Count.Invalid > 0) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); - } - - const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); - - ZenCacheValue CacheValue; - CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); - CacheValue.Value.SetContentType(ZenContentType::kCbObject); - // TODO: Propagation for rejected PUTs - ZenCacheStore::PutResult PutResult = - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); - if (PutResult.Status != zen::PutStatus::Success) - { - return WriteFailureResponse(PutResult); - } - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) - { - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& InsertResult : InsertResults) - { - if (InsertResult.New) - { - Count.New++; - } - } - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; - } - - ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.GetSize()), - ToString(ContentType), - Count.New, - Count.Valid, - Count.Total, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const bool IsPartialRecord = Count.Valid != Count.Total; - - if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) - { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); - } - - Request.WriteResponse(HttpResponseCode::Created); - } - else - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); - } -} - -void -HttpStructuredCacheService::HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - switch (Request.RequestVerb()) - { - case HttpVerb::kHead: - case HttpVerb::kGet: - HandleGetCacheChunk(Request, Ref, PolicyFromUrl); - break; - case HttpVerb::kPut: - HandlePutCacheChunk(Request, Ref, PolicyFromUrl); - break; - default: - break; - } -} - -void -HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - Stopwatch Timer; - - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); - const UpstreamEndpointInfo* Source = nullptr; - CachePolicy Policy = PolicyFromUrl; - - const bool HasUpstream = m_UpstreamCache.IsActive(); - { - const bool QueryUpstream = HasUpstream && !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); - - if (QueryUpstream) - { - if (GetUpstreamCacheSingleResult UpstreamResult = - m_UpstreamCache.GetCacheChunk(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); - UpstreamResult.Status.Success) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) - { - if (RawHash == Ref.ValueContentId) - { - if (AreDiskWritesAllowed()) - { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); - } - Source = UpstreamResult.Source; - } - else - { - ZEN_WARN("got missmatching upstream cache value"); - } - } - else - { - ZEN_WARN("got uncompressed upstream cache value"); - } - } - } - } - - if (!Value) - { - ZEN_DEBUG("GETCACHECHUNK MISS - '{}/{}/{}/{}' '{}' in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - ToString(Request.AcceptContentType()), - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - m_CacheStats.MissCount++; - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("GETCACHECHUNK HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Value.Size()), - ToString(Value.GetContentType()), - Source ? Source->Url : "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - m_CacheStats.HitCount++; - if (Source) - { - m_CacheStats.UpstreamHitCount++; - } - - if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) - { - Request.WriteResponse(HttpResponseCode::OK); - } - else - { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); - } -} - -void -HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) -{ - // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(PolicyFromUrl); - - Stopwatch Timer; - - IoBuffer Body = Request.ReadPayload(); - - if (!Body || Body.Size() == 0) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - if (!AreDiskWritesAllowed()) - { - return Request.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - Body.SetContentType(Request.RequestContentType()); - - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); - } - - if (RawHash != Ref.ValueContentId) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "ValueContentId does not match attachment hash"sv); - } - - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); - - ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - Ref.ValueContentId, - NiceBytes(Body.Size()), - ToString(Body.GetContentType()), - Result.New ? "NEW" : "OLD", - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - - const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; - - Request.WriteResponse(ResponseCode); -} - -void -HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context, - cache::IRpcRequestReplayer& Replayer, - uint32_t ThreadCount) -{ - WorkerThreadPool WorkerPool(ThreadCount); - uint64_t RequestCount = Replayer.GetRequestCount(); - Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - ZEN_INFO("Replaying {} requests", RequestCount); - for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) - { - if (AbortFlag) - { - break; - } - Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) { - IoBuffer Body; - zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); - - if (AbortFlag) - { - return; - } - - if (Body) - { - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetPid = 0; - CbPackage RpcResult; - if (m_RpcHandler.HandleRpcRequest(Context, - /* UriNamespace */ {}, - RequestInfo.ContentType, - std::move(Body), - AcceptMagic, - AcceptFlags, - TargetPid, - RpcResult) == CacheRpcHandler::RpcResponseCode::OK) - { - if (AcceptMagic == kCbPkgMagic) - { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(Context.SessionId, TargetPid); - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); - } - } - } - }); - } - Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, IsPaused); - ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - PendingWork, - RequestCount, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - }); -} - -void -HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) -{ - ZEN_MEMSCOPE(GetCacheRpcTag()); - - ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); - - const bool HasUpstream = m_UpstreamCache.IsActive(); - - switch (Request.RequestVerb()) - { - case HttpVerb::kPost: - { - CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; - - const HttpContentType ContentType = Request.RequestContentType(); - const HttpContentType AcceptType = Request.AcceptContentType(); - - if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || - AcceptType != HttpContentType::kCbPackage) - { - m_CacheStats.BadRequestCount++; - return Request.WriteResponse(HttpResponseCode::BadRequest); - } - - auto HandleRpc = [this, - RequestContext, - Body = Request.ReadPayload(), - ContentType, - AcceptType, - UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable { - if (m_RequestRecordingEnabled) - { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); - } - } - - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; - - CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, - UriNamespaceString, - ContentType, - std::move(Body), - /* out */ AcceptMagic, - /* out */ AcceptFlags, - /* out */ TargetProcessId, - /* out */ RpcResult); - - HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); - - if (!IsHttpSuccessCode(HttpResultCode)) - { - return AsyncRequest.WriteResponse(HttpResultCode); - } - - if (AcceptMagic == kCbPkgMagic) - { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }; - - if (HasUpstream) - { - ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponseAsync"); - Request.WriteResponseAsync(std::move(HandleRpc)); - } - else - { - ZEN_TRACE_CPU("z$::Http::HandleRpcRequest::WriteResponse"); - HandleRpc(Request); - } - } - break; - - default: - m_CacheStats.BadRequestCount++; - Request.WriteResponse(HttpResponseCode::BadRequest); - break; - } -} - -void -HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) -{ - ZEN_MEMSCOPE(GetCacheHttpTag()); - - CbObjectWriter Cbo; - - EmitSnapshot("requests", m_HttpRequests, Cbo); - - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t WriteCount = m_CacheStats.WriteCount; - const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; - struct CidStoreStats StoreStats = m_CidStore.Stats(); - const uint64_t ChunkHitCount = StoreStats.HitCount; - const uint64_t ChunkMissCount = StoreStats.MissCount; - const uint64_t ChunkWriteCount = StoreStats.WriteCount; - const uint64_t TotalCount = HitCount + MissCount; - - const uint64_t RpcRequests = m_CacheStats.RpcRequests; - const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; - const uint64_t RpcRecordBatchRequests = m_CacheStats.RpcRecordBatchRequests; - const uint64_t RpcValueRequests = m_CacheStats.RpcValueRequests; - const uint64_t RpcValueBatchRequests = m_CacheStats.RpcValueBatchRequests; - const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; - const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; - - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); - - bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; - bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; - - CidStoreStats CidStoreStats = {}; - if (ShowCidStoreStats) - { - CidStoreStats = m_CidStore.Stats(); - } - ZenCacheStore::CacheStoreStats CacheStoreStats = {}; - if (ShowCacheStoreStats) - { - CacheStoreStats = m_CacheStore.Stats(); - } - - Cbo.BeginObject("cache"); - { - Cbo << "badrequestcount" << BadRequestCount; - Cbo.BeginObject("rpc"); - Cbo << "count" << RpcRequests; - Cbo << "ops" << RpcRecordBatchRequests + RpcValueBatchRequests + RpcChunkBatchRequests; - Cbo.BeginObject("records"); - Cbo << "count" << RpcRecordRequests; - Cbo << "ops" << RpcRecordBatchRequests; - Cbo.EndObject(); - Cbo.BeginObject("values"); - Cbo << "count" << RpcValueRequests; - Cbo << "ops" << RpcValueBatchRequests; - Cbo.EndObject(); - Cbo.BeginObject("chunks"); - Cbo << "count" << RpcChunkRequests; - Cbo << "ops" << RpcChunkBatchRequests; - Cbo.EndObject(); - Cbo.EndObject(); - - Cbo.BeginObject("size"); - { - Cbo << "disk" << CacheSize.DiskSize; - Cbo << "memory" << CacheSize.MemorySize; - } - Cbo.EndObject(); - - Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; - Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); - - if (m_UpstreamCache.IsActive()) - { - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); - } - - Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; - - if (ShowCacheStoreStats) - { - Cbo.BeginObject("store"); - Cbo << "hits" << CacheStoreStats.HitCount << "misses" << CacheStoreStats.MissCount << "writes" << CacheStoreStats.WriteCount - << "rejected_writes" << CacheStoreStats.RejectedWriteCount << "rejected_reads" << CacheStoreStats.RejectedReadCount; - const uint64_t StoreTotal = CacheStoreStats.HitCount + CacheStoreStats.MissCount; - Cbo << "hit_ratio" << (StoreTotal > 0 ? (double(CacheStoreStats.HitCount) / double(StoreTotal)) : 0.0); - EmitSnapshot("read", CacheStoreStats.GetOps, Cbo); - EmitSnapshot("write", CacheStoreStats.PutOps, Cbo); - if (!CacheStoreStats.NamespaceStats.empty()) - { - Cbo.BeginArray("namespaces"); - for (const ZenCacheStore::NamedNamespaceStats& NamespaceStats : CacheStoreStats.NamespaceStats) - { - Cbo.BeginObject(); - Cbo.AddString("namespace", NamespaceStats.NamespaceName); - Cbo << "hits" << NamespaceStats.Stats.HitCount << "misses" << NamespaceStats.Stats.MissCount << "writes" - << NamespaceStats.Stats.WriteCount; - const uint64_t NamespaceTotal = NamespaceStats.Stats.HitCount + NamespaceStats.Stats.MissCount; - Cbo << "hit_ratio" << (NamespaceTotal > 0 ? (double(NamespaceStats.Stats.HitCount) / double(NamespaceTotal)) : 0.0); - EmitSnapshot("read", NamespaceStats.Stats.GetOps, Cbo); - EmitSnapshot("write", NamespaceStats.Stats.PutOps, Cbo); - Cbo.BeginObject("size"); - { - Cbo << "disk" << NamespaceStats.Stats.DiskStats.DiskSize; - Cbo << "memory" << NamespaceStats.Stats.DiskStats.MemorySize; - } - Cbo.EndObject(); - if (!NamespaceStats.Stats.DiskStats.BucketStats.empty()) - { - Cbo.BeginArray("buckets"); - for (const ZenCacheDiskLayer::NamedBucketStats& BucketStats : NamespaceStats.Stats.DiskStats.BucketStats) - { - Cbo.BeginObject(); - Cbo.AddString("bucket", BucketStats.BucketName); - if (BucketStats.Stats.DiskSize != 0 || BucketStats.Stats.MemorySize != 0) - { - Cbo.BeginObject("size"); - { - Cbo << "disk" << BucketStats.Stats.DiskSize; - Cbo << "memory" << BucketStats.Stats.MemorySize; - } - Cbo.EndObject(); - } - - if (BucketStats.Stats.DiskSize == 0 && BucketStats.Stats.DiskHitCount == 0 && - BucketStats.Stats.DiskMissCount == 0 && BucketStats.Stats.DiskWriteCount == 0 && - BucketStats.Stats.MemoryHitCount == 0 && BucketStats.Stats.MemoryMissCount == 0 && - BucketStats.Stats.MemoryWriteCount == 0) - { - Cbo.EndObject(); - continue; - } - - const uint64_t BucketDiskTotal = BucketStats.Stats.DiskHitCount + BucketStats.Stats.DiskMissCount; - if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0) - { - Cbo << "hits" << BucketStats.Stats.DiskHitCount << "misses" << BucketStats.Stats.DiskMissCount << "writes" - << BucketStats.Stats.DiskWriteCount; - Cbo << "hit_ratio" - << (BucketDiskTotal > 0 ? (double(BucketStats.Stats.DiskHitCount) / double(BucketDiskTotal)) : 0.0); - } - - const uint64_t BucketMemoryTotal = BucketStats.Stats.MemoryHitCount + BucketStats.Stats.MemoryMissCount; - if (BucketMemoryTotal != 0 || BucketStats.Stats.MemoryWriteCount != 0) - { - Cbo << "mem_hits" << BucketStats.Stats.MemoryHitCount << "mem_misses" << BucketStats.Stats.MemoryMissCount - << "mem_writes" << BucketStats.Stats.MemoryWriteCount; - Cbo << "mem_hit_ratio" - << (BucketMemoryTotal > 0 ? (double(BucketStats.Stats.MemoryHitCount) / double(BucketMemoryTotal)) - : 0.0); - } - - if (BucketDiskTotal != 0 || BucketStats.Stats.DiskWriteCount != 0 || BucketMemoryTotal != 0 || - BucketStats.Stats.MemoryWriteCount != 0) - { - EmitSnapshot("read", BucketStats.Stats.GetOps, Cbo); - EmitSnapshot("write", BucketStats.Stats.PutOps, Cbo); - } - - Cbo.EndObject(); - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - Cbo.EndArray(); - } - Cbo.EndObject(); - } - Cbo.EndObject(); - } - - if (m_UpstreamCache.IsActive()) - { - EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); - Cbo.BeginObject("upstream"); - { - m_UpstreamCache.GetStatus(Cbo); - } - Cbo.EndObject(); - } - - Cbo.BeginObject("cid"); - { - Cbo.BeginObject("size"); - { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; - } - Cbo.EndObject(); - - if (ShowCidStoreStats) - { - Cbo.BeginObject("store"); - Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; - EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); - EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); - // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); - Cbo.EndObject(); - } - } - Cbo.EndObject(); - - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -void -HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - Cbo << "ok" << true; - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -bool -HttpStructuredCacheService::AreDiskWritesAllowed() const -{ - return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); -} - -} // namespace zen |