From 086558fd15f884cd29d1e6941a8576190c0b650d Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 20 Dec 2023 16:03:35 +0100 Subject: separate RPC processing from HTTP processing (#626) * moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore * move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp * introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore --- src/zenserver/cache/httpstructuredcache.h | 85 +++---------------------------- 1 file changed, 7 insertions(+), 78 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.h') diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 2feaaead8..da4bdd63c 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -16,13 +17,16 @@ namespace zen { struct CacheChunkRequest; struct CacheKeyRequest; +struct PutRequestData; + class CidStore; class CbObjectView; -struct PutRequestData; +class DiskWriteBlocker; +class HttpStructuredCacheService; class ScrubContext; class UpstreamCache; class ZenCacheStore; -class DiskWriteBlocker; + enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; @@ -89,28 +93,6 @@ private: IoHash ValueContentId; }; - struct CacheStats - { - std::atomic_uint64_t HitCount{}; - std::atomic_uint64_t UpstreamHitCount{}; - std::atomic_uint64_t MissCount{}; - std::atomic_uint64_t WriteCount{}; - std::atomic_uint64_t BadRequestCount{}; - std::atomic_uint64_t RpcRequests{}; - std::atomic_uint64_t RpcRecordRequests{}; - std::atomic_uint64_t RpcRecordBatchRequests{}; - std::atomic_uint64_t RpcValueRequests{}; - std::atomic_uint64_t RpcValueBatchRequests{}; - std::atomic_uint64_t RpcChunkRequests{}; - std::atomic_uint64_t RpcChunkBatchRequests{}; - }; - enum class PutResult - { - Success, - Fail, - Invalid, - }; - void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); @@ -120,57 +102,11 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); - HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context, - const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId, - CbPackage& OutPackage); - void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); virtual void HandleStatsRequest(HttpServerRequest& Request) override; virtual void HandleStatusRequest(HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); - - /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::string& Namespace, - std::vector& RecordKeys, - std::vector& Records, - std::vector& RequestKeys, - std::vector& Requests, - std::vector& RecordRequests, - std::vector& ValueRequests, - CbObjectView RpcRequest); - /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector& RecordKeys, - std::vector& Records, - std::vector& RecordRequests, - std::vector& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector& ValueRequests, - std::vector& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector& UpstreamChunks, - std::vector& RequestKeys, - std::vector& Requests); - /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector& Requests); bool AreDiskWritesAllowed() const; @@ -187,6 +123,7 @@ private: CacheStats m_CacheStats; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; OpenProcessCache m_OpenProcessCache; + CacheRpcHandler m_RpcHandler; void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); @@ -199,14 +136,6 @@ private: std::unique_ptr m_RequestRecorder; }; -/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. - * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ -inline bool -IsCompressedBinary(ZenContentType Type) -{ - return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; -} - void z$service_forcelink(); } // namespace zen -- cgit v1.2.3