aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/include
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-20 16:03:35 +0100
committerGitHub <[email protected]>2023-12-20 16:03:35 +0100
commit086558fd15f884cd29d1e6941a8576190c0b650d (patch)
tree71e5e729be82d1825a228931d9c03376c659b5ca /src/zenstore/include
parentmove cachedisklayer and structuredcachestore into zenstore (#624) (diff)
downloadzen-086558fd15f884cd29d1e6941a8576190c0b650d.tar.xz
zen-086558fd15f884cd29d1e6941a8576190c0b650d.zip
separate RPC processing from HTTP processing (#626)
* moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore * move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp * introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore
Diffstat (limited to 'src/zenstore/include')
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h (renamed from src/zenstore/include/zenstore/cachedisklayer.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h155
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h (renamed from src/zenstore/include/zenstore/cacheshared.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h (renamed from src/zenstore/include/zenstore/structuredcachestore.h)2
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h119
5 files changed, 275 insertions, 1 deletions
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 6997a12e4..6997a12e4 100644
--- a/src/zenstore/include/zenstore/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
new file mode 100644
index 000000000..c57e2818c
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -0,0 +1,155 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zenutil/cache/cache.h>
+
+#include <atomic>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+namespace cache { namespace detail {
+ struct RecordBody;
+ struct ChunkRequest;
+}} // namespace cache::detail
+
+struct CacheChunkRequest;
+struct CacheKeyRequest;
+struct PutRequestData;
+
+class CbPackage;
+class CbObjectView;
+class CidStore;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
+class UpstreamCacheClient;
+class ZenCacheStore;
+
+enum class CachePolicy : uint32_t;
+enum class RpcAcceptOptions : uint16_t;
+
+struct AttachmentCount
+{
+ uint32_t New = 0;
+ uint32_t Valid = 0;
+ uint32_t Invalid = 0;
+ uint32_t Total = 0;
+};
+
+struct CacheStats
+{
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t WriteCount{};
+ std::atomic_uint64_t BadRequestCount{};
+ std::atomic_uint64_t RpcRequests{};
+ std::atomic_uint64_t RpcRecordRequests{};
+ std::atomic_uint64_t RpcRecordBatchRequests{};
+ std::atomic_uint64_t RpcValueRequests{};
+ std::atomic_uint64_t RpcValueBatchRequests{};
+ std::atomic_uint64_t RpcChunkRequests{};
+ std::atomic_uint64_t RpcChunkBatchRequests{};
+};
+
+enum class PutResult
+{
+ Success,
+ Fail,
+ Invalid,
+};
+
+/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
+
+ We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
+ */
+
+constexpr bool
+IsCompressedBinary(ZenContentType Type)
+{
+ return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
+}
+
+struct CacheRpcHandler
+{
+ CacheRpcHandler(LoggerRef InLog,
+ CacheStats& InCacheStats,
+ UpstreamCacheClient& InUpstreamCache,
+ ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ const DiskWriteBlocker* InDiskWriteBlocker);
+ ~CacheRpcHandler();
+
+ enum class RpcResponseCode
+ {
+ InsufficientStorage = 507,
+ BadRequest = 400,
+ OK = 200
+ };
+
+ RpcResponseCode HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
+
+private:
+ CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
+
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally.
+ */
+ void GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+
+ LoggerRef Log() { return m_Log; }
+ LoggerRef m_Log;
+ CacheStats& m_CacheStats;
+ UpstreamCacheClient& m_UpstreamCache;
+ ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+
+ bool AreDiskWritesAllowed() const;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index e3e8a2f84..e3e8a2f84 100644
--- a/src/zenstore/include/zenstore/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 56b2105a9..89d2abd11 100644
--- a/src/zenstore/include/zenstore/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -5,7 +5,7 @@
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
#include <zencore/stats.h>
-#include <zenstore/cachedisklayer.h>
+#include <zenstore/cache/cachedisklayer.h>
#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/statsreporter.h>
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
new file mode 100644
index 000000000..152031c3a
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/stats.h>
+#include <zencore/zencore.h>
+#include <zenutil/cache/cache.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace zen {
+
+class CbObjectView;
+class CbPackage;
+
+struct UpstreamCacheRecord
+{
+ ZenContentType Type = ZenContentType::kBinary;
+ std::string Namespace;
+ CacheKey Key;
+ std::vector<IoHash> ValueContentIds;
+ CacheRequestContext Context;
+};
+
+struct UpstreamError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
+struct GetUpstreamCacheResult
+{
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct PutUpstreamCacheResult
+{
+ std::string Reason;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct CacheRecordGetCompleteParams
+{
+ CacheKeyRequest& Request;
+ const CbObjectView& Record;
+ const CbPackage& Package;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
+
+struct CacheValueGetCompleteParams
+{
+ CacheValueRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
+
+struct CacheChunkGetCompleteParams
+{
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
+
+class UpstreamCacheClient
+{
+public:
+ virtual ~UpstreamCacheClient() = default;
+
+ virtual bool IsActive() = 0;
+
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+};
+
+} // namespace zen