diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/cache/structuredcache.h | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/cache/structuredcache.h')
| -rw-r--r-- | src/zenserver/cache/structuredcache.h | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h new file mode 100644 index 000000000..4e7b98ac9 --- /dev/null +++ b/src/zenserver/cache/structuredcache.h @@ -0,0 +1,187 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/stats.h> +#include <zenhttp/httpserver.h> + +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" + +#include <memory> +#include <vector> + +namespace spdlog { +class logger; +} + +namespace zen { + +struct CacheChunkRequest; +struct CacheKeyRequest; +class CidStore; +class CbObjectView; +struct PutRequestData; +class ScrubContext; +class UpstreamCache; +class ZenCacheStore; +enum class CachePolicy : uint32_t; +enum class RpcAcceptOptions : uint16_t; + +namespace cache { + class IRpcRequestReplayer; + class IRpcRequestRecorder; + namespace detail { + struct RecordBody; + struct ChunkRequest; + } // namespace detail +} // namespace cache + +/** + * Structured cache service. Imposes constraints on keys, supports blobs and + * structured values + * + * Keys are structured as: + * + * {BucketId}/{KeyHash} + * + * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character + * hexadecimal sequence. The hash value may be derived in any number of ways, it's + * up to the application to pick an approach. + * + * Values may be structured or unstructured. Structured values are encoded using Unreal + * Engine's compact binary encoding (see CbObject) + * + * Additionally, attachments may be addressed as: + * + * {BucketId}/{KeyHash}/{ValueHash} + * + * Where the two initial components are the same as for the main endpoint + * + * The storage strategy is as follows: + * + * - Structured values are stored in a dedicated backing store per bucket + * - Unstructured values and attachments are stored in the CAS pool + * + */ + +class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider +{ +public: + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache); + ~HttpStructuredCacheService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + void Flush(); + void Scrub(ScrubContext& Ctx); + +private: + struct CacheRef + { + std::string Namespace; + std::string BucketSegment; + IoHash HashKey; + IoHash ValueContentId; + }; + + struct CacheStats + { + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + }; + enum class PutResult + { + Success, + Fail, + Invalid, + }; + + void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); + void HandleRpcRequest(HttpServerRequest& Request); + void HandleDetailsRequest(HttpServerRequest& Request); + + CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); + CbPackage HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId); + + void HandleCacheRequest(HttpServerRequest& Request); + void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); + void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + + /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest); + /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ + void GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ + void GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ + void GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests); + /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ + CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + ZenCacheStore& m_CacheStore; + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + CidStore& m_CidStore; + UpstreamCache& m_UpstreamCache; + uint64_t m_LastScrubTime = 0; + metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; + CacheStats m_CacheStats; + + void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); + + std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; +}; + +/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. + * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ +inline bool +IsCompressedBinary(ZenContentType Type) +{ + return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; +} + +void z$service_forcelink(); + +} // namespace zen |