aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.cpp')
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp263
1 files changed, 211 insertions, 52 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 52e31ff40..925c7b42d 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -10,6 +10,7 @@
#include <zencore/enumflags.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
@@ -23,6 +24,7 @@
#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
@@ -41,6 +43,16 @@
namespace zen {
+const FLLMTag&
+GetCacheHttpTag()
+{
+ static FLLMTag CacheHttpTag("http", FLLMTag("cache"));
+
+ return CacheHttpTag;
+}
+
+extern const FLLMTag& GetCacheRpcTag();
+
using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
@@ -76,7 +88,8 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
- const DiskWriteBlocker* InDiskWriteBlocker)
+ const DiskWriteBlocker* InDiskWriteBlocker,
+ OpenProcessCache& InOpenProcessCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
@@ -84,6 +97,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_OpenProcessCache(InOpenProcessCache)
, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
@@ -364,12 +378,31 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
ZEN_TRACE_CPU("z$::Http::HandleRequest");
+ ZEN_MEMSCOPE(GetCacheHttpTag());
+
metrics::OperationTiming::Scope $(m_HttpRequests);
- std::string_view Key = Request.RelativeUri();
- if (Key == HttpZCacheRPCPrefix)
+ const std::string_view Key = Request.RelativeUri();
+
+ std::string_view UriNamespace;
+
+ if (Key.ends_with(HttpZCacheRPCPrefix))
{
- return HandleRpcRequest(Request);
+ const size_t RpcOffset = Key.length() - HttpZCacheRPCPrefix.length();
+
+ if (RpcOffset)
+ {
+ std::string_view KeyPrefix = Key.substr(0, RpcOffset);
+
+ if (KeyPrefix.back() == '/')
+ {
+ KeyPrefix.remove_suffix(1);
+
+ UriNamespace = KeyPrefix;
+ }
+ }
+
+ return HandleRpcRequest(Request, UriNamespace);
}
if (Key == HttpZCacheUtilStartRecording)
@@ -589,6 +622,82 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
+ if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty())
+ {
+ ResponseWriter.BeginObject("BucketSizes");
+
+ ResponseWriter.BeginArray("Buckets");
+
+ std::vector<std::string> BucketNames;
+ if (Buckets == "*") // Get all - empty FieldFilter equal getting all fields
+ {
+ BucketNames = Info.value().BucketNames;
+ }
+ else
+ {
+ ForEachStrTok(Buckets, ',', [&](std::string_view BucketName) {
+ BucketNames.push_back(std::string(BucketName));
+ return true;
+ });
+ }
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+ std::vector<IoHash> AllAttachments;
+ for (const std::string& BucketName : BucketNames)
+ {
+ ResponseWriter.BeginObject();
+ ResponseWriter << "Name" << BucketName;
+ CacheContentStats ContentStats;
+ bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
+ if (Success)
+ {
+ size_t ValuesSize = 0;
+ for (const uint64_t Size : ContentStats.ValueSizes)
+ {
+ ValuesSize += Size;
+ }
+
+ std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
+
+ ResponseWriter << "Count" << ContentStats.ValueSizes.size();
+ ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
+ ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
+ ResponseWriter << "Size" << ValuesSize;
+ ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
+
+ AllAttachments.insert(AllAttachments.end(), ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ }
+ ResponseWriter.EndObject();
+ }
+
+ ResponseWriter.EndArray();
+
+ ResponseWriter.BeginObject("Attachments");
+ std::sort(AllAttachments.begin(), AllAttachments.end());
+ auto NewEnd = std::unique(AllAttachments.begin(), AllAttachments.end());
+ AllAttachments.erase(NewEnd, AllAttachments.end());
+
+ uint64_t AttachmentsSize = 0;
+
+ m_CidStore.IterateChunks(
+ AllAttachments,
+ [&](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index);
+ AttachmentsSize += Payload.GetSize();
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024u);
+
+ ResponseWriter << "Count" << AllAttachments.size();
+ ResponseWriter << "Size" << AttachmentsSize;
+
+ ResponseWriter.EndObject();
+
+ ResponseWriter.EndObject();
+ }
+
return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
}
break;
@@ -639,6 +748,46 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
+ if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true")
+ {
+ CacheContentStats ContentStats;
+ bool Success = m_CacheStore.GetContentStats(NamespaceName, BucketName, ContentStats);
+ if (Success)
+ {
+ size_t ValuesSize = 0;
+ for (const uint64_t Size : ContentStats.ValueSizes)
+ {
+ ValuesSize += Size;
+ }
+
+ std::sort(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ auto NewEnd = std::unique(ContentStats.Attachments.begin(), ContentStats.Attachments.end());
+ ContentStats.Attachments.erase(NewEnd, ContentStats.Attachments.end());
+
+ ResponseWriter << "Count" << ContentStats.ValueSizes.size();
+ ResponseWriter << "StructuredCount" << ContentStats.StructuredValuesCount;
+ ResponseWriter << "StandaloneCount" << ContentStats.StandaloneValuesCount;
+ ResponseWriter << "Size" << ValuesSize;
+ ResponseWriter << "AttachmentCount" << ContentStats.Attachments.size();
+
+ uint64_t AttachmentsSize = 0;
+
+ WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+
+ m_CidStore.IterateChunks(
+ ContentStats.Attachments,
+ [&](size_t Index, const IoBuffer& Payload) {
+ ZEN_UNUSED(Index);
+ AttachmentsSize += Payload.GetSize();
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024u);
+
+ ResponseWriter << "AttachmentsSize" << AttachmentsSize;
+ }
+ }
+
return Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
}
break;
@@ -1452,6 +1601,7 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
int TargetPid = 0;
CbPackage RpcResult;
if (m_RpcHandler.HandleRpcRequest(Context,
+ /* UriNamespace */ {},
RequestInfo.ContentType,
std::move(Body),
AcceptMagic,
@@ -1497,8 +1647,10 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
void
-HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
+HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request, std::string_view UriNamespace)
{
+ ZEN_MEMSCOPE(GetCacheRpcTag());
+
ZEN_TRACE_CPU("z$::Http::HandleRpcRequest");
const bool HasUpstream = m_UpstreamCache.IsActive();
@@ -1519,65 +1671,70 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- auto HandleRpc =
- [this, RequestContext, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
- if (m_RequestRecordingEnabled)
+ auto HandleRpc = [this,
+ RequestContext,
+ Body = Request.ReadPayload(),
+ ContentType,
+ AcceptType,
+ UriNamespaceString = std::string{UriNamespace}](HttpServerRequest& AsyncRequest) mutable {
+ if (m_RequestRecordingEnabled)
+ {
+ RwLock::SharedLockScope _(m_RequestRecordingLock);
+ if (m_RequestRecorder)
{
- RwLock::SharedLockScope _(m_RequestRecordingLock);
- if (m_RequestRecorder)
- {
- m_RequestRecorder->RecordRequest(
- {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
- Body);
- }
+ m_RequestRecorder->RecordRequest(
+ {.ContentType = ContentType, .AcceptType = AcceptType, .SessionId = RequestContext.SessionId},
+ Body);
}
+ }
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- int TargetProcessId = 0;
- CbPackage RpcResult;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetProcessId = 0;
+ CbPackage RpcResult;
- CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
- ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetProcessId,
- RpcResult);
+ CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
+ UriNamespaceString,
+ ContentType,
+ std::move(Body),
+ /* out */ AcceptMagic,
+ /* out */ AcceptFlags,
+ /* out */ TargetProcessId,
+ /* out */ RpcResult);
- HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
+ HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
- if (!IsHttpSuccessCode(HttpResultCode))
- {
- return AsyncRequest.WriteResponse(HttpResultCode);
- }
+ if (!IsHttpSuccessCode(HttpResultCode))
+ {
+ return AsyncRequest.WriteResponse(HttpResultCode);
+ }
- if (AcceptMagic == kCbPkgMagic)
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId);
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(RequestContext.SessionId, TargetProcessId);
}
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessHandle);
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- };
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ };
if (HasUpstream)
{
@@ -1602,6 +1759,8 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
void
HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
+ ZEN_MEMSCOPE(GetCacheHttpTag());
+
CbObjectWriter Cbo;
EmitSnapshot("requests", m_HttpRequests, Cbo);