aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/httpstructuredcache.h
blob: 2feaaead824830a4a7aa9c4dd062464a79ca8f0c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include <zencore/stats.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
#include <zenutil/cache/cache.h>
#include <zenutil/openprocesscache.h>

#include <memory>
#include <vector>

namespace zen {

struct CacheChunkRequest;
struct CacheKeyRequest;
class CidStore;
class CbObjectView;
struct PutRequestData;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
class DiskWriteBlocker;
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;

namespace cache {
	class IRpcRequestReplayer;
	class IRpcRequestRecorder;
	namespace detail {
		struct RecordBody;
		struct ChunkRequest;
	}  // namespace detail
}  // namespace cache

/**
 * Structured cache service. Imposes constraints on keys, supports blobs and
 * structured values
 *
 * Keys are structured as:
 *
 *   {BucketId}/{KeyHash}
 *
 * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character
 * hexadecimal sequence. The hash value may be derived in any number of ways, it's
 * up to the application to pick an approach.
 *
 * Values may be structured or unstructured. Structured values are encoded using Unreal
 * Engine's compact binary encoding (see CbObject)
 *
 * Additionally, attachments may be addressed as:
 *
 *   {BucketId}/{KeyHash}/{ValueHash}
 *
 * Where the two initial components are the same as for the main endpoint
 *
 * The storage strategy is as follows:
 *
 *  - Structured values are stored in a dedicated backing store per bucket
 *  - Unstructured values and attachments are stored in the CAS pool
 *
 */

class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
	HttpStructuredCacheService(ZenCacheStore&		   InCacheStore,
							   CidStore&			   InCidStore,
							   HttpStatsService&	   StatsService,
							   HttpStatusService&	   StatusService,
							   UpstreamCache&		   UpstreamCache,
							   const DiskWriteBlocker* InDiskWriteBlocker);
	~HttpStructuredCacheService();

	virtual const char* BaseUri() const override;
	virtual void		HandleRequest(HttpServerRequest& Request) override;

	void Flush();
	void ScrubStorage(ScrubContext& Ctx);

private:
	struct CacheRef
	{
		std::string Namespace;
		std::string BucketSegment;
		IoHash		HashKey;
		IoHash		ValueContentId;
	};

	struct CacheStats
	{
		std::atomic_uint64_t HitCount{};
		std::atomic_uint64_t UpstreamHitCount{};
		std::atomic_uint64_t MissCount{};
		std::atomic_uint64_t WriteCount{};
		std::atomic_uint64_t BadRequestCount{};
		std::atomic_uint64_t RpcRequests{};
		std::atomic_uint64_t RpcRecordRequests{};
		std::atomic_uint64_t RpcRecordBatchRequests{};
		std::atomic_uint64_t RpcValueRequests{};
		std::atomic_uint64_t RpcValueBatchRequests{};
		std::atomic_uint64_t RpcChunkRequests{};
		std::atomic_uint64_t RpcChunkBatchRequests{};
	};
	enum class PutResult
	{
		Success,
		Fail,
		Invalid,
	};

	void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandleCacheChunkRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandleGetCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandlePutCacheChunk(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
	void HandleRpcRequest(HttpServerRequest& Request);
	void HandleDetailsRequest(HttpServerRequest& Request);

	CbPackage		 HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
	CbPackage		 HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
	CbPackage		 HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
	CbPackage		 HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
	CbPackage		 HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
	HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context,
									  const ZenContentType		 ContentType,
									  IoBuffer&&				 Body,
									  uint32_t&					 OutAcceptMagic,
									  RpcAcceptOptions&			 OutAcceptFlags,
									  int&						 OutTargetProcessId,
									  CbPackage&				 OutPackage);

	void		 HandleCacheRequest(HttpServerRequest& Request);
	void		 HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
	void		 HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
	virtual void HandleStatsRequest(HttpServerRequest& Request) override;
	virtual void HandleStatusRequest(HttpServerRequest& Request) override;
	PutResult	 PutCacheRecord(PutRequestData& Request, const CbPackage* Package);

	/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
	bool ParseGetCacheChunksRequest(std::string&							   Namespace,
									std::vector<CacheKeyRequest>&			   RecordKeys,
									std::vector<cache::detail::RecordBody>&	   Records,
									std::vector<CacheChunkRequest>&			   RequestKeys,
									std::vector<cache::detail::ChunkRequest>&  Requests,
									std::vector<cache::detail::ChunkRequest*>& RecordRequests,
									std::vector<cache::detail::ChunkRequest*>& ValueRequests,
									CbObjectView							   RpcRequest);
	/** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
	void GetLocalCacheRecords(const CacheRequestContext&				 Context,
							  std::string_view							 Namespace,
							  std::vector<CacheKeyRequest>&				 RecordKeys,
							  std::vector<cache::detail::RecordBody>&	 Records,
							  std::vector<cache::detail::ChunkRequest*>& RecordRequests,
							  std::vector<CacheChunkRequest*>&			 OutUpstreamChunks);
	/** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
	void GetLocalCacheValues(const CacheRequestContext&					Context,
							 std::string_view							Namespace,
							 std::vector<cache::detail::ChunkRequest*>& ValueRequests,
							 std::vector<CacheChunkRequest*>&			OutUpstreamChunks);
	/** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
	void GetUpstreamCacheChunks(const CacheRequestContext&				  Context,
								std::string_view						  Namespace,
								std::vector<CacheChunkRequest*>&		  UpstreamChunks,
								std::vector<CacheChunkRequest>&			  RequestKeys,
								std::vector<cache::detail::ChunkRequest>& Requests);
	/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
	CbPackage WriteGetCacheChunksResponse(const CacheRequestContext&				Context,
										  std::string_view							Namespace,
										  std::vector<cache::detail::ChunkRequest>& Requests);

	bool AreDiskWritesAllowed() const;

	LoggerRef				 Log() { return m_Log; }
	LoggerRef				 m_Log;
	ZenCacheStore&			 m_CacheStore;
	HttpStatsService&		 m_StatsService;
	HttpStatusService&		 m_StatusService;
	CidStore&				 m_CidStore;
	UpstreamCache&			 m_UpstreamCache;
	uint64_t				 m_LastScrubTime = 0;
	metrics::OperationTiming m_HttpRequests;
	metrics::OperationTiming m_UpstreamGetRequestTiming;
	CacheStats				 m_CacheStats;
	const DiskWriteBlocker*	 m_DiskWriteBlocker = nullptr;
	OpenProcessCache		 m_OpenProcessCache;

	void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);

	// This exists to avoid taking locks when recording is not enabled
	std::atomic_bool m_RequestRecordingEnabled{false};

	// This lock should be taken in SHARED mode when calling into the recorder,
	// and taken in EXCLUSIVE mode whenever the recorder is created or destroyed
	RwLock										m_RequestRecordingLock;
	std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};

/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
 * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
inline bool
IsCompressedBinary(ZenContentType Type)
{
	return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
}

void z$service_forcelink();

}  // namespace zen