// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include #include #include namespace zen { struct CacheChunkRequest; struct CacheKeyRequest; class CidStore; class CbObjectView; struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; class DiskWriteBlocker; enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; namespace cache { class IRpcRequestReplayer; class IRpcRequestRecorder; namespace detail { struct RecordBody; struct ChunkRequest; } // namespace detail } // namespace cache /** * Structured cache service. Imposes constraints on keys, supports blobs and * structured values * * Keys are structured as: * * {BucketId}/{KeyHash} * * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character * hexadecimal sequence. The hash value may be derived in any number of ways, it's * up to the application to pick an approach. * * Values may be structured or unstructured. Structured values are encoded using Unreal * Engine's compact binary encoding (see CbObject) * * Additionally, attachments may be addressed as: * * {BucketId}/{KeyHash}/{ValueHash} * * Where the two initial components are the same as for the main endpoint * * The storage strategy is as follows: * * - Structured values are stored in a dedicated backing store per bucket * - Unstructured values and attachments are stored in the CAS pool * */ class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, const DiskWriteBlocker* InDiskWriteBlocker); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; void Flush(); void ScrubStorage(ScrubContext& Ctx); private: struct CacheRef { std::string Namespace; std::string BucketSegment; IoHash HashKey; IoHash ValueContentId; }; struct CacheStats { std::atomic_uint64_t HitCount{}; std::atomic_uint64_t UpstreamHitCount{}; std::atomic_uint64_t MissCount{}; std::atomic_uint64_t WriteCount{}; std::atomic_uint64_t BadRequestCount{}; std::atomic_uint64_t RpcRequests{}; std::atomic_uint64_t RpcRecordRequests{}; std::atomic_uint64_t RpcRecordBatchRequests{}; std::atomic_uint64_t RpcValueRequests{}; std::atomic_uint64_t RpcValueBatchRequests{}; std::atomic_uint64_t RpcChunkRequests{}; std::atomic_uint64_t RpcChunkBatchRequests{}; }; enum class PutResult { Success, Fail, Invalid, }; void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context, const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, RpcAcceptOptions& OutAcceptFlags, int& OutTargetProcessId, CbPackage& OutPackage); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket); virtual void HandleStatsRequest(HttpServerRequest& Request) override; virtual void HandleStatusRequest(HttpServerRequest& Request) override; PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, std::vector& RecordKeys, std::vector& Records, std::vector& RequestKeys, std::vector& Requests, std::vector& RecordRequests, std::vector& ValueRequests, CbObjectView RpcRequest); /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ void GetLocalCacheRecords(const CacheRequestContext& Context, std::string_view Namespace, std::vector& RecordKeys, std::vector& Records, std::vector& RecordRequests, std::vector& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ void GetLocalCacheValues(const CacheRequestContext& Context, std::string_view Namespace, std::vector& ValueRequests, std::vector& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ void GetUpstreamCacheChunks(const CacheRequestContext& Context, std::string_view Namespace, std::vector& UpstreamChunks, std::vector& RequestKeys, std::vector& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, std::string_view Namespace, std::vector& Requests); bool AreDiskWritesAllowed() const; LoggerRef Log() { return m_Log; } LoggerRef m_Log; ZenCacheStore& m_CacheStore; HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; CidStore& m_CidStore; UpstreamCache& m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; OpenProcessCache m_OpenProcessCache; void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); // This exists to avoid taking locks when recording is not enabled std::atomic_bool m_RequestRecordingEnabled{false}; // This lock should be taken in SHARED mode when calling into the recorder, // and taken in EXCLUSIVE mode whenever the recorder is created or destroyed RwLock m_RequestRecordingLock; std::unique_ptr m_RequestRecorder; }; /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ inline bool IsCompressedBinary(ZenContentType Type) { return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; } void z$service_forcelink(); } // namespace zen