diff options
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 263 |
1 files changed, 211 insertions, 52 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 52e31ff40..925c7b42d 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -10,6 +10,7 @@ #include <zencore/enumflags.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/memory/llm.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> @@ -23,6 +24,7 @@ #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" @@ -41,6 +43,16 @@ namespace zen { +const FLLMTag& +GetCacheHttpTag() +{ + static FLLMTag CacheHttpTag("http", FLLMTag("cache")); + + return CacheHttpTag; +} + +extern const FLLMTag& GetCacheRpcTag(); + using namespace std::literals; ////////////////////////////////////////////////////////////////////////// @@ -76,7 +88,8 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker) + const DiskWriteBlocker* InDiskWriteBlocker, + OpenProcessCache& InOpenProcessCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) @@ -84,6 +97,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) +, m_OpenProcessCache(InOpenProcessCache) , m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); @@ -364,12 +378,31 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("z$::Http::HandleRequest"); + ZEN_MEMSCOPE(GetCacheHttpTag()); + metrics::OperationTiming::Scope $(m_HttpRequests); - std::string_view Key = Request.RelativeUri(); - if (Key == HttpZCacheRPCPrefix) + const std::string_view Key = Request.RelativeUri(); + + std::string_view UriNamespace; + + if (Key.ends_with(HttpZCacheRPCPrefix)) { - return HandleRpcRequest(Request); + const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length(); + + if (RpcOffset) + { + std::string_view KeyPrefix = Key.substr(0, RpcOffset); + + if (KeyPrefix.back() == '/') + { + KeyPrefix.remove_suffix(1); + + UriNamespace = KeyPrefix; + } + } + + return HandleRpcRequest(Request, UriNamespace); } if (Key == HttpZCacheUtilStartRecording) @@ -589,6 +622,82 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) + { + ResponseWriter.BeginObject("BucketSizes"); + + ResponseWriter.BeginArray("Buckets"); + + std::vector<std::string> BucketNames; + if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields + { + BucketNames = Info.value().BucketNames; + } + else + { + ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) { + BucketNames.push_back(std::string(BucketName)); + return true; + }); + } + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + std::vector<IoHash> AllAttachments; + for (const std::string& BucketName : BucketNames) + { + ResponseWriter.BeginObject(); + ResponseWriter << "Name" << BucketName; + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + } + ResponseWriter.EndObject(); + } + + ResponseWriter.EndArray(); + + ResponseWriter.BeginObject("Attachments"); + std::sort(AllAttachments.begin(), AllAttachments.end()); + auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end()); + AllAttachments.erase(NewEnd, AllAttachments.end()); + + uint64_t AttachmentsSize = 0; + + m_CidStore.IterateChunks( + AllAttachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool, + 8u * 1024u); + + ResponseWriter << "Count" << AllAttachments.size(); + ResponseWriter << "Size" << AttachmentsSize; + + ResponseWriter.EndObject(); + + ResponseWriter.EndObject(); + } + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; @@ -639,6 +748,46 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") + { + CacheContentStats ContentStats; + bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats); + if (Success) + { + size_t ValuesSize = 0; + for (const uint64_t Size : ContentStats.ValueSizes) + { + ValuesSize += Size; + } + + std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end()); + ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end()); + + ResponseWriter << "Count" << ContentStats.ValueSizes.size(); + ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount; + ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount; + ResponseWriter << "Size" << ValuesSize; + ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size(); + + uint64_t AttachmentsSize = 0; + + WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + + m_CidStore.IterateChunks( + ContentStats.Attachments, + [&](size_t Index, const IoBuffer& Payload) { + ZEN_UNUSED(Index); + AttachmentsSize += Payload.GetSize(); + return true; + }, + &WorkerPool, + 8u * 1024u); + + ResponseWriter << "AttachmentsSize" << AttachmentsSize; + } + } + return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); } break; @@ -1452,6 +1601,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co int TargetPid = 0; CbPackage RpcResult; if (m_RpcHandler.HandleRpcRequest(Context, + /* UriNamespace */ {}, RequestInfo.ContentType, std::move(Body), AcceptMagic, @@ -1497,8 +1647,10 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } void -HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) +HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace) { + ZEN_MEMSCOPE(GetCacheRpcTag()); + ZEN_TRACE_CPU("z$::Http::HandleRpcRequest"); const bool HasUpstream = m_UpstreamCache.IsActive(); @@ -1519,65 +1671,70 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - auto HandleRpc = - [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { - if (m_RequestRecordingEnabled) + auto HandleRpc = [this, + RequestContext, + Body = Request.ReadPayload(), + ContentType, + AcceptType, + UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable { + if (m_RequestRecordingEnabled) + { + RwLock::SharedLockScope _(m_RequestRecordingLock); + if (m_RequestRecorder) { - RwLock::SharedLockScope _(m_RequestRecordingLock); - if (m_RequestRecorder) - { - m_RequestRecorder->RecordRequest( - {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, - Body); - } + m_RequestRecorder->RecordRequest( + {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId}, + Body); } + } - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - int TargetProcessId = 0; - CbPackage RpcResult; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult; - CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, - ContentType, - std::move(Body), - AcceptMagic, - AcceptFlags, - TargetProcessId, - RpcResult); + CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext, + UriNamespaceString, + ContentType, + std::move(Body), + /* out */ AcceptMagic, + /* out */ AcceptFlags, + /* out */ TargetProcessId, + /* out */ RpcResult); - HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); + HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode)); - if (!IsHttpSuccessCode(HttpResultCode)) - { - return AsyncRequest.WriteResponse(HttpResultCode); - } + if (!IsHttpSuccessCode(HttpResultCode)) + { + return AsyncRequest.WriteResponse(HttpResultCode); + } - if (AcceptMagic == kCbPkgMagic) + if (AcceptMagic == kCbPkgMagic) + { + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); + Flags |= FormatFlags::kDenyPartialLocalReferences; } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId); } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle); + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }; + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }; if (HasUpstream) { @@ -1602,6 +1759,8 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) void HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { + ZEN_MEMSCOPE(GetCacheHttpTag()); + CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); |