aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcache.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/cache/structuredcache.h
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/cache/structuredcache.h')
-rw-r--r--src/zenserver/cache/structuredcache.h187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h
new file mode 100644
index 000000000..4e7b98ac9
--- /dev/null
+++ b/src/zenserver/cache/structuredcache.h
@@ -0,0 +1,187 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/stats.h>
+#include <zenhttp/httpserver.h>
+
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
+
+#include <memory>
+#include <vector>
+
+namespace spdlog {
+class logger;
+}
+
+namespace zen {
+
+struct CacheChunkRequest;
+struct CacheKeyRequest;
+class CidStore;
+class CbObjectView;
+struct PutRequestData;
+class ScrubContext;
+class UpstreamCache;
+class ZenCacheStore;
+enum class CachePolicy : uint32_t;
+enum class RpcAcceptOptions : uint16_t;
+
+namespace cache {
+ class IRpcRequestReplayer;
+ class IRpcRequestRecorder;
+ namespace detail {
+ struct RecordBody;
+ struct ChunkRequest;
+ } // namespace detail
+} // namespace cache
+
+/**
+ * Structured cache service. Imposes constraints on keys, supports blobs and
+ * structured values
+ *
+ * Keys are structured as:
+ *
+ * {BucketId}/{KeyHash}
+ *
+ * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character
+ * hexadecimal sequence. The hash value may be derived in any number of ways, it's
+ * up to the application to pick an approach.
+ *
+ * Values may be structured or unstructured. Structured values are encoded using Unreal
+ * Engine's compact binary encoding (see CbObject)
+ *
+ * Additionally, attachments may be addressed as:
+ *
+ * {BucketId}/{KeyHash}/{ValueHash}
+ *
+ * Where the two initial components are the same as for the main endpoint
+ *
+ * The storage strategy is as follows:
+ *
+ * - Structured values are stored in a dedicated backing store per bucket
+ * - Unstructured values and attachments are stored in the CAS pool
+ *
+ */
+
+class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
+{
+public:
+ HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache);
+ ~HttpStructuredCacheService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+
+private:
+ struct CacheRef
+ {
+ std::string Namespace;
+ std::string BucketSegment;
+ IoHash HashKey;
+ IoHash ValueContentId;
+ };
+
+ struct CacheStats
+ {
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ };
+ enum class PutResult
+ {
+ Success,
+ Fail,
+ Invalid,
+ };
+
+ void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleRpcRequest(HttpServerRequest& Request);
+ void HandleDetailsRequest(HttpServerRequest& Request);
+
+ CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
+ CbPackage HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId);
+
+ void HandleCacheRequest(HttpServerRequest& Request);
+ void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
+ void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
+ void GetLocalCacheRecords(std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
+
+ spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
+ ZenCacheStore& m_CacheStore;
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+ CidStore& m_CidStore;
+ UpstreamCache& m_UpstreamCache;
+ uint64_t m_LastScrubTime = 0;
+ metrics::OperationTiming m_HttpRequests;
+ metrics::OperationTiming m_UpstreamGetRequestTiming;
+ CacheStats m_CacheStats;
+
+ void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
+
+ std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
+};
+
+/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
+ * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
+inline bool
+IsCompressedBinary(ZenContentType Type)
+{
+ return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
+}
+
+void z$service_forcelink();
+
+} // namespace zen