diff options
| author | Stefan Boberg <[email protected]> | 2023-12-20 16:03:35 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-20 16:03:35 +0100 |
| commit | 086558fd15f884cd29d1e6941a8576190c0b650d (patch) | |
| tree | 71e5e729be82d1825a228931d9c03376c659b5ca /src/zenstore/include | |
| parent | move cachedisklayer and structuredcachestore into zenstore (#624) (diff) | |
| download | zen-086558fd15f884cd29d1e6941a8576190c0b650d.tar.xz zen-086558fd15f884cd29d1e6941a8576190c0b650d.zip | |
separate RPC processing from HTTP processing (#626)
* moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore
* move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp
* introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore
Diffstat (limited to 'src/zenstore/include')
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h (renamed from src/zenstore/include/zenstore/cachedisklayer.h) | 0 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 155 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacheshared.h (renamed from src/zenstore/include/zenstore/cacheshared.h) | 0 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h (renamed from src/zenstore/include/zenstore/structuredcachestore.h) | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/upstreamcacheclient.h | 119 |
5 files changed, 275 insertions, 1 deletions
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 6997a12e4..6997a12e4 100644 --- a/src/zenstore/include/zenstore/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h new file mode 100644 index 000000000..c57e2818c --- /dev/null +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -0,0 +1,155 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zenutil/cache/cache.h> + +#include <atomic> +#include <string_view> +#include <vector> + +namespace zen { + +namespace cache { namespace detail { + struct RecordBody; + struct ChunkRequest; +}} // namespace cache::detail + +struct CacheChunkRequest; +struct CacheKeyRequest; +struct PutRequestData; + +class CbPackage; +class CbObjectView; +class CidStore; +class DiskWriteBlocker; +class HttpStructuredCacheService; +class UpstreamCacheClient; +class ZenCacheStore; + +enum class CachePolicy : uint32_t; +enum class RpcAcceptOptions : uint16_t; + +struct AttachmentCount +{ + uint32_t New = 0; + uint32_t Valid = 0; + uint32_t Invalid = 0; + uint32_t Total = 0; +}; + +struct CacheStats +{ + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + std::atomic_uint64_t WriteCount{}; + std::atomic_uint64_t BadRequestCount{}; + std::atomic_uint64_t RpcRequests{}; + std::atomic_uint64_t RpcRecordRequests{}; + std::atomic_uint64_t RpcRecordBatchRequests{}; + std::atomic_uint64_t RpcValueRequests{}; + std::atomic_uint64_t RpcValueBatchRequests{}; + std::atomic_uint64_t RpcChunkRequests{}; + std::atomic_uint64_t RpcChunkBatchRequests{}; +}; + +enum class PutResult +{ + Success, + Fail, + Invalid, +}; + +/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. + + We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. + */ + +constexpr bool +IsCompressedBinary(ZenContentType Type) +{ + return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; +} + +struct CacheRpcHandler +{ + CacheRpcHandler(LoggerRef InLog, + CacheStats& InCacheStats, + UpstreamCacheClient& InUpstreamCache, + ZenCacheStore& InCacheStore, + CidStore& InCidStore, + const DiskWriteBlocker* InDiskWriteBlocker); + ~CacheRpcHandler(); + + enum class RpcResponseCode + { + InsufficientStorage = 507, + BadRequest = 400, + OK = 200 + }; + + RpcResponseCode HandleRpcRequest(const CacheRequestContext& Context, + const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutPackage); + +private: + CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest); + + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + + /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + CbObjectView RpcRequest); + /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. + */ + void GetLocalCacheRecords(const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, + std::vector<cache::detail::RecordBody>& Records, + std::vector<cache::detail::ChunkRequest*>& RecordRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ + void GetLocalCacheValues(const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); + /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ + void GetUpstreamCacheChunks(const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, + std::vector<CacheChunkRequest>& RequestKeys, + std::vector<cache::detail::ChunkRequest>& Requests); + /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ + CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context, + std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests); + + LoggerRef Log() { return m_Log; } + LoggerRef m_Log; + CacheStats& m_CacheStats; + UpstreamCacheClient& m_UpstreamCache; + ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + + bool AreDiskWritesAllowed() const; +}; + +} // namespace zen
\ No newline at end of file diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index e3e8a2f84..e3e8a2f84 100644 --- a/src/zenstore/include/zenstore/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 56b2105a9..89d2abd11 100644 --- a/src/zenstore/include/zenstore/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -5,7 +5,7 @@ #include <zencore/compactbinary.h> #include <zencore/iohash.h> #include <zencore/stats.h> -#include <zenstore/cachedisklayer.h> +#include <zenstore/cache/cachedisklayer.h> #include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/statsreporter.h> diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h new file mode 100644 index 000000000..152031c3a --- /dev/null +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/compress.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/stats.h> +#include <zencore/zencore.h> +#include <zenutil/cache/cache.h> + +#include <functional> +#include <memory> +#include <string> +#include <vector> + +namespace zen { + +class CbObjectView; +class CbPackage; + +struct UpstreamCacheRecord +{ + ZenContentType Type = ZenContentType::kBinary; + std::string Namespace; + CacheKey Key; + std::vector<IoHash> ValueContentIds; + CacheRequestContext Context; +}; + +struct UpstreamError +{ + int32_t ErrorCode{}; + std::string Reason{}; + + explicit operator bool() const { return ErrorCode != 0; } +}; + +struct UpstreamEndpointInfo +{ + std::string Name; + std::string Url; +}; + +struct GetUpstreamCacheResult +{ + UpstreamError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct PutUpstreamCacheResult +{ + std::string Reason; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct CacheRecordGetCompleteParams +{ + CacheKeyRequest& Request; + const CbObjectView& Record; + const CbPackage& Package; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; + +struct CacheValueGetCompleteParams +{ + CacheValueRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; + +struct CacheChunkGetCompleteParams +{ + CacheChunkRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>; + +class UpstreamCacheClient +{ +public: + virtual ~UpstreamCacheClient() = default; + + virtual bool IsActive() = 0; + + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) = 0; + + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; + + virtual void GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; +}; + +} // namespace zen |