diff options
| author | Stefan Boberg <[email protected]> | 2023-12-20 16:03:35 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-20 16:03:35 +0100 |
| commit | 086558fd15f884cd29d1e6941a8576190c0b650d (patch) | |
| tree | 71e5e729be82d1825a228931d9c03376c659b5ca /src/zenserver/cache/httpstructuredcache.h | |
| parent | move cachedisklayer and structuredcachestore into zenstore (#624) (diff) | |
| download | zen-086558fd15f884cd29d1e6941a8576190c0b650d.tar.xz zen-086558fd15f884cd29d1e6941a8576190c0b650d.zip | |
separate RPC processing from HTTP processing (#626)
* moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore
* move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp
* introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.h')
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.h | 85 |
1 files changed, 7 insertions, 78 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 2feaaead8..da4bdd63c 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -6,6 +6,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> #include <zenhttp/httpstatus.h> +#include <zenstore/cache/cacherpc.h> #include <zenutil/cache/cache.h> #include <zenutil/openprocesscache.h> @@ -16,13 +17,16 @@ namespace zen { struct CacheChunkRequest; struct CacheKeyRequest; +struct PutRequestData; + class CidStore; class CbObjectView; -struct PutRequestData; +class DiskWriteBlocker; +class HttpStructuredCacheService; class ScrubContext; class UpstreamCache; class ZenCacheStore; -class DiskWriteBlocker; + enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; @@ -89,28 +93,6 @@ private: IoHash ValueContentId; }; - struct CacheStats - { - std::atomic_uint64_t HitCount{}; - std::atomic_uint64_t UpstreamHitCount{}; - std::atomic_uint64_t MissCount{}; - std::atomic_uint64_t WriteCount{}; - std::atomic_uint64_t BadRequestCount{}; - std::atomic_uint64_t RpcRequests{}; - std::atomic_uint64_t RpcRecordRequests{}; - std::atomic_uint64_t RpcRecordBatchRequests{}; - std::atomic_uint64_t RpcValueRequests{}; - std::atomic_uint64_t RpcValueBatchRequests{}; - std::atomic_uint64_t RpcChunkRequests{}; - std::atomic_uint64_t RpcChunkBatchRequests{}; - }; - enum class PutResult - { - Success, - Fail, - Invalid, - }; - void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); @@ -120,57 +102,11 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); - HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context, - const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId, - CbPackage& OutPackage); - void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); virtual void HandleStatsRequest(HttpServerRequest& Request) override; virtual void HandleStatusRequest(HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); - - /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::string& Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - CbObjectView RpcRequest); - /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector<CacheKeyRequest>& RecordKeys, - std::vector<cache::detail::RecordBody>& Records, - std::vector<cache::detail::ChunkRequest*>& RecordRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector<cache::detail::ChunkRequest*>& ValueRequests, - std::vector<CacheChunkRequest*>& OutUpstreamChunks); - /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector<CacheChunkRequest*>& UpstreamChunks, - std::vector<CacheChunkRequest>& RequestKeys, - std::vector<cache::detail::ChunkRequest>& Requests); - /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, - std::string_view Namespace, - std::vector<cache::detail::ChunkRequest>& Requests); bool AreDiskWritesAllowed() const; @@ -187,6 +123,7 @@ private: CacheStats m_CacheStats; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; OpenProcessCache m_OpenProcessCache; + CacheRpcHandler m_RpcHandler; void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); @@ -199,14 +136,6 @@ private: std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; -/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. - * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ -inline bool -IsCompressedBinary(ZenContentType Type) -{ - return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; -} - void z$service_forcelink(); } // namespace zen |