aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-20 16:03:35 +0100
committerGitHub <[email protected]>2023-12-20 16:03:35 +0100
commit086558fd15f884cd29d1e6941a8576190c0b650d (patch)
tree71e5e729be82d1825a228931d9c03376c659b5ca /src
parentmove cachedisklayer and structuredcachestore into zenstore (#624) (diff)
downloadzen-086558fd15f884cd29d1e6941a8576190c0b650d.tar.xz
zen-086558fd15f884cd29d1e6941a8576190c0b650d.zip
separate RPC processing from HTTP processing (#626)
* moved all RPC processing from HttpStructuredCacheService into separate CacheRpcHandler class in zenstore * move package marshaling to zenutil. was previously in zenhttp/httpshared but it's useful in other contexts as well where we don't want to depend on zenhttp * introduced UpstreamCacheClient, this provides a subset of functions on UpstreamCache and lives in zenstore
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/print_cmd.cpp2
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp2
-rw-r--r--src/zenhttp/httpclient.cpp2
-rw-r--r--src/zenhttp/httpserver.cpp2
-rw-r--r--src/zenhttp/servers/httpsys.cpp2
-rw-r--r--src/zenhttp/xmake.lua2
-rw-r--r--src/zenhttp/zenhttp.cpp3
-rw-r--r--src/zennet/zennet.cpp2
-rw-r--r--src/zenserver-test/zenserver-test.cpp2
-rw-r--r--src/zenserver/admin/admin.cpp2
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp1609
-rw-r--r--src/zenserver/cache/httpstructuredcache.h85
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp2
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp6
-rw-r--r--src/zenserver/upstream/upstreamcache.h128
-rw-r--r--src/zenserver/upstream/zen.cpp4
-rw-r--r--src/zenserver/vfs/vfsimpl.cpp2
-rw-r--r--src/zenserver/zenserver.cpp2
-rw-r--r--src/zenserver/zenserver.h2
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp2
-rw-r--r--src/zenstore/cache/cacherpc.cpp1640
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h (renamed from src/zenstore/include/zenstore/cachedisklayer.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h155
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h (renamed from src/zenstore/include/zenstore/cacheshared.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h (renamed from src/zenstore/include/zenstore/structuredcachestore.h)2
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h119
-rw-r--r--src/zenutil/include/zenutil/packageformat.h (renamed from src/zenhttp/include/zenhttp/httpshared.h)2
-rw-r--r--src/zenutil/packageformat.cpp (renamed from src/zenhttp/httpshared.cpp)4
-rw-r--r--src/zenutil/zenutil.cpp2
31 files changed, 2002 insertions, 1789 deletions
diff --git a/src/zen/cmds/print_cmd.cpp b/src/zen/cmds/print_cmd.cpp
index ccd7f248e..fa42a3563 100644
--- a/src/zen/cmds/print_cmd.cpp
+++ b/src/zen/cmds/print_cmd.cpp
@@ -8,7 +8,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
using namespace std::literals;
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 53f45358e..d66adfb81 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -13,8 +13,8 @@
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 1299d51c1..a29a08a3c 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -16,7 +16,7 @@
#include <zencore/testing.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index 97d6a01fe..3270855ad 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -24,7 +24,7 @@
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/thread.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#include <charconv>
#include <mutex>
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index d2cb63cd7..5cd273c40 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -13,7 +13,7 @@
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#if ZEN_WITH_HTTPSYS
# define _WINSOCKAPI_
diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua
index b6ffbe467..8393f399b 100644
--- a/src/zenhttp/xmake.lua
+++ b/src/zenhttp/xmake.lua
@@ -7,7 +7,7 @@ target('zenhttp')
add_files("**.cpp")
add_files("servers/httpsys.cpp", {unity_ignored=true})
add_includedirs("include", {public=true})
- add_deps("zencore", "transport-sdk")
+ add_deps("zencore", "zenutil", "transport-sdk")
add_packages(
"vcpkg::asio",
"vcpkg::cpr",
diff --git a/src/zenhttp/zenhttp.cpp b/src/zenhttp/zenhttp.cpp
index 3e5b387b8..6b855c4db 100644
--- a/src/zenhttp/zenhttp.cpp
+++ b/src/zenhttp/zenhttp.cpp
@@ -6,7 +6,7 @@
# include <zenhttp/httpclient.h>
# include <zenhttp/httpserver.h>
-# include <zenhttp/httpshared.h>
+# include <zenutil/packageformat.h>
namespace zen {
@@ -15,7 +15,6 @@ zenhttp_forcelinktests()
{
http_forcelink();
httpclient_forcelink();
- forcelink_httpshared();
}
} // namespace zen
diff --git a/src/zennet/zennet.cpp b/src/zennet/zennet.cpp
index 30a012a3b..47157c3ed 100644
--- a/src/zennet/zennet.cpp
+++ b/src/zennet/zennet.cpp
@@ -12,7 +12,7 @@ void
zennet_forcelinktests()
{
#if ZEN_WITH_TESTS
- zen::statsd_forcelink();
+ statsd_forcelink();
#endif
}
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 3efa57fdb..5dfbc6327 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -20,11 +20,11 @@
#include <zencore/timer.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpclient.h>
-#include <zenhttp/httpshared.h>
#include <zenhttp/zenhttp.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/logging/testformatter.h>
+#include <zenutil/packageformat.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_USE_MIMALLOC
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 36ea8c0f1..8b3f5a785 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -20,7 +20,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "config.h"
#include "projectstore/projectstore.h"
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f9cd7ae13..c62b5325e 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -16,12 +16,13 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
-#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
@@ -56,30 +57,6 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
}
-CacheRecordPolicy
-LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
-{
- OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
- return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
-}
-
-struct AttachmentCount
-{
- uint32_t New = 0;
- uint32_t Valid = 0;
- uint32_t Invalid = 0;
- uint32_t Total = 0;
-};
-
-struct PutRequestData
-{
- std::string Namespace;
- CacheKey Key;
- CbObjectView RecordObject;
- CacheRecordPolicy Policy;
- CacheRequestContext Context;
-};
-
namespace {
static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
@@ -87,14 +64,6 @@ namespace {
static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
- struct HttpRequestData
- {
- std::optional<std::string> Namespace;
- std::optional<std::string> Bucket;
- std::optional<IoHash> HashKey;
- std::optional<IoHash> ValueContentId;
- };
-
constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
@@ -153,6 +122,14 @@ namespace {
return KeyHash;
}
+ struct HttpRequestData
+ {
+ std::optional<std::string> Namespace;
+ std::optional<std::string> Bucket;
+ std::optional<IoHash> HashKey;
+ std::optional<IoHash> ValueContentId;
+ };
+
bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data)
{
std::vector<std::string_view> Tokens;
@@ -264,55 +241,6 @@ namespace {
}
}
- std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
- {
- CbFieldView NamespaceField = Params["Namespace"sv];
- if (!NamespaceField)
- {
- return std::string(ZenCacheStore::DefaultNamespace);
- }
-
- if (NamespaceField.HasError())
- {
- return {};
- }
- if (!NamespaceField.IsString())
- {
- return {};
- }
- return GetValidNamespaceName(NamespaceField.AsString());
- }
-
- bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
- {
- CbFieldView BucketField = KeyView["Bucket"sv];
- if (BucketField.HasError())
- {
- return false;
- }
- if (!BucketField.IsString())
- {
- return false;
- }
- std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
- if (!Bucket.has_value())
- {
- return false;
- }
- CbFieldView HashField = KeyView["Hash"sv];
- if (HashField.HasError())
- {
- return false;
- }
- if (!HashField.IsHash())
- {
- return false;
- }
- IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
- return true;
- }
-
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -330,6 +258,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -915,14 +844,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- {
- HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
- }
+ HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
break;
case HttpVerb::kPut:
HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
break;
+
default:
break;
}
@@ -1644,74 +1572,6 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
Request.WriteResponse(ResponseCode);
}
-HttpResponseCode
-HttpStructuredCacheService::HandleRpcRequest(const CacheRequestContext& Context,
- const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutResultPackage)
-{
- ZEN_TRACE_CPU("Z$::HandleRpcRequest");
-
- m_CacheStats.RpcRequests.fetch_add(1);
-
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == ZenContentType::kCbObject)
- {
- ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- OutAcceptMagic = Object["Accept"sv].AsUInt32();
- OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
- OutTargetProcessId = Object["Pid"sv].AsInt32(0);
-
- const std::string_view Method = Object["Method"sv].AsString();
-
- if (Method == "PutCacheRecords"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheRecords(Context, Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- OutResultPackage = HandleRpcGetCacheRecords(Context, Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheValues(Context, Package);
- }
- else if (Method == "GetCacheValues"sv)
- {
- OutResultPackage = HandleRpcGetCacheValues(Context, Object);
- }
- else if (Method == "GetCacheChunks"sv)
- {
- OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
- }
- else
- {
- m_CacheStats.BadRequestCount++;
- return HttpResponseCode::BadRequest;
- }
- return HttpResponseCode::OK;
-}
-
void
HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context,
cache::IRpcRequestReplayer& Replayer,
@@ -1735,13 +1595,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
CbPackage RpcResult;
- if (IsHttpSuccessCode(HandleRpcRequest(Context,
- RequestInfo.ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetPid,
- RpcResult)))
+ if (m_RpcHandler.HandleRpcRequest(Context,
+ RequestInfo.ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetPid,
+ RpcResult) == CacheRpcHandler::RpcResponseCode::OK)
{
if (AcceptMagic == kCbPkgMagic)
{
@@ -1823,16 +1683,19 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
int TargetProcessId = 0;
CbPackage RpcResult;
- HttpResponseCode ResultCode = HandleRpcRequest(RequestContext,
- ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetProcessId,
- RpcResult);
- if (!IsHttpSuccessCode(ResultCode))
+ CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
+ ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetProcessId,
+ RpcResult);
+
+ HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
+
+ if (!IsHttpSuccessCode(HttpResultCode))
{
- return AsyncRequest.WriteResponse(ResultCode);
+ return AsyncRequest.WriteResponse(HttpResultCode);
}
if (AcceptMagic == kCbPkgMagic)
@@ -1900,1412 +1763,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
}
}
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
-
- CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
- CachePolicy DefaultPolicy;
-
- m_CacheStats.RpcRecordRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
-
- std::vector<bool> Results;
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
- CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
- CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{.Namespace = *Namespace,
- .Key = std::move(Key),
- .RecordObject = RecordObject,
- .Policy = std::move(Policy),
- .Context = Context};
-
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
-
- if (Result == PutResult::Invalid)
- {
- return CbPackage{};
- }
- Results.push_back(Result == PutResult::Success);
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-HttpStructuredCacheService::PutResult
-HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
-{
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
-
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- Stopwatch Timer;
-
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
- CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return PutResult::Invalid;
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
- m_CacheStats.WriteCount++;
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- TransferredSize += Chunk.GetCompressedSize();
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- NiceBytes(TransferredSize),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
- }
- return PutResult::Success;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcRecordRequests.fetch_add(1);
-
- struct ValueRequestData
- {
- Oid ValueId;
- IoHash ContentId;
- CompressedBuffer Payload;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool ReadFromUpstream = false;
- };
- struct RecordRequestData
- {
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
- };
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
-
- auto ParseValues = [](RecordRequestData& Request) {
- CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
- Request.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Request.Values.push_back({ValueId, RawHash});
- Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
- }
- }
- };
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Request");
-
- m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
- RecordRequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- CacheKey& Key = Request.Upstream.Key;
- if (!GetRpcRequestCacheKey(KeyObject, Key))
- {
- return CbPackage{};
- }
-
- Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
-
- ZenCacheValue CacheValue;
- bool NeedUpstreamAttachment = false;
- bool FoundLocalInvalid = false;
- ZenCacheValue RecordCacheValue;
-
- if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
- {
- Request.RecordCacheValue = std::move(RecordCacheValue.Value);
- if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
- {
- FoundLocalInvalid = true;
- }
- else
- {
- Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
- ParseValues(Request);
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
- {
- // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
- // didn't ask for it and thus the record is complete in its absence.
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Value.Exists = true;
- }
- else
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- Request.Complete = false;
- }
- }
- else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- if (m_CidStore.ContainsChunk(Value.ContentId))
- {
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
- {
- if (Chunk.GetSize() > 0)
- {
- Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Value.Exists = true;
- continue;
- }
- else
- {
- ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
- }
- }
-
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- }
- }
- if (!Request.Complete)
- {
- bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid &&
- EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
- if (NeedUpstreamRecord || NeedUpstreamAttachment)
- {
- UpstreamIndexes.push_back(Requests.size() - 1);
- }
- }
- Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
- }
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- if (!UpstreamIndexes.empty())
- {
- std::vector<CacheKeyRequest*> UpstreamRequests;
- UpstreamRequests.reserve(UpstreamIndexes.size());
- for (size_t Index : UpstreamIndexes)
- {
- RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
-
- if (Request.Values.size())
- {
- // We will be returning the local object and know all the value Ids that exist in it
- // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
- CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
- CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
- UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
- Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
- }
- Request.Upstream.Policy = Builder.Build();
- }
- else
- {
- // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
- // and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
- }
- }
-
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
-
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
- Stopwatch Timer;
- auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
- if (!Request.RecordObject)
- {
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
- Request.RecordObject = ObjectBuffer;
- bool StoreLocal =
- EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- ParseValues(Request);
- Request.Source = Params.Source;
- }
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- if (Value.Exists)
- {
- continue;
- }
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Request.Complete = false;
- continue;
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
- {
- Request.Source = Params.Source;
- Value.Exists = true;
- if (StoreLocal)
- {
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Value.Payload = Compressed;
- }
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
- Value.ContentId,
- *Namespace,
- Key.Bucket,
- Key.Hash);
- }
- }
- if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Request.Complete = false;
- }
- // Request.Complete does not need to be set to false for upstream SkipData attachments.
- // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
- // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
- // any missing SkipData attachments.
- }
- Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- };
-
- m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response");
- CbPackage ResponsePackage;
- CbObjectWriter ResponseObject;
-
- ResponseObject.BeginArray("Result"sv);
- for (RecordRequestData& Request : Requests)
- {
- const CacheKey& Key = Request.Upstream.Key;
- if (Request.Complete ||
- (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
- {
- ResponseObject << Request.RecordObject;
- for (ValueRequestData& Value : Request.Values)
- {
- if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
- {
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
- }
- }
-
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.RecordCacheValue.Size()),
- Request.Complete ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
- }
- else
- {
- ResponseObject.AddNull();
-
- if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- Request.RecordObject ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- }
- ResponseObject.EndArray();
- ResponsePackage.SetObject(ResponseObject.Save());
- return ResponsePackage;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
- CbObjectView BatchObject = BatchRequest.GetObject();
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
-
- m_CacheStats.RpcValueRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
-
- std::vector<bool> Results;
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
-
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
-
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
- {
- RawSize = Chunk.DecodeRawSize();
- }
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
- m_CacheStats.WriteCount++;
- TransferredSize = Chunk.GetCompressedSize();
- }
- Succeeded = true;
- }
- else
- {
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
- }
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
- {
- Succeeded = true;
- }
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
-
- if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
- }
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
-
- return RpcResponse;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcValueRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- struct RequestData
- {
- CacheKey Key;
- CachePolicy Policy;
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0;
- CompressedBuffer Result;
- };
- std::vector<RequestData> Requests;
-
- std::vector<size_t> RemoteRequestIndexes;
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
-
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
-
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
-
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
- IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- if (Request.Result)
- {
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- }
- else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- RemoteRequestIndexes.push_back(Requests.size() - 1);
- }
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
- }
- else
- {
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- }
- }
-
- if (!RemoteRequestIndexes.empty())
- {
- std::vector<CacheValueRequest> RequestedRecordsData;
- std::vector<CacheValueRequest*> CacheValueRequests;
- RequestedRecordsData.reserve(RemoteRequestIndexes.size());
- CacheValueRequests.reserve(RemoteRequestIndexes.size());
- for (size_t Index : RemoteRequestIndexes)
- {
- RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
- CacheValueRequests.push_back(&RequestedRecordsData.back());
- }
- Stopwatch Timer;
- m_UpstreamCache.GetCacheValues(
- *Namespace,
- CacheValueRequests,
- [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context](
- CacheValueGetCompleteParams&& Params) {
- CacheValueRequest& ChunkRequest = Params.Request;
- if (Params.RawHash != IoHash::Zero)
- {
- size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
- size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
- RequestData& Request = Requests[RequestIndex];
- Request.RawHash = Params.RawHash;
- Request.RawSize = Params.RawSize;
- const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
- const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- const bool IsHit = SkipData || HasData;
- if (IsHit)
- {
- if (HasData && !SkipData)
- {
- Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- }
-
- if (HasData && StoreData)
- {
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {});
- m_CacheStats.WriteCount++;
- }
-
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- return;
- }
- }
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- });
- }
-
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
- CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (const RequestData& Request : Requests)
- {
- ResponseObject.BeginObject();
- {
- const CompressedBuffer& Result = Request.Result;
- if (Result)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
- }
- else
- {
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- else if (Request.RawHash != IoHash::Zero)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- ResponseObject.EndObject();
- }
- ResponseObject.EndArray();
-
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
- }
-}
-
-namespace cache::detail {
-
- struct RecordValue
- {
- Oid ValueId;
- IoHash ContentId;
- uint64_t RawSize;
- };
- struct RecordBody
- {
- IoBuffer CacheValue;
- std::vector<RecordValue> Values;
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool HasRequest = false;
- bool ValuesRead = false;
- };
- struct ChunkRequest
- {
- CacheChunkRequest* Key = nullptr;
- RecordBody* Record = nullptr;
- CompressedBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t RawSize = 0;
- uint64_t RequestedSize = 0;
- uint64_t RequestedOffset = 0;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool RawSizeKnown = false;
- bool IsRecordRequest = false;
- uint64_t ElapsedTimeUs = 0;
- };
-
-} // namespace cache::detail
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
- using namespace cache::detail;
-
- std::string Namespace;
- std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
- std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
- std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
- std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
- std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
- std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
- std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
-
- // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
- {
- return CbPackage{};
- }
-
- // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
- // have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
-
- // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks);
-
- // Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
-
- // Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Context, Namespace, Requests);
-}
-
-bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest");
-
- using namespace cache::detail;
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcChunkRequests.fetch_add(1);
-
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
-
- std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
- if (!NamespaceText)
- {
- ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
- return false;
- }
- Namespace = *NamespaceText;
-
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
- // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
- // we will need to change the pointers to indexes to handle reallocations.
- RecordKeys.reserve(NumRequests);
- Records.reserve(NumRequests);
- RequestKeys.reserve(NumRequests);
- Requests.reserve(NumRequests);
- RecordRequests.reserve(NumRequests);
- ValueRequests.reserve(NumRequests);
-
- CacheKeyRequest* PreviousRecordKey = nullptr;
- RecordBody* PreviousRecord = nullptr;
-
- for (CbFieldView RequestView : ChunkRequestsArray)
- {
- ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest::Request");
-
- m_CacheStats.RpcChunkBatchRequests.fetch_add(1);
-
- CbObjectView RequestObject = RequestView.AsObjectView();
- CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
- ChunkRequest& Request = Requests.emplace_back();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- Request.Key = &RequestKey;
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
- {
- ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
- return false;
- }
-
- RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
- RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
- RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- Request.RequestedSize = RequestKey.RawSize;
- Request.RequestedOffset = RequestKey.RawOffset;
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Request.IsRecordRequest = (bool)RequestKey.ValueId;
-
- if (!Request.IsRecordRequest)
- {
- ValueRequests.push_back(&Request);
- }
- else
- {
- RecordRequests.push_back(&Request);
- CacheKeyRequest* RecordKey = nullptr;
- RecordBody* Record = nullptr;
-
- if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
- {
- RecordKey = &RecordKeys.emplace_back();
- PreviousRecordKey = RecordKey;
- Record = &Records.emplace_back();
- PreviousRecord = Record;
- RecordKey->Key = RequestKey.Key;
- }
- else if (RequestKey.Key == PreviousRecordKey->Key)
- {
- RecordKey = PreviousRecordKey;
- Record = PreviousRecord;
- }
- else
- {
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- RequestKey.Key.Bucket,
- RequestKey.Key.Hash,
- PreviousRecordKey->Key.Bucket,
- PreviousRecordKey->Key.Hash);
- return false;
- }
- Request.Record = Record;
- if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
- {
- Record->DownstreamPolicy =
- Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
- Record->HasRequest = true;
- }
- }
- }
- if (Requests.empty())
- {
- return false;
- }
- return true;
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords");
-
- using namespace cache::detail;
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Record");
-
- Stopwatch Timer;
- CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
- RecordBody& Record = Records[RecordIndex];
- if (Record.HasRequest)
- {
- Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
-
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
- {
- Record.Exists = true;
- Record.CacheValue = std::move(CacheValue.Value);
- }
- }
- if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
- UpstreamRecordRequests.push_back(&RecordKey);
- }
- RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- }
-
- if (!UpstreamRecordRequests.empty())
- {
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- };
- m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
- }
-
- for (ChunkRequest* Request : RecordRequests)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Chunk");
-
- Stopwatch Timer;
- if (Request->Key->ChunkId == IoHash::Zero)
- {
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- RecordBody& Record = *Request->Record;
- if (!Record.ValuesRead)
- {
- Record.ValuesRead = true;
- if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- Record.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
- }
- }
- }
-
- for (const RecordValue& Value : Record.Values)
- {
- if (Value.ValueId == Request->Key->ValueId)
- {
- Request->Key->ChunkId = Value.ContentId;
- Request->RawSize = Value.RawSize;
- Request->RawSizeKnown = true;
- break;
- }
- }
- }
-
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Request->Key->ChunkId != IoHash::Zero)
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
- {
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
- {
- Request->Exists = true;
- }
- }
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
- {
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
- if (Request->Value)
- {
- Request->Exists = true;
- Request->RawSizeKnown = false;
- }
- }
- else
- {
- IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
- {
- Request->Exists = true;
- Request->RawSizeKnown = true;
- }
- }
- }
- }
- if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
- OutUpstreamChunks.push_back(Request->Key);
- }
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues");
-
- using namespace cache::detail;
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- for (ChunkRequest* Request : ValueRequests)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
-
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
- {
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- }
- }
- if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
- {
- // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
- Request->Key->RawOffset = 0;
- Request->Key->RawSize = UINT64_MAX;
- }
- OutUpstreamChunks.push_back(Request->Key);
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetUpstreamCacheChunks(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- if (UpstreamChunks.empty())
- {
- return;
- }
- ZEN_TRACE_CPU("Z$::GetUpstreamCacheChunks");
-
- using namespace cache::detail;
-
- const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) {
- if (Params.RawHash == Params.RawHash.Zero)
- {
- return;
- }
-
- CacheChunkRequest& Key = Params.Request;
- size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
- ChunkRequest& Request = Requests[RequestIndex];
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- if (!Compressed)
- {
- return;
- }
-
- bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- if (Request.IsRecordRequest)
- {
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
- }
- else
- {
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {});
- m_CacheStats.WriteCount++;
- }
- }
- if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- Request.Value = std::move(Compressed);
- }
- }
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.RawSize = Params.RawSize;
- Request.RawSizeKnown = true;
- Request.Source = Params.Source;
-
- m_CacheStats.UpstreamHitCount++;
- };
-
- m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
-}
-
-CbPackage
-HttpStructuredCacheService::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse");
-
- using namespace cache::detail;
-
- CbPackage RpcResponse;
- CbObjectWriter Writer;
-
- Writer.BeginArray("Result"sv);
- for (ChunkRequest& Request : Requests)
- {
- ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse::Request");
-
- Writer.BeginObject();
- {
- if (Request.Exists)
- {
- Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
- if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
- }
- else
- {
- Writer.AddInteger("RawSize"sv, Request.RawSize);
- }
-
- ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceBytes(Request.RawSize),
- Request.IsRecordRequest ? "Record"sv : "Value"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- }
- else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
- {
- ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- Writer.EndObject();
- }
- Writer.EndArray();
-
- RpcResponse.SetObject(Writer.Save());
- return RpcResponse;
-}
-
void
HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 2feaaead8..da4bdd63c 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -6,6 +6,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
+#include <zenstore/cache/cacherpc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/openprocesscache.h>
@@ -16,13 +17,16 @@ namespace zen {
struct CacheChunkRequest;
struct CacheKeyRequest;
+struct PutRequestData;
+
class CidStore;
class CbObjectView;
-struct PutRequestData;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
-class DiskWriteBlocker;
+
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
@@ -89,28 +93,6 @@ private:
IoHash ValueContentId;
};
- struct CacheStats
- {
- std::atomic_uint64_t HitCount{};
- std::atomic_uint64_t UpstreamHitCount{};
- std::atomic_uint64_t MissCount{};
- std::atomic_uint64_t WriteCount{};
- std::atomic_uint64_t BadRequestCount{};
- std::atomic_uint64_t RpcRequests{};
- std::atomic_uint64_t RpcRecordRequests{};
- std::atomic_uint64_t RpcRecordBatchRequests{};
- std::atomic_uint64_t RpcValueRequests{};
- std::atomic_uint64_t RpcValueBatchRequests{};
- std::atomic_uint64_t RpcChunkRequests{};
- std::atomic_uint64_t RpcChunkBatchRequests{};
- };
- enum class PutResult
- {
- Success,
- Fail,
- Invalid,
- };
-
void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
@@ -120,57 +102,11 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
- HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context,
- const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutPackage);
-
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
-
- /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest);
- /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests);
- /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests);
bool AreDiskWritesAllowed() const;
@@ -187,6 +123,7 @@ private:
CacheStats m_CacheStats;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
OpenProcessCache m_OpenProcessCache;
+ CacheRpcHandler m_RpcHandler;
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
@@ -199,14 +136,6 @@ private:
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
-/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
- * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
-inline bool
-IsCompressedBinary(ZenContentType Type)
-{
- return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
-}
-
void z$service_forcelink();
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index b7507bd17..038a6db47 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -15,11 +15,11 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 57a09e929..7823010b5 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -9,7 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 6dde0a701..dac29c273 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -16,10 +16,10 @@
#include <zencore/trace.h>
#include <zenhttp/auth/authmgr.h>
-#include <zenhttp/httpshared.h>
#include <zenstore/cidstore.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -2129,7 +2129,7 @@ UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options,
}
std::unique_ptr<UpstreamCache>
-UpstreamCache::Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
}
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 291e7e95e..bb0193e4e 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -8,6 +8,7 @@
#include <zencore/iohash.h>
#include <zencore/stats.h>
#include <zencore/zencore.h>
+#include <zenstore/cache/upstreamcacheclient.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -29,95 +30,6 @@ struct CloudCacheClientOptions;
class CloudCacheTokenProvider;
struct ZenStructuredCacheClientOptions;
-struct UpstreamCacheRecord
-{
- ZenContentType Type = ZenContentType::kBinary;
- std::string Namespace;
- CacheKey Key;
- std::vector<IoHash> ValueContentIds;
- CacheRequestContext Context;
-};
-
-struct UpstreamCacheOptions
-{
- std::chrono::seconds HealthCheckInterval{5};
- uint32_t ThreadCount = 4;
- bool ReadUpstream = true;
- bool WriteUpstream = true;
-};
-
-struct UpstreamError
-{
- int32_t ErrorCode{};
- std::string Reason{};
-
- explicit operator bool() const { return ErrorCode != 0; }
-};
-
-struct UpstreamEndpointInfo
-{
- std::string Name;
- std::string Url;
-};
-
-struct GetUpstreamCacheResult
-{
- UpstreamError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct GetUpstreamCacheSingleResult
-{
- GetUpstreamCacheResult Status;
- IoBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-struct PutUpstreamCacheResult
-{
- std::string Reason;
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct CacheRecordGetCompleteParams
-{
- CacheKeyRequest& Request;
- const CbObjectView& Record;
- const CbPackage& Package;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
-
-struct CacheValueGetCompleteParams
-{
- CacheValueRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
-
-struct CacheChunkGetCompleteParams
-{
- CacheChunkRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
-
struct UpstreamEndpointStats
{
metrics::OperationTiming CacheGetRequestTiming;
@@ -172,6 +84,13 @@ struct UpstreamEndpointStatus
UpstreamEndpointState State;
};
+struct GetUpstreamCacheSingleResult
+{
+ GetUpstreamCacheResult Status;
+ IoBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
/**
* The upstream endpoint is responsible for handling upload/downloading of cache records.
*/
@@ -217,39 +136,32 @@ public:
/**
* Manages one or more upstream cache endpoints.
*/
-class UpstreamCache
+
+class UpstreamCache : public UpstreamCacheClient
{
public:
- virtual ~UpstreamCache() = default;
-
virtual void Initialize() = 0;
- virtual bool IsActive() = 0;
-
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
- virtual void GetCacheRecords(std::string_view Namespace,
- std::span<CacheKeyRequest*> Requests,
- OnCacheRecordGetComplete&& OnComplete) = 0;
-
- virtual void GetCacheValues(std::string_view Namespace,
- std::span<CacheValueRequest*> CacheValueRequests,
- OnCacheValueGetComplete&& OnComplete) = 0;
virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
const CacheKey& CacheKey,
- const IoHash& ValueContentId) = 0;
- virtual void GetCacheChunks(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheChunksGetComplete&& OnComplete) = 0;
-
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ const IoHash& ValueContentId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
- static std::unique_ptr<UpstreamCache> Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+struct UpstreamCacheOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
};
+std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+
} // namespace zen
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index afc3b8438..c031a4086 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -10,9 +10,9 @@
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"
ZEN_THIRD_PARTY_INCLUDES_START
diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp
index 69b9bb356..f528b2620 100644
--- a/src/zenserver/vfs/vfsimpl.cpp
+++ b/src/zenserver/vfs/vfsimpl.cpp
@@ -7,7 +7,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenvfs/projfs.h>
#include <zenvfs/vfs.h>
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index f80f95f8e..37b3f0531 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -510,7 +510,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
- m_UpstreamCache = UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
m_UpstreamCache->Initialize();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index a53e16821..6ff13cfff 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -22,8 +22,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
#include <zenhttp/httptest.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
#include "admin/admin.h"
#include "cache/httpstructuredcache.h"
#include "diag/diagsvcs.h"
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 82f190caa..4d6b9f89e 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "zenstore/cachedisklayer.h"
+#include "zenstore/cache/cachedisklayer.h"
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
new file mode 100644
index 000000000..96b344ee9
--- /dev/null
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -0,0 +1,1640 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/cache/cacherpc.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenstore/cache/cacheshared.h>
+#include <zenstore/cache/structuredcachestore.h>
+#include <zenstore/cache/upstreamcacheclient.h>
+#include <zenstore/cidstore.h>
+#include <zenutil/packageformat.h>
+
+namespace zen { namespace {
+
+ constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
+ constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
+
+ std::optional<std::string> GetValidNamespaceName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ ZEN_WARN("Namespace is invalid, empty namespace is not allowed");
+ return {};
+ }
+
+ if (Name.length() > 64)
+ {
+ ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name);
+ return {};
+ }
+
+ if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet))
+ {
+ ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name);
+ return {};
+ }
+
+ return ToLower(Name);
+ }
+
+ std::optional<std::string> GetValidBucketName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed");
+ return {};
+ }
+
+ if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet))
+ {
+ ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name);
+ return {};
+ }
+
+ return ToLower(Name);
+ }
+
+}} // namespace zen::
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+using namespace std::literals;
+
+std::optional<std::string>
+GetRpcRequestNamespace(const CbObjectView Params)
+{
+ CbFieldView NamespaceField = Params["Namespace"sv];
+ if (!NamespaceField)
+ {
+ return std::string(ZenCacheStore::DefaultNamespace);
+ }
+
+ if (NamespaceField.HasError())
+ {
+ return {};
+ }
+ if (!NamespaceField.IsString())
+ {
+ return {};
+ }
+ return GetValidNamespaceName(NamespaceField.AsString());
+}
+
+bool
+GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
+{
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ if (BucketField.HasError())
+ {
+ return false;
+ }
+ if (!BucketField.IsString())
+ {
+ return false;
+ }
+ std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
+ if (!Bucket.has_value())
+ {
+ return false;
+ }
+ CbFieldView HashField = KeyView["Hash"sv];
+ if (HashField.HasError())
+ {
+ return false;
+ }
+ if (!HashField.IsHash())
+ {
+ return false;
+ }
+ IoHash Hash = HashField.AsHash();
+ Key = CacheKey::Create(*Bucket, Hash);
+ return true;
+}
+
+namespace cache::detail {
+
+ struct RecordValue
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ uint64_t RawSize;
+ };
+
+ struct RecordBody
+ {
+ IoBuffer CacheValue;
+ std::vector<RecordValue> Values;
+ const UpstreamEndpointInfo* Source = nullptr;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool ValuesRead = false;
+ };
+
+ struct ChunkRequest
+ {
+ CacheChunkRequest* Key = nullptr;
+ RecordBody* Record = nullptr;
+ CompressedBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t RawSize = 0;
+ uint64_t RequestedSize = 0;
+ uint64_t RequestedOffset = 0;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool RawSizeKnown = false;
+ bool IsRecordRequest = false;
+ uint64_t ElapsedTimeUs = 0;
+ };
+
+} // namespace cache::detail
+
+struct PutRequestData
+{
+ std::string Namespace;
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+ CacheRequestContext Context;
+};
+
+CacheRecordPolicy
+LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
+{
+ OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
+ return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
+}
+
+CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
+ CacheStats& InCacheStats,
+ UpstreamCacheClient& InUpstreamCache,
+ ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ const DiskWriteBlocker* InDiskWriteBlocker)
+: m_Log(InLog)
+, m_CacheStats(InCacheStats)
+, m_UpstreamCache(InUpstreamCache)
+, m_CacheStore(InCacheStore)
+, m_CidStore(InCidStore)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
+{
+}
+
+CacheRpcHandler::~CacheRpcHandler()
+{
+}
+
+bool
+CacheRpcHandler::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
+CacheRpcHandler::RpcResponseCode
+CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
+{
+ ZEN_TRACE_CPU("Z$::HandleRpcRequest");
+
+ m_CacheStats.RpcRequests.fetch_add(1);
+
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ OutAcceptMagic = Object["Accept"sv].AsUInt32();
+ OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+ OutTargetProcessId = Object["Pid"sv].AsInt32(0);
+
+ const std::string_view Method = Object["Method"sv].AsString();
+
+ if (Method == "PutCacheRecords"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return RpcResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheRecords(Context, Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheRecords(Context, Object);
+ }
+ else if (Method == "PutCacheValues"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return RpcResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheValues(Context, Package);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheValues(Context, Object);
+ }
+ else if (Method == "GetCacheChunks"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
+ }
+ else
+ {
+ m_CacheStats.BadRequestCount++;
+ return RpcResponseCode::BadRequest;
+ }
+ return RpcResponseCode::OK;
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+
+ CbObjectView BatchObject = BatchRequest.GetObject();
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+
+ std::vector<bool> Results;
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
+ CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+ CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ PutRequestData PutRequest{.Namespace = *Namespace,
+ .Key = std::move(Key),
+ .RecordObject = RecordObject,
+ .Policy = std::move(Policy),
+ .Context = Context};
+
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+
+ if (Result == PutResult::Invalid)
+ {
+ return CbPackage{};
+ }
+ Results.push_back(Result == PutResult::Success);
+ }
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+}
+
+PutResult
+CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+{
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ Stopwatch Timer;
+
+ Request.RecordObject.IterateAttachments(
+ [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
+ CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ AttachmentsToStoreLocally.emplace_back(Attachment);
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ return PutResult::Invalid;
+ }
+
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)});
+ }
+ return PutResult::Success;
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ struct ValueRequestData
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ CompressedBuffer Payload;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool ReadFromUpstream = false;
+ };
+ struct RecordRequestData
+ {
+ CacheKeyRequest Upstream;
+ CbObjectView RecordObject;
+ IoBuffer RecordCacheValue;
+ CacheRecordPolicy DownstreamPolicy;
+ std::vector<ValueRequestData> Values;
+ bool Complete = false;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t ElapsedTimeUs;
+ };
+
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ std::vector<RecordRequestData> Requests;
+ std::vector<size_t> UpstreamIndexes;
+
+ auto ParseValues = [](RecordRequestData& Request) {
+ CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
+ Request.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Request.Values.push_back({ValueId, RawHash});
+ Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
+ }
+ }
+ };
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Request");
+
+ m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+ RecordRequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ CacheKey& Key = Request.Upstream.Key;
+ if (!GetRpcRequestCacheKey(KeyObject, Key))
+ {
+ return CbPackage{};
+ }
+
+ Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
+
+ ZenCacheValue CacheValue;
+ bool NeedUpstreamAttachment = false;
+ bool FoundLocalInvalid = false;
+ ZenCacheValue RecordCacheValue;
+
+ if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
+ {
+ Request.RecordCacheValue = std::move(RecordCacheValue.Value);
+ if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
+ {
+ FoundLocalInvalid = true;
+ }
+ else
+ {
+ Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
+ ParseValues(Request);
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
+ {
+ // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ Request.Complete = false;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (m_CidStore.ContainsChunk(Value.ContentId))
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ {
+ if (Chunk.GetSize() > 0)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Value.Exists = true;
+ continue;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
+ }
+ }
+ if (!Request.Complete)
+ {
+ bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid &&
+ EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
+ if (NeedUpstreamRecord || NeedUpstreamAttachment)
+ {
+ UpstreamIndexes.push_back(Requests.size() - 1);
+ }
+ }
+ Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
+ }
+ if (Requests.empty())
+ {
+ return CbPackage{};
+ }
+
+ if (!UpstreamIndexes.empty())
+ {
+ std::vector<CacheKeyRequest*> UpstreamRequests;
+ UpstreamRequests.reserve(UpstreamIndexes.size());
+ for (size_t Index : UpstreamIndexes)
+ {
+ RecordRequestData& Request = Requests[Index];
+ UpstreamRequests.push_back(&Request.Upstream);
+
+ if (Request.Values.size())
+ {
+ // We will be returning the local object and know all the value Ids that exist in it
+ // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
+ CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
+ CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
+ for (ValueRequestData& Value : Request.Values)
+ {
+ CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
+ UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
+ Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
+ }
+ Request.Upstream.Policy = Builder.Build();
+ }
+ else
+ {
+ // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
+ // and convert the CacheRecordPolicy to an upstream policy
+ Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
+ }
+ }
+
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+
+ RecordRequestData& Request =
+ *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
+ Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ const CacheKey& Key = Request.Upstream.Key;
+ Stopwatch Timer;
+ auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
+ if (!Request.RecordObject)
+ {
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
+ Request.RecordObject = ObjectBuffer;
+ bool StoreLocal =
+ EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore
+ .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+ }
+ ParseValues(Request);
+ Request.Source = Params.Source;
+ }
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ if (Value.Exists)
+ {
+ continue;
+ }
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ Request.Complete = false;
+ continue;
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ Request.Source = Params.Source;
+ Value.Exists = true;
+ if (StoreLocal)
+ {
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Value.Payload = Compressed;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
+ Value.ContentId,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash);
+ }
+ }
+ if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Request.Complete = false;
+ }
+ // Request.Complete does not need to be set to false for upstream SkipData attachments.
+ // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
+ // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
+ // any missing SkipData attachments.
+ }
+ Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ };
+
+ m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response");
+ CbPackage ResponsePackage;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+ for (RecordRequestData& Request : Requests)
+ {
+ const CacheKey& Key = Request.Upstream.Key;
+ if (Request.Complete ||
+ (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
+ {
+ ResponseObject << Request.RecordObject;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
+ {
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
+ }
+ }
+
+ ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Request.RecordCacheValue.Size()),
+ Request.Complete ? ""sv : " (PARTIAL)"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
+ }
+ else
+ {
+ ResponseObject.AddNull();
+
+ if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ Request.RecordObject ? ""sv : " (PARTIAL)"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+ }
+ ResponseObject.EndArray();
+ ResponsePackage.SetObject(ResponseObject.Save());
+ return ResponsePackage;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+
+ m_CacheStats.RpcValueRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
+
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+
+ PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
+ uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ bool Succeeded = false;
+ uint64_t TransferredSize = 0;
+
+ if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
+ // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
+ Policy |= CachePolicy::StoreLocal;
+ }
+
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
+ {
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value.SetContentType(ZenContentType::kCompressedBinary);
+ if (RawSize == 0)
+ {
+ RawSize = Chunk.DecodeRawSize();
+ }
+ m_CacheStore
+ .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
+ m_CacheStats.WriteCount++;
+ TransferredSize = Chunk.GetCompressedSize();
+ }
+ Succeeded = true;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
+ return CbPackage{};
+ }
+ }
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue ExistingValue;
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ {
+ Succeeded = true;
+ }
+ }
+ // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
+ // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
+
+ if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
+ }
+ Results.push_back(Succeeded);
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(TransferredSize),
+ Succeeded ? "Added"sv : "Invalid",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ return RpcResponse;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcValueRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+
+ struct RequestData
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+ CompressedBuffer Result;
+ };
+ std::vector<RequestData> Requests;
+
+ std::vector<size_t> RemoteRequestIndexes;
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
+
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
+ {
+ return CbPackage{};
+ }
+
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
+
+ ZenCacheValue CacheValue;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
+ IsCompressedBinary(CacheValue.Value.GetContentType()))
+ {
+ Request.RawHash = CacheValue.RawHash;
+ Request.RawSize = CacheValue.RawSize;
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ }
+ }
+ if (Request.Result)
+ {
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ }
+ else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ RemoteRequestIndexes.push_back(Requests.size() - 1);
+ }
+ else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+
+ if (!RemoteRequestIndexes.empty())
+ {
+ std::vector<CacheValueRequest> RequestedRecordsData;
+ std::vector<CacheValueRequest*> CacheValueRequests;
+ RequestedRecordsData.reserve(RemoteRequestIndexes.size());
+ CacheValueRequests.reserve(RemoteRequestIndexes.size());
+ for (size_t Index : RemoteRequestIndexes)
+ {
+ RequestData& Request = Requests[Index];
+ RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
+ CacheValueRequests.push_back(&RequestedRecordsData.back());
+ }
+ Stopwatch Timer;
+ m_UpstreamCache.GetCacheValues(
+ *Namespace,
+ CacheValueRequests,
+ [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context](
+ CacheValueGetCompleteParams&& Params) {
+ CacheValueRequest& ChunkRequest = Params.Request;
+ if (Params.RawHash != IoHash::Zero)
+ {
+ size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
+ size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
+ RequestData& Request = Requests[RequestIndex];
+ Request.RawHash = Params.RawHash;
+ Request.RawSize = Params.RawSize;
+ const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
+ const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool IsHit = SkipData || HasData;
+ if (IsHit)
+ {
+ if (HasData && !SkipData)
+ {
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
+ }
+
+ if (HasData && StoreData)
+ {
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {});
+ m_CacheStats.WriteCount++;
+ }
+
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ Params.Source ? Params.Source->Url : "UPSTREAM",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ return;
+ }
+ }
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ Params.Source ? Params.Source->Url : "UPSTREAM",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ });
+ }
+
+ if (Requests.empty())
+ {
+ return CbPackage{};
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (const RequestData& Request : Requests)
+ {
+ ResponseObject.BeginObject();
+ {
+ const CompressedBuffer& Result = Request.Result;
+ if (Result)
+ {
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
+ if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
+ }
+ else
+ {
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
+ }
+ }
+ else if (Request.RawHash != IoHash::Zero)
+ {
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
+ }
+ }
+ ResponseObject.EndObject();
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+ using namespace cache::detail;
+
+ std::string Namespace;
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ {
+ return CbPackage{};
+ }
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ return WriteGetCacheChunksResponse(Context, Namespace, Requests);
+}
+
+bool
+CacheRpcHandler::ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest");
+
+ using namespace cache::detail;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcChunkRequests.fetch_add(1);
+
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+
+ std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
+ if (!NamespaceText)
+ {
+ ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
+ return false;
+ }
+ Namespace = *NamespaceText;
+
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+ // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
+ // we will need to change the pointers to indexes to handle reallocations.
+ RecordKeys.reserve(NumRequests);
+ Records.reserve(NumRequests);
+ RequestKeys.reserve(NumRequests);
+ Requests.reserve(NumRequests);
+ RecordRequests.reserve(NumRequests);
+ ValueRequests.reserve(NumRequests);
+
+ CacheKeyRequest* PreviousRecordKey = nullptr;
+ RecordBody* PreviousRecord = nullptr;
+
+ for (CbFieldView RequestView : ChunkRequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest::Request");
+
+ m_CacheStats.RpcChunkBatchRequests.fetch_add(1);
+
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
+ ChunkRequest& Request = Requests.emplace_back();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ Request.Key = &RequestKey;
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
+ {
+ ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
+ return false;
+ }
+
+ RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ Request.RequestedSize = RequestKey.RawSize;
+ Request.RequestedOffset = RequestKey.RawOffset;
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ Request.IsRecordRequest = (bool)RequestKey.ValueId;
+
+ if (!Request.IsRecordRequest)
+ {
+ ValueRequests.push_back(&Request);
+ }
+ else
+ {
+ RecordRequests.push_back(&Request);
+ CacheKeyRequest* RecordKey = nullptr;
+ RecordBody* Record = nullptr;
+
+ if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
+ {
+ RecordKey = &RecordKeys.emplace_back();
+ PreviousRecordKey = RecordKey;
+ Record = &Records.emplace_back();
+ PreviousRecord = Record;
+ RecordKey->Key = RequestKey.Key;
+ }
+ else if (RequestKey.Key == PreviousRecordKey->Key)
+ {
+ RecordKey = PreviousRecordKey;
+ Record = PreviousRecord;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ RequestKey.Key.Bucket,
+ RequestKey.Key.Hash,
+ PreviousRecordKey->Key.Bucket,
+ PreviousRecordKey->Key.Hash);
+ return false;
+ }
+ Request.Record = Record;
+ if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
+ {
+ Record->DownstreamPolicy =
+ Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
+ Record->HasRequest = true;
+ }
+ }
+ }
+ if (Requests.empty())
+ {
+ return false;
+ }
+ return true;
+}
+
+void
+CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords");
+
+ using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ std::vector<CacheKeyRequest*> UpstreamRecordRequests;
+ for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Record");
+
+ Stopwatch Timer;
+ CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
+ RecordBody& Record = Records[RecordIndex];
+ if (Record.HasRequest)
+ {
+ Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
+
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
+ {
+ Record.Exists = true;
+ Record.CacheValue = std::move(CacheValue.Value);
+ }
+ }
+ if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
+ UpstreamRecordRequests.push_back(&RecordKey);
+ }
+ RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ }
+
+ if (!UpstreamRecordRequests.empty())
+ {
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+ }
+ };
+ m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
+ }
+
+ for (ChunkRequest* Request : RecordRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Chunk");
+
+ Stopwatch Timer;
+ if (Request->Key->ChunkId == IoHash::Zero)
+ {
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ RecordBody& Record = *Request->Record;
+ if (!Record.ValuesRead)
+ {
+ Record.ValuesRead = true;
+ if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ {
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
+ }
+ }
+ }
+ }
+
+ for (const RecordValue& Value : Record.Values)
+ {
+ if (Value.ValueId == Request->Key->ValueId)
+ {
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
+ break;
+ }
+ }
+ }
+
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Request->Key->ChunkId != IoHash::Zero)
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
+ {
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
+ {
+ Request->Exists = true;
+ }
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ {
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
+ if (Request->Value)
+ {
+ Request->Exists = true;
+ Request->RawSizeKnown = false;
+ }
+ }
+ else
+ {
+ IoHash _;
+ if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
+ {
+ Request->Exists = true;
+ Request->RawSizeKnown = true;
+ }
+ }
+ }
+ }
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
+ OutUpstreamChunks.push_back(Request->Key);
+ }
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+}
+
+void
+CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues");
+
+ using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
+
+ Stopwatch Timer;
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ {
+ if (IsCompressedBinary(CacheValue.Value.GetContentType()))
+ {
+ Request->Key->ChunkId = CacheValue.RawHash;
+ Request->Exists = true;
+ Request->RawSize = CacheValue.RawSize;
+ Request->RawSizeKnown = true;
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ }
+ }
+ }
+ }
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
+ Request->Key->RawOffset = 0;
+ Request->Key->RawSize = UINT64_MAX;
+ }
+ OutUpstreamChunks.push_back(Request->Key);
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+}
+
+void
+CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ if (UpstreamChunks.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("Z$::GetUpstreamCacheChunks");
+
+ using namespace cache::detail;
+
+ const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) {
+ if (Params.RawHash == Params.RawHash.Zero)
+ {
+ return;
+ }
+
+ CacheChunkRequest& Key = Params.Request;
+ size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
+ ChunkRequest& Request = Requests[RequestIndex];
+ Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
+ if (!Compressed)
+ {
+ return;
+ }
+
+ bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ if (Request.IsRecordRequest)
+ {
+ m_CidStore.AddChunk(Params.Value, Params.RawHash);
+ }
+ else
+ {
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {});
+ m_CacheStats.WriteCount++;
+ }
+ }
+ if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request.Value = std::move(Compressed);
+ }
+ }
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
+
+ m_CacheStats.UpstreamHitCount++;
+ };
+
+ m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
+}
+
+CbPackage
+CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse");
+
+ using namespace cache::detail;
+
+ CbPackage RpcResponse;
+ CbObjectWriter Writer;
+
+ Writer.BeginArray("Result"sv);
+ for (ChunkRequest& Request : Requests)
+ {
+ ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse::Request");
+
+ Writer.BeginObject();
+ {
+ if (Request.Exists)
+ {
+ Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
+ if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
+ }
+ else
+ {
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
+ }
+
+ ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceBytes(Request.RawSize),
+ Request.IsRecordRequest ? "Record"sv : "Value"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.HitCount++;
+ }
+ else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
+ {
+ ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ RpcResponse.SetObject(Writer.Save());
+ return RpcResponse;
+}
+
+} // namespace zen \ No newline at end of file
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index f92baaf0b..fd04af2a3 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "zenstore/structuredcachestore.h"
+#include "zenstore/cache/structuredcachestore.h"
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 6997a12e4..6997a12e4 100644
--- a/src/zenstore/include/zenstore/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
new file mode 100644
index 000000000..c57e2818c
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -0,0 +1,155 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zenutil/cache/cache.h>
+
+#include <atomic>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+namespace cache { namespace detail {
+ struct RecordBody;
+ struct ChunkRequest;
+}} // namespace cache::detail
+
+struct CacheChunkRequest;
+struct CacheKeyRequest;
+struct PutRequestData;
+
+class CbPackage;
+class CbObjectView;
+class CidStore;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
+class UpstreamCacheClient;
+class ZenCacheStore;
+
+enum class CachePolicy : uint32_t;
+enum class RpcAcceptOptions : uint16_t;
+
+struct AttachmentCount
+{
+ uint32_t New = 0;
+ uint32_t Valid = 0;
+ uint32_t Invalid = 0;
+ uint32_t Total = 0;
+};
+
+struct CacheStats
+{
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t WriteCount{};
+ std::atomic_uint64_t BadRequestCount{};
+ std::atomic_uint64_t RpcRequests{};
+ std::atomic_uint64_t RpcRecordRequests{};
+ std::atomic_uint64_t RpcRecordBatchRequests{};
+ std::atomic_uint64_t RpcValueRequests{};
+ std::atomic_uint64_t RpcValueBatchRequests{};
+ std::atomic_uint64_t RpcChunkRequests{};
+ std::atomic_uint64_t RpcChunkBatchRequests{};
+};
+
+enum class PutResult
+{
+ Success,
+ Fail,
+ Invalid,
+};
+
+/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
+
+ We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
+ */
+
+constexpr bool
+IsCompressedBinary(ZenContentType Type)
+{
+ return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
+}
+
+struct CacheRpcHandler
+{
+ CacheRpcHandler(LoggerRef InLog,
+ CacheStats& InCacheStats,
+ UpstreamCacheClient& InUpstreamCache,
+ ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ const DiskWriteBlocker* InDiskWriteBlocker);
+ ~CacheRpcHandler();
+
+ enum class RpcResponseCode
+ {
+ InsufficientStorage = 507,
+ BadRequest = 400,
+ OK = 200
+ };
+
+ RpcResponseCode HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
+
+private:
+ CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
+
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally.
+ */
+ void GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+
+ LoggerRef Log() { return m_Log; }
+ LoggerRef m_Log;
+ CacheStats& m_CacheStats;
+ UpstreamCacheClient& m_UpstreamCache;
+ ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+
+ bool AreDiskWritesAllowed() const;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index e3e8a2f84..e3e8a2f84 100644
--- a/src/zenstore/include/zenstore/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 56b2105a9..89d2abd11 100644
--- a/src/zenstore/include/zenstore/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -5,7 +5,7 @@
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
#include <zencore/stats.h>
-#include <zenstore/cachedisklayer.h>
+#include <zenstore/cache/cachedisklayer.h>
#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/statsreporter.h>
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
new file mode 100644
index 000000000..152031c3a
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/stats.h>
+#include <zencore/zencore.h>
+#include <zenutil/cache/cache.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace zen {
+
+class CbObjectView;
+class CbPackage;
+
+struct UpstreamCacheRecord
+{
+ ZenContentType Type = ZenContentType::kBinary;
+ std::string Namespace;
+ CacheKey Key;
+ std::vector<IoHash> ValueContentIds;
+ CacheRequestContext Context;
+};
+
+struct UpstreamError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
+struct GetUpstreamCacheResult
+{
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct PutUpstreamCacheResult
+{
+ std::string Reason;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct CacheRecordGetCompleteParams
+{
+ CacheKeyRequest& Request;
+ const CbObjectView& Record;
+ const CbPackage& Package;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
+
+struct CacheValueGetCompleteParams
+{
+ CacheValueRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
+
+struct CacheChunkGetCompleteParams
+{
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
+
+class UpstreamCacheClient
+{
+public:
+ virtual ~UpstreamCacheClient() = default;
+
+ virtual bool IsActive() = 0;
+
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+};
+
+} // namespace zen
diff --git a/src/zenhttp/include/zenhttp/httpshared.h b/src/zenutil/include/zenutil/packageformat.h
index cf74b6b21..0b8495fbd 100644
--- a/src/zenhttp/include/zenhttp/httpshared.h
+++ b/src/zenutil/include/zenutil/packageformat.h
@@ -158,6 +158,6 @@ private:
IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
};
-void forcelink_httpshared();
+void forcelink_packageformat();
} // namespace zen
diff --git a/src/zenhttp/httpshared.cpp b/src/zenutil/packageformat.cpp
index ca014bf1c..015782283 100644
--- a/src/zenhttp/httpshared.cpp
+++ b/src/zenutil/packageformat.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -818,7 +818,7 @@ TEST_CASE("CbPackage.LocalRef")
}
void
-forcelink_httpshared()
+forcelink_packageformat()
{
}
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index d9d6c83a2..8544f3401 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -6,6 +6,7 @@
# include <zenutil/basicfile.h>
# include <zenutil/cache/rpcrecording.h>
+# include <zenutil/packageformat.h>
namespace zen {
@@ -14,6 +15,7 @@ zenutil_forcelinktests()
{
basicfile_forcelink();
cache::rpcrecord_forcelink();
+ forcelink_packageformat();
}
} // namespace zen