aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-20 16:03:35 +0100
committerGitHub <[email protected]>2023-12-20 16:03:35 +0100
commit086558fd15f884cd29d1e6941a8576190c0b650d (patch)
tree71e5e729be82d1825a228931d9c03376c659b5ca /src/zenserver/cache/httpstructuredcache.h
parentmove cachedisklayer and structuredcachestore into zenstore (#624) (diff)
downloadzen-086558fd15f884cd29d1e6941a8576190c0b650d.tar.xz
zen-086558fd15f884cd29d1e6941a8576190c0b650d.zip
separate RPC processing from HTTP processing (#626)
* moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore * move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp * introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore
Diffstat (limited to 'src/zenserver/cache/httpstructuredcache.h')
-rw-r--r--src/zenserver/cache/httpstructuredcache.h85
1 files changed, 7 insertions, 78 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 2feaaead8..da4bdd63c 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -6,6 +6,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
+#include <zenstore/cache/cacherpc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/openprocesscache.h>
@@ -16,13 +17,16 @@ namespace zen {
struct CacheChunkRequest;
struct CacheKeyRequest;
+struct PutRequestData;
+
class CidStore;
class CbObjectView;
-struct PutRequestData;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
-class DiskWriteBlocker;
+
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
@@ -89,28 +93,6 @@ private:
IoHash ValueContentId;
};
- struct CacheStats
- {
- std::atomic_uint64_t HitCount{};
- std::atomic_uint64_t UpstreamHitCount{};
- std::atomic_uint64_t MissCount{};
- std::atomic_uint64_t WriteCount{};
- std::atomic_uint64_t BadRequestCount{};
- std::atomic_uint64_t RpcRequests{};
- std::atomic_uint64_t RpcRecordRequests{};
- std::atomic_uint64_t RpcRecordBatchRequests{};
- std::atomic_uint64_t RpcValueRequests{};
- std::atomic_uint64_t RpcValueBatchRequests{};
- std::atomic_uint64_t RpcChunkRequests{};
- std::atomic_uint64_t RpcChunkBatchRequests{};
- };
- enum class PutResult
- {
- Success,
- Fail,
- Invalid,
- };
-
void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
@@ -120,57 +102,11 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
- HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context,
- const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutPackage);
-
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
-
- /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest);
- /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests);
- /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests);
bool AreDiskWritesAllowed() const;
@@ -187,6 +123,7 @@ private:
CacheStats m_CacheStats;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
OpenProcessCache m_OpenProcessCache;
+ CacheRpcHandler m_RpcHandler;
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
@@ -199,14 +136,6 @@ private:
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
-/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
- * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
-inline bool
-IsCompressedBinary(ZenContentType Type)
-{
- return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
-}
-
void z$service_forcelink();
} // namespace zen