aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md6
-rw-r--r--VERSION.txt2
-rw-r--r--src/zen/cmds/print_cmd.cpp2
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp2
-rw-r--r--src/zenhttp/httpclient.cpp2
-rw-r--r--src/zenhttp/httpserver.cpp2
-rw-r--r--src/zenhttp/servers/httpsys.cpp2
-rw-r--r--src/zenhttp/xmake.lua2
-rw-r--r--src/zenhttp/zenhttp.cpp3
-rw-r--r--src/zennet/zennet.cpp2
-rw-r--r--src/zenserver-test/zenserver-test.cpp2
-rw-r--r--src/zenserver/admin/admin.cpp2
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp1609
-rw-r--r--src/zenserver/cache/httpstructuredcache.h85
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp2
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp6
-rw-r--r--src/zenserver/upstream/upstreamcache.h128
-rw-r--r--src/zenserver/upstream/zen.cpp4
-rw-r--r--src/zenserver/vfs/vfsimpl.cpp2
-rw-r--r--src/zenserver/zenserver.cpp2
-rw-r--r--src/zenserver/zenserver.h2
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp2
-rw-r--r--src/zenstore/cache/cacherpc.cpp1640
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp2
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h (renamed from src/zenstore/include/zenstore/cachedisklayer.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h155
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h (renamed from src/zenstore/include/zenstore/cacheshared.h)0
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h (renamed from src/zenstore/include/zenstore/structuredcachestore.h)2
-rw-r--r--src/zenstore/include/zenstore/cache/upstreamcacheclient.h119
-rw-r--r--src/zenutil/cache/rpcrecording.cpp9
-rw-r--r--src/zenutil/include/zenutil/packageformat.h (renamed from src/zenhttp/include/zenhttp/httpshared.h)2
-rw-r--r--src/zenutil/packageformat.cpp (renamed from src/zenhttp/httpshared.cpp)4
-rw-r--r--src/zenutil/zenutil.cpp2
34 files changed, 2015 insertions, 1793 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9c587ef4f..449b5ffdf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
##
+- Improvement: Separated cache RPC handling code from general structured cache HTTP code
+- Bugfix: RPC recording would not release memory as early as intended which resulted in memory buildup during long recording sessions. Previously certain memory was only released when recording stopped, now it gets released immediately when a segment is complete and written to disk.
+
+## 0.2.38
- Bugfix: Cache RPC recording would drop data when it reached 4GB of inline chunk data in a segment
- Bugfix: Fixed thread safety issues in RPC recorder v2
- Bugfix: `IoBuffer::Materialize` would leak memory for small buffers
@@ -13,7 +17,7 @@
- Improvement: Windows executables are now signed with official cert when creating a release
- Improvement: Each block in block store that is rewritten will now be logged for better feedback
-## 0.2 37
+## 0.2.37
- Bugfix: ShutdownLogging code would throw an exception if it was called before everything had been initialised properly
- Bugfix: Reorder shutdown to avoid crash due to late async log messages (spdlog workaround)
- Bugfix: Correctly calculate peak disk write size in GC status message
diff --git a/VERSION.txt b/VERSION.txt
index 15de20c94..f70c1e130 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-0.2.38 \ No newline at end of file
+0.2.39-pre0 \ No newline at end of file
diff --git a/src/zen/cmds/print_cmd.cpp b/src/zen/cmds/print_cmd.cpp
index ccd7f248e..fa42a3563 100644
--- a/src/zen/cmds/print_cmd.cpp
+++ b/src/zen/cmds/print_cmd.cpp
@@ -8,7 +8,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
using namespace std::literals;
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 0b6236cb5..b54cb9681 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -14,8 +14,8 @@
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
#include <zencore/jobqueue.h>
#include <zenstore/structuredcachestore.h>
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 1299d51c1..a29a08a3c 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -16,7 +16,7 @@
#include <zencore/testing.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index 97d6a01fe..3270855ad 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -24,7 +24,7 @@
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/thread.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#include <charconv>
#include <mutex>
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index d2cb63cd7..5cd273c40 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -13,7 +13,7 @@
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#if ZEN_WITH_HTTPSYS
# define _WINSOCKAPI_
diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua
index b6ffbe467..8393f399b 100644
--- a/src/zenhttp/xmake.lua
+++ b/src/zenhttp/xmake.lua
@@ -7,7 +7,7 @@ target('zenhttp')
add_files("**.cpp")
add_files("servers/httpsys.cpp", {unity_ignored=true})
add_includedirs("include", {public=true})
- add_deps("zencore", "transport-sdk")
+ add_deps("zencore", "zenutil", "transport-sdk")
add_packages(
"vcpkg::asio",
"vcpkg::cpr",
diff --git a/src/zenhttp/zenhttp.cpp b/src/zenhttp/zenhttp.cpp
index 3e5b387b8..6b855c4db 100644
--- a/src/zenhttp/zenhttp.cpp
+++ b/src/zenhttp/zenhttp.cpp
@@ -6,7 +6,7 @@
# include <zenhttp/httpclient.h>
# include <zenhttp/httpserver.h>
-# include <zenhttp/httpshared.h>
+# include <zenutil/packageformat.h>
namespace zen {
@@ -15,7 +15,6 @@ zenhttp_forcelinktests()
{
http_forcelink();
httpclient_forcelink();
- forcelink_httpshared();
}
} // namespace zen
diff --git a/src/zennet/zennet.cpp b/src/zennet/zennet.cpp
index 30a012a3b..47157c3ed 100644
--- a/src/zennet/zennet.cpp
+++ b/src/zennet/zennet.cpp
@@ -12,7 +12,7 @@ void
zennet_forcelinktests()
{
#if ZEN_WITH_TESTS
- zen::statsd_forcelink();
+ statsd_forcelink();
#endif
}
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 3efa57fdb..5dfbc6327 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -20,11 +20,11 @@
#include <zencore/timer.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpclient.h>
-#include <zenhttp/httpshared.h>
#include <zenhttp/zenhttp.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/logging/testformatter.h>
+#include <zenutil/packageformat.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_USE_MIMALLOC
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 36ea8c0f1..8b3f5a785 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -20,7 +20,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "config.h"
#include "projectstore/projectstore.h"
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f9cd7ae13..c62b5325e 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -16,12 +16,13 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
-#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
@@ -56,30 +57,6 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
}
-CacheRecordPolicy
-LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
-{
- OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
- return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
-}
-
-struct AttachmentCount
-{
- uint32_t New = 0;
- uint32_t Valid = 0;
- uint32_t Invalid = 0;
- uint32_t Total = 0;
-};
-
-struct PutRequestData
-{
- std::string Namespace;
- CacheKey Key;
- CbObjectView RecordObject;
- CacheRecordPolicy Policy;
- CacheRequestContext Context;
-};
-
namespace {
static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
@@ -87,14 +64,6 @@ namespace {
static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
static constinit std::string_view HttpZCacheDetailsPrefix = "details$"sv;
- struct HttpRequestData
- {
- std::optional<std::string> Namespace;
- std::optional<std::string> Bucket;
- std::optional<IoHash> HashKey;
- std::optional<IoHash> ValueContentId;
- };
-
constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
@@ -153,6 +122,14 @@ namespace {
return KeyHash;
}
+ struct HttpRequestData
+ {
+ std::optional<std::string> Namespace;
+ std::optional<std::string> Bucket;
+ std::optional<IoHash> HashKey;
+ std::optional<IoHash> ValueContentId;
+ };
+
bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data)
{
std::vector<std::string_view> Tokens;
@@ -264,55 +241,6 @@ namespace {
}
}
- std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
- {
- CbFieldView NamespaceField = Params["Namespace"sv];
- if (!NamespaceField)
- {
- return std::string(ZenCacheStore::DefaultNamespace);
- }
-
- if (NamespaceField.HasError())
- {
- return {};
- }
- if (!NamespaceField.IsString())
- {
- return {};
- }
- return GetValidNamespaceName(NamespaceField.AsString());
- }
-
- bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
- {
- CbFieldView BucketField = KeyView["Bucket"sv];
- if (BucketField.HasError())
- {
- return false;
- }
- if (!BucketField.IsString())
- {
- return false;
- }
- std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
- if (!Bucket.has_value())
- {
- return false;
- }
- CbFieldView HashField = KeyView["Hash"sv];
- if (HashField.HasError())
- {
- return false;
- }
- if (!HashField.IsHash())
- {
- return false;
- }
- IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Bucket, Hash);
- return true;
- }
-
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -330,6 +258,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -915,14 +844,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- {
- HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
- }
+ HandleGetCacheRecord(Request, Ref, PolicyFromUrl);
break;
case HttpVerb::kPut:
HandlePutCacheRecord(Request, Ref, PolicyFromUrl);
break;
+
default:
break;
}
@@ -1644,74 +1572,6 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
Request.WriteResponse(ResponseCode);
}
-HttpResponseCode
-HttpStructuredCacheService::HandleRpcRequest(const CacheRequestContext& Context,
- const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutResultPackage)
-{
- ZEN_TRACE_CPU("Z$::HandleRpcRequest");
-
- m_CacheStats.RpcRequests.fetch_add(1);
-
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == ZenContentType::kCbObject)
- {
- ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- OutAcceptMagic = Object["Accept"sv].AsUInt32();
- OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
- OutTargetProcessId = Object["Pid"sv].AsInt32(0);
-
- const std::string_view Method = Object["Method"sv].AsString();
-
- if (Method == "PutCacheRecords"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheRecords(Context, Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- OutResultPackage = HandleRpcGetCacheRecords(Context, Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- if (!AreDiskWritesAllowed())
- {
- return HttpResponseCode::InsufficientStorage;
- }
- OutResultPackage = HandleRpcPutCacheValues(Context, Package);
- }
- else if (Method == "GetCacheValues"sv)
- {
- OutResultPackage = HandleRpcGetCacheValues(Context, Object);
- }
- else if (Method == "GetCacheChunks"sv)
- {
- OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
- }
- else
- {
- m_CacheStats.BadRequestCount++;
- return HttpResponseCode::BadRequest;
- }
- return HttpResponseCode::OK;
-}
-
void
HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Context,
cache::IRpcRequestReplayer& Replayer,
@@ -1735,13 +1595,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
CbPackage RpcResult;
- if (IsHttpSuccessCode(HandleRpcRequest(Context,
- RequestInfo.ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetPid,
- RpcResult)))
+ if (m_RpcHandler.HandleRpcRequest(Context,
+ RequestInfo.ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetPid,
+ RpcResult) == CacheRpcHandler::RpcResponseCode::OK)
{
if (AcceptMagic == kCbPkgMagic)
{
@@ -1823,16 +1683,19 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
int TargetProcessId = 0;
CbPackage RpcResult;
- HttpResponseCode ResultCode = HandleRpcRequest(RequestContext,
- ContentType,
- std::move(Body),
- AcceptMagic,
- AcceptFlags,
- TargetProcessId,
- RpcResult);
- if (!IsHttpSuccessCode(ResultCode))
+ CacheRpcHandler::RpcResponseCode ResultCode = m_RpcHandler.HandleRpcRequest(RequestContext,
+ ContentType,
+ std::move(Body),
+ AcceptMagic,
+ AcceptFlags,
+ TargetProcessId,
+ RpcResult);
+
+ HttpResponseCode HttpResultCode = HttpResponseCode(int(ResultCode));
+
+ if (!IsHttpSuccessCode(HttpResultCode))
{
- return AsyncRequest.WriteResponse(ResultCode);
+ return AsyncRequest.WriteResponse(HttpResultCode);
}
if (AcceptMagic == kCbPkgMagic)
@@ -1900,1412 +1763,6 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
}
}
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
-
- CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
- CachePolicy DefaultPolicy;
-
- m_CacheStats.RpcRecordRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
-
- std::vector<bool> Results;
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
- CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
- CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{.Namespace = *Namespace,
- .Key = std::move(Key),
- .RecordObject = RecordObject,
- .Policy = std::move(Policy),
- .Context = Context};
-
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
-
- if (Result == PutResult::Invalid)
- {
- return CbPackage{};
- }
- Results.push_back(Result == PutResult::Success);
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
-}
-
-HttpStructuredCacheService::PutResult
-HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
-{
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
-
- AttachmentCount Count;
- size_t NumAttachments = Package->GetAttachments().size();
- std::vector<IoHash> ValidAttachments;
- std::vector<IoHash> ReferencedAttachments;
- std::vector<const CbAttachment*> AttachmentsToStoreLocally;
- ValidAttachments.reserve(NumAttachments);
- AttachmentsToStoreLocally.reserve(NumAttachments);
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- Stopwatch Timer;
-
- Request.RecordObject.IterateAttachments(
- [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
- CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
- {
- AttachmentsToStoreLocally.emplace_back(Attachment);
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
-
- if (Count.Invalid > 0)
- {
- return PutResult::Invalid;
- }
-
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
- m_CacheStats.WriteCount++;
-
- for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- if (InsertResult.New)
- {
- Count.New++;
- }
- TransferredSize += Chunk.GetCompressedSize();
- }
-
- ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- NiceBytes(TransferredSize),
- Count.New,
- Count.Valid,
- Count.Total,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
-
- const bool IsPartialRecord = Count.Valid != Count.Total;
-
- if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Request.Namespace,
- .Key = Request.Key,
- .ValueContentIds = std::move(ValidAttachments)});
- }
- return PutResult::Success;
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcRecordRequests.fetch_add(1);
-
- struct ValueRequestData
- {
- Oid ValueId;
- IoHash ContentId;
- CompressedBuffer Payload;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool ReadFromUpstream = false;
- };
- struct RecordRequestData
- {
- CacheKeyRequest Upstream;
- CbObjectView RecordObject;
- IoBuffer RecordCacheValue;
- CacheRecordPolicy DownstreamPolicy;
- std::vector<ValueRequestData> Values;
- bool Complete = false;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t ElapsedTimeUs;
- };
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- std::vector<RecordRequestData> Requests;
- std::vector<size_t> UpstreamIndexes;
-
- auto ParseValues = [](RecordRequestData& Request) {
- CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
- Request.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Request.Values.push_back({ValueId, RawHash});
- Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
- }
- }
- };
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Request");
-
- m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
- RecordRequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- CacheKey& Key = Request.Upstream.Key;
- if (!GetRpcRequestCacheKey(KeyObject, Key))
- {
- return CbPackage{};
- }
-
- Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
-
- ZenCacheValue CacheValue;
- bool NeedUpstreamAttachment = false;
- bool FoundLocalInvalid = false;
- ZenCacheValue RecordCacheValue;
-
- if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
- {
- Request.RecordCacheValue = std::move(RecordCacheValue.Value);
- if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
- {
- FoundLocalInvalid = true;
- }
- else
- {
- Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
- ParseValues(Request);
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
- {
- // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
- // didn't ask for it and thus the record is complete in its absence.
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Value.Exists = true;
- }
- else
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- Request.Complete = false;
- }
- }
- else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- if (m_CidStore.ContainsChunk(Value.ContentId))
- {
- Value.Exists = true;
- }
- else
- {
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- else
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
- {
- if (Chunk.GetSize() > 0)
- {
- Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
- Value.Exists = true;
- continue;
- }
- else
- {
- ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
- }
- }
-
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- NeedUpstreamAttachment = true;
- Value.ReadFromUpstream = true;
- }
- Request.Complete = false;
- }
- }
- }
- }
- if (!Request.Complete)
- {
- bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid &&
- EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
- if (NeedUpstreamRecord || NeedUpstreamAttachment)
- {
- UpstreamIndexes.push_back(Requests.size() - 1);
- }
- }
- Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
- }
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- if (!UpstreamIndexes.empty())
- {
- std::vector<CacheKeyRequest*> UpstreamRequests;
- UpstreamRequests.reserve(UpstreamIndexes.size());
- for (size_t Index : UpstreamIndexes)
- {
- RecordRequestData& Request = Requests[Index];
- UpstreamRequests.push_back(&Request.Upstream);
-
- if (Request.Values.size())
- {
- // We will be returning the local object and know all the value Ids that exist in it
- // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
- CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
- CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
- for (ValueRequestData& Value : Request.Values)
- {
- CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
- UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
- Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
- }
- Request.Upstream.Policy = Builder.Build();
- }
- else
- {
- // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
- // and convert the CacheRecordPolicy to an upstream policy
- Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
- }
- }
-
- const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
-
- RecordRequestData& Request =
- *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- const CacheKey& Key = Request.Upstream.Key;
- Stopwatch Timer;
- auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
- if (!Request.RecordObject)
- {
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
- Request.RecordObject = ObjectBuffer;
- bool StoreLocal =
- EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- ParseValues(Request);
- Request.Source = Params.Source;
- }
-
- Request.Complete = true;
- for (ValueRequestData& Value : Request.Values)
- {
- if (Value.Exists)
- {
- continue;
- }
- CachePolicy ValuePolicy = Value.DownstreamPolicy;
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
- {
- Request.Complete = false;
- continue;
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
- {
- Request.Source = Params.Source;
- Value.Exists = true;
- if (StoreLocal)
- {
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
- }
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Value.Payload = Compressed;
- }
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
- Value.ContentId,
- *Namespace,
- Key.Bucket,
- Key.Hash);
- }
- }
- if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- Request.Complete = false;
- }
- // Request.Complete does not need to be set to false for upstream SkipData attachments.
- // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
- // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
- // any missing SkipData attachments.
- }
- Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- };
-
- m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response");
- CbPackage ResponsePackage;
- CbObjectWriter ResponseObject;
-
- ResponseObject.BeginArray("Result"sv);
- for (RecordRequestData& Request : Requests)
- {
- const CacheKey& Key = Request.Upstream.Key;
- if (Request.Complete ||
- (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
- {
- ResponseObject << Request.RecordObject;
- for (ValueRequestData& Value : Request.Values)
- {
- if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
- {
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
- }
- }
-
- ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.RecordCacheValue.Size()),
- Request.Complete ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
- }
- else
- {
- ResponseObject.AddNull();
-
- if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- Request.RecordObject ? ""sv : " (PARTIAL)"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- }
- ResponseObject.EndArray();
- ResponsePackage.SetObject(ResponseObject.Save());
- return ResponsePackage;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
- CbObjectView BatchObject = BatchRequest.GetObject();
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
-
- m_CacheStats.RpcValueRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
- const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
-
- std::vector<bool> Results;
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
-
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
-
- CacheKey Key;
- if (!GetRpcRequestCacheKey(KeyView, Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
- uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- bool Succeeded = false;
- uint64_t TransferredSize = 0;
-
- if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
- // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
- Policy |= CachePolicy::StoreLocal;
- }
-
- if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
- Value.SetContentType(ZenContentType::kCompressedBinary);
- if (RawSize == 0)
- {
- RawSize = Chunk.DecodeRawSize();
- }
- m_CacheStore
- .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
- m_CacheStats.WriteCount++;
- TransferredSize = Chunk.GetCompressedSize();
- }
- Succeeded = true;
- }
- else
- {
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return CbPackage{};
- }
- }
- else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
- IsCompressedBinary(ExistingValue.Value.GetContentType()))
- {
- Succeeded = true;
- }
- }
- // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
- // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
-
- if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
- {
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
- }
- Results.push_back(Succeeded);
- ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(TransferredSize),
- Succeeded ? "Added"sv : "Invalid",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- }
- if (Results.empty())
- {
- return CbPackage{};
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
- {
- ResponseObject.AddBool(Value);
- }
- ResponseObject.EndArray();
-
- CbPackage RpcResponse;
- RpcResponse.SetObject(ResponseObject.Save());
-
- return RpcResponse;
- }
-}
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcValueRequests.fetch_add(1);
-
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
- if (!Namespace)
- {
- return CbPackage{};
- }
-
- struct RequestData
- {
- CacheKey Key;
- CachePolicy Policy;
- IoHash RawHash = IoHash::Zero;
- uint64_t RawSize = 0;
- CompressedBuffer Result;
- };
- std::vector<RequestData> Requests;
-
- std::vector<size_t> RemoteRequestIndexes;
-
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
-
- Stopwatch Timer;
-
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
-
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
-
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
-
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
- {
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
- IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- if (Request.Result)
- {
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- }
- else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- RemoteRequestIndexes.push_back(Requests.size() - 1);
- }
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
- {
- // If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
- }
- else
- {
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- }
- }
-
- if (!RemoteRequestIndexes.empty())
- {
- std::vector<CacheValueRequest> RequestedRecordsData;
- std::vector<CacheValueRequest*> CacheValueRequests;
- RequestedRecordsData.reserve(RemoteRequestIndexes.size());
- CacheValueRequests.reserve(RemoteRequestIndexes.size());
- for (size_t Index : RemoteRequestIndexes)
- {
- RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
- CacheValueRequests.push_back(&RequestedRecordsData.back());
- }
- Stopwatch Timer;
- m_UpstreamCache.GetCacheValues(
- *Namespace,
- CacheValueRequests,
- [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context](
- CacheValueGetCompleteParams&& Params) {
- CacheValueRequest& ChunkRequest = Params.Request;
- if (Params.RawHash != IoHash::Zero)
- {
- size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
- size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
- RequestData& Request = Requests[RequestIndex];
- Request.RawHash = Params.RawHash;
- Request.RawSize = Params.RawSize;
- const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
- const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- const bool IsHit = SkipData || HasData;
- if (IsHit)
- {
- if (HasData && !SkipData)
- {
- Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- }
-
- if (HasData && StoreData)
- {
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {});
- m_CacheStats.WriteCount++;
- }
-
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- NiceBytes(Request.Result.GetCompressed().GetSize()),
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
- return;
- }
- }
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- Params.Source ? Params.Source->Url : "UPSTREAM",
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- m_CacheStats.MissCount++;
- });
- }
-
- if (Requests.empty())
- {
- return CbPackage{};
- }
-
- {
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
- CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
- ResponseObject.BeginArray("Result"sv);
- for (const RequestData& Request : Requests)
- {
- ResponseObject.BeginObject();
- {
- const CompressedBuffer& Result = Request.Result;
- if (Result)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
- }
- else
- {
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- else if (Request.RawHash != IoHash::Zero)
- {
- ResponseObject.AddHash("RawHash"sv, Request.RawHash);
- ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
- }
- }
- ResponseObject.EndObject();
- }
- ResponseObject.EndArray();
-
- RpcResponse.SetObject(ResponseObject.Save());
- return RpcResponse;
- }
-}
-
-namespace cache::detail {
-
- struct RecordValue
- {
- Oid ValueId;
- IoHash ContentId;
- uint64_t RawSize;
- };
- struct RecordBody
- {
- IoBuffer CacheValue;
- std::vector<RecordValue> Values;
- const UpstreamEndpointInfo* Source = nullptr;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool HasRequest = false;
- bool ValuesRead = false;
- };
- struct ChunkRequest
- {
- CacheChunkRequest* Key = nullptr;
- RecordBody* Record = nullptr;
- CompressedBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
- uint64_t RawSize = 0;
- uint64_t RequestedSize = 0;
- uint64_t RequestedOffset = 0;
- CachePolicy DownstreamPolicy;
- bool Exists = false;
- bool RawSizeKnown = false;
- bool IsRecordRequest = false;
- uint64_t ElapsedTimeUs = 0;
- };
-
-} // namespace cache::detail
-
-CbPackage
-HttpStructuredCacheService::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
- using namespace cache::detail;
-
- std::string Namespace;
- std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
- std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
- std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
- std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
- std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
- std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
- std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
-
- // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
- {
- return CbPackage{};
- }
-
- // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
- // have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
-
- // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks);
-
- // Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
-
- // Send the payload and descriptive data about each chunk to the client
- return WriteGetCacheChunksResponse(Context, Namespace, Requests);
-}
-
-bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest)
-{
- ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest");
-
- using namespace cache::detail;
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
-
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- m_CacheStats.RpcChunkRequests.fetch_add(1);
-
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
-
- std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
- if (!NamespaceText)
- {
- ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
- return false;
- }
- Namespace = *NamespaceText;
-
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
- // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
- // we will need to change the pointers to indexes to handle reallocations.
- RecordKeys.reserve(NumRequests);
- Records.reserve(NumRequests);
- RequestKeys.reserve(NumRequests);
- Requests.reserve(NumRequests);
- RecordRequests.reserve(NumRequests);
- ValueRequests.reserve(NumRequests);
-
- CacheKeyRequest* PreviousRecordKey = nullptr;
- RecordBody* PreviousRecord = nullptr;
-
- for (CbFieldView RequestView : ChunkRequestsArray)
- {
- ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest::Request");
-
- m_CacheStats.RpcChunkBatchRequests.fetch_add(1);
-
- CbObjectView RequestObject = RequestView.AsObjectView();
- CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
- ChunkRequest& Request = Requests.emplace_back();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
-
- Request.Key = &RequestKey;
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
- {
- ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
- return false;
- }
-
- RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
- RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
- RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
- Request.RequestedSize = RequestKey.RawSize;
- Request.RequestedOffset = RequestKey.RawOffset;
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Request.IsRecordRequest = (bool)RequestKey.ValueId;
-
- if (!Request.IsRecordRequest)
- {
- ValueRequests.push_back(&Request);
- }
- else
- {
- RecordRequests.push_back(&Request);
- CacheKeyRequest* RecordKey = nullptr;
- RecordBody* Record = nullptr;
-
- if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
- {
- RecordKey = &RecordKeys.emplace_back();
- PreviousRecordKey = RecordKey;
- Record = &Records.emplace_back();
- PreviousRecord = Record;
- RecordKey->Key = RequestKey.Key;
- }
- else if (RequestKey.Key == PreviousRecordKey->Key)
- {
- RecordKey = PreviousRecordKey;
- Record = PreviousRecord;
- }
- else
- {
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
- RequestKey.Key.Bucket,
- RequestKey.Key.Hash,
- PreviousRecordKey->Key.Bucket,
- PreviousRecordKey->Key.Hash);
- return false;
- }
- Request.Record = Record;
- if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
- {
- Record->DownstreamPolicy =
- Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
- Record->HasRequest = true;
- }
- }
- }
- if (Requests.empty())
- {
- return false;
- }
- return true;
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheRecords(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords");
-
- using namespace cache::detail;
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- std::vector<CacheKeyRequest*> UpstreamRecordRequests;
- for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Record");
-
- Stopwatch Timer;
- CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
- RecordBody& Record = Records[RecordIndex];
- if (Record.HasRequest)
- {
- Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
-
- if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
- {
- Record.Exists = true;
- Record.CacheValue = std::move(CacheValue.Value);
- }
- }
- if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
- {
- RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
- UpstreamRecordRequests.push_back(&RecordKey);
- }
- RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
- }
-
- if (!UpstreamRecordRequests.empty())
- {
- const auto OnCacheRecordGetComplete =
- [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
- m_CacheStats.WriteCount++;
- }
- };
- m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
- }
-
- for (ChunkRequest* Request : RecordRequests)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Chunk");
-
- Stopwatch Timer;
- if (Request->Key->ChunkId == IoHash::Zero)
- {
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, parse the cache record and try to find the raw hash from the ValueId.
- RecordBody& Record = *Request->Record;
- if (!Record.ValuesRead)
- {
- Record.ValuesRead = true;
- if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
- Record.Values.reserve(ValuesArray.Num());
- for (CbFieldView ValueField : ValuesArray)
- {
- CbObjectView ValueObject = ValueField.AsObjectView();
- Oid ValueId = ValueObject["Id"sv].AsObjectId();
- CbFieldView RawHashField = ValueObject["RawHash"sv];
- IoHash RawHash = RawHashField.AsBinaryAttachment();
- if (ValueId && !RawHashField.HasError())
- {
- Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
- }
- }
- }
- }
-
- for (const RecordValue& Value : Record.Values)
- {
- if (Value.ValueId == Request->Key->ValueId)
- {
- Request->Key->ChunkId = Value.ContentId;
- Request->RawSize = Value.RawSize;
- Request->RawSizeKnown = true;
- break;
- }
- }
- }
-
- // Now load the ContentId from the local ContentIdStore or from the upstream
- if (Request->Key->ChunkId != IoHash::Zero)
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
- {
- if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
- {
- Request->Exists = true;
- }
- }
- else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
- {
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
- if (Request->Value)
- {
- Request->Exists = true;
- Request->RawSizeKnown = false;
- }
- }
- else
- {
- IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
- {
- Request->Exists = true;
- Request->RawSizeKnown = true;
- }
- }
- }
- }
- if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
- OutUpstreamChunks.push_back(Request->Key);
- }
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetLocalCacheValues(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks)
-{
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues");
-
- using namespace cache::detail;
- const bool HasUpstream = m_UpstreamCache.IsActive();
-
- for (ChunkRequest* Request : ValueRequests)
- {
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
-
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
- {
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
- {
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
- {
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
- }
- }
- }
- if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
- {
- if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
- {
- // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
- Request->Key->RawOffset = 0;
- Request->Key->RawSize = UINT64_MAX;
- }
- OutUpstreamChunks.push_back(Request->Key);
- }
- Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
- }
-}
-
-void
-HttpStructuredCacheService::GetUpstreamCacheChunks(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- if (UpstreamChunks.empty())
- {
- return;
- }
- ZEN_TRACE_CPU("Z$::GetUpstreamCacheChunks");
-
- using namespace cache::detail;
-
- const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) {
- if (Params.RawHash == Params.RawHash.Zero)
- {
- return;
- }
-
- CacheChunkRequest& Key = Params.Request;
- size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
- ChunkRequest& Request = Requests[RequestIndex];
- Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
- !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
- if (!Compressed)
- {
- return;
- }
-
- bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- if (Request.IsRecordRequest)
- {
- m_CidStore.AddChunk(Params.Value, Params.RawHash);
- }
- else
- {
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {});
- m_CacheStats.WriteCount++;
- }
- }
- if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- Request.Value = std::move(Compressed);
- }
- }
- Key.ChunkId = Params.RawHash;
- Request.Exists = true;
- Request.RawSize = Params.RawSize;
- Request.RawSizeKnown = true;
- Request.Source = Params.Source;
-
- m_CacheStats.UpstreamHitCount++;
- };
-
- m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
-}
-
-CbPackage
-HttpStructuredCacheService::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests)
-{
- ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse");
-
- using namespace cache::detail;
-
- CbPackage RpcResponse;
- CbObjectWriter Writer;
-
- Writer.BeginArray("Result"sv);
- for (ChunkRequest& Request : Requests)
- {
- ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse::Request");
-
- Writer.BeginObject();
- {
- if (Request.Exists)
- {
- Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
- if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
- }
- else
- {
- Writer.AddInteger("RawSize"sv, Request.RawSize);
- }
-
- ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceBytes(Request.RawSize),
- Request.IsRecordRequest ? "Record"sv : "Value"sv,
- Request.Source ? Request.Source->Url : "LOCAL"sv,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.HitCount++;
- }
- else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
- {
- ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- }
- else
- {
- ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
- Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId,
- NiceLatencyNs(Request.ElapsedTimeUs * 1000));
- m_CacheStats.MissCount++;
- }
- }
- Writer.EndObject();
- }
- Writer.EndArray();
-
- RpcResponse.SetObject(Writer.Save());
- return RpcResponse;
-}
-
void
HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 2feaaead8..da4bdd63c 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -6,6 +6,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
+#include <zenstore/cache/cacherpc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/openprocesscache.h>
@@ -16,13 +17,16 @@ namespace zen {
struct CacheChunkRequest;
struct CacheKeyRequest;
+struct PutRequestData;
+
class CidStore;
class CbObjectView;
-struct PutRequestData;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
-class DiskWriteBlocker;
+
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
@@ -89,28 +93,6 @@ private:
IoHash ValueContentId;
};
- struct CacheStats
- {
- std::atomic_uint64_t HitCount{};
- std::atomic_uint64_t UpstreamHitCount{};
- std::atomic_uint64_t MissCount{};
- std::atomic_uint64_t WriteCount{};
- std::atomic_uint64_t BadRequestCount{};
- std::atomic_uint64_t RpcRequests{};
- std::atomic_uint64_t RpcRecordRequests{};
- std::atomic_uint64_t RpcRecordBatchRequests{};
- std::atomic_uint64_t RpcValueRequests{};
- std::atomic_uint64_t RpcValueBatchRequests{};
- std::atomic_uint64_t RpcChunkRequests{};
- std::atomic_uint64_t RpcChunkBatchRequests{};
- };
- enum class PutResult
- {
- Success,
- Fail,
- Invalid,
- };
-
void HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandleGetCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
void HandlePutCacheRecord(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
@@ -120,57 +102,11 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
- HttpResponseCode HandleRpcRequest(const CacheRequestContext& Context,
- const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId,
- CbPackage& OutPackage);
-
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
virtual void HandleStatsRequest(HttpServerRequest& Request) override;
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
-
- /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(std::string& Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- CbObjectView RpcRequest);
- /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */
- void GetLocalCacheRecords(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheKeyRequest>& RecordKeys,
- std::vector<cache::detail::RecordBody>& Records,
- std::vector<cache::detail::ChunkRequest*>& RecordRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
- void GetLocalCacheValues(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest*>& ValueRequests,
- std::vector<CacheChunkRequest*>& OutUpstreamChunks);
- /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
- void GetUpstreamCacheChunks(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<CacheChunkRequest*>& UpstreamChunks,
- std::vector<CacheChunkRequest>& RequestKeys,
- std::vector<cache::detail::ChunkRequest>& Requests);
- /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests);
bool AreDiskWritesAllowed() const;
@@ -187,6 +123,7 @@ private:
CacheStats m_CacheStats;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
OpenProcessCache m_OpenProcessCache;
+ CacheRpcHandler m_RpcHandler;
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
@@ -199,14 +136,6 @@ private:
std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
-/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
- * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */
-inline bool
-IsCompressedBinary(ZenContentType Type)
-{
- return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
-}
-
void z$service_forcelink();
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index b7507bd17..038a6db47 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -15,11 +15,11 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/packageformat.h>
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 57a09e929..7823010b5 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -9,7 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 6dde0a701..dac29c273 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -16,10 +16,10 @@
#include <zencore/trace.h>
#include <zenhttp/auth/authmgr.h>
-#include <zenhttp/httpshared.h>
#include <zenstore/cidstore.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "cache/httpstructuredcache.h"
#include "diag/logging.h"
@@ -2129,7 +2129,7 @@ UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options,
}
std::unique_ptr<UpstreamCache>
-UpstreamCache::Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
}
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 291e7e95e..bb0193e4e 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -8,6 +8,7 @@
#include <zencore/iohash.h>
#include <zencore/stats.h>
#include <zencore/zencore.h>
+#include <zenstore/cache/upstreamcacheclient.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -29,95 +30,6 @@ struct CloudCacheClientOptions;
class CloudCacheTokenProvider;
struct ZenStructuredCacheClientOptions;
-struct UpstreamCacheRecord
-{
- ZenContentType Type = ZenContentType::kBinary;
- std::string Namespace;
- CacheKey Key;
- std::vector<IoHash> ValueContentIds;
- CacheRequestContext Context;
-};
-
-struct UpstreamCacheOptions
-{
- std::chrono::seconds HealthCheckInterval{5};
- uint32_t ThreadCount = 4;
- bool ReadUpstream = true;
- bool WriteUpstream = true;
-};
-
-struct UpstreamError
-{
- int32_t ErrorCode{};
- std::string Reason{};
-
- explicit operator bool() const { return ErrorCode != 0; }
-};
-
-struct UpstreamEndpointInfo
-{
- std::string Name;
- std::string Url;
-};
-
-struct GetUpstreamCacheResult
-{
- UpstreamError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct GetUpstreamCacheSingleResult
-{
- GetUpstreamCacheResult Status;
- IoBuffer Value;
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-struct PutUpstreamCacheResult
-{
- std::string Reason;
- int64_t Bytes{};
- double ElapsedSeconds{};
- bool Success = false;
-};
-
-struct CacheRecordGetCompleteParams
-{
- CacheKeyRequest& Request;
- const CbObjectView& Record;
- const CbPackage& Package;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
-
-struct CacheValueGetCompleteParams
-{
- CacheValueRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
-
-struct CacheChunkGetCompleteParams
-{
- CacheChunkRequest& Request;
- IoHash RawHash;
- uint64_t RawSize;
- IoBuffer Value;
- double ElapsedSeconds{};
- const UpstreamEndpointInfo* Source = nullptr;
-};
-
-using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
-
struct UpstreamEndpointStats
{
metrics::OperationTiming CacheGetRequestTiming;
@@ -172,6 +84,13 @@ struct UpstreamEndpointStatus
UpstreamEndpointState State;
};
+struct GetUpstreamCacheSingleResult
+{
+ GetUpstreamCacheResult Status;
+ IoBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
/**
* The upstream endpoint is responsible for handling upload/downloading of cache records.
*/
@@ -217,39 +136,32 @@ public:
/**
* Manages one or more upstream cache endpoints.
*/
-class UpstreamCache
+
+class UpstreamCache : public UpstreamCacheClient
{
public:
- virtual ~UpstreamCache() = default;
-
virtual void Initialize() = 0;
- virtual bool IsActive() = 0;
-
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
- virtual void GetCacheRecords(std::string_view Namespace,
- std::span<CacheKeyRequest*> Requests,
- OnCacheRecordGetComplete&& OnComplete) = 0;
-
- virtual void GetCacheValues(std::string_view Namespace,
- std::span<CacheValueRequest*> CacheValueRequests,
- OnCacheValueGetComplete&& OnComplete) = 0;
virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
const CacheKey& CacheKey,
- const IoHash& ValueContentId) = 0;
- virtual void GetCacheChunks(std::string_view Namespace,
- std::span<CacheChunkRequest*> CacheChunkRequests,
- OnCacheChunksGetComplete&& OnComplete) = 0;
-
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+ const IoHash& ValueContentId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
- static std::unique_ptr<UpstreamCache> Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+struct UpstreamCacheOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
};
+std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+
} // namespace zen
diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp
index afc3b8438..c031a4086 100644
--- a/src/zenserver/upstream/zen.cpp
+++ b/src/zenserver/upstream/zen.cpp
@@ -10,9 +10,9 @@
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"
ZEN_THIRD_PARTY_INCLUDES_START
diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp
index 69b9bb356..f528b2620 100644
--- a/src/zenserver/vfs/vfsimpl.cpp
+++ b/src/zenserver/vfs/vfsimpl.cpp
@@ -7,7 +7,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zenstore/structuredcachestore.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenvfs/projfs.h>
#include <zenvfs/vfs.h>
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index f80f95f8e..37b3f0531 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -510,7 +510,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
- m_UpstreamCache = UpstreamCache::Create(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
m_UpstreamCache->Initialize();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index a53e16821..6ff13cfff 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -22,8 +22,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <zenhttp/httpstats.h>
#include <zenhttp/httpstatus.h>
#include <zenhttp/httptest.h>
+#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/gc.h>
-#include <zenstore/structuredcachestore.h>
#include "admin/admin.h"
#include "cache/httpstructuredcache.h"
#include "diag/diagsvcs.h"
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 82f190caa..4d6b9f89e 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "zenstore/cachedisklayer.h"
+#include "zenstore/cache/cachedisklayer.h"
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
new file mode 100644
index 000000000..96b344ee9
--- /dev/null
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -0,0 +1,1640 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenstore/cache/cacherpc.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenstore/cache/cacheshared.h>
+#include <zenstore/cache/structuredcachestore.h>
+#include <zenstore/cache/upstreamcacheclient.h>
+#include <zenstore/cidstore.h>
+#include <zenutil/packageformat.h>
+
+namespace zen { namespace {
+
+ constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
+ constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"};
+
+ std::optional<std::string> GetValidNamespaceName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ ZEN_WARN("Namespace is invalid, empty namespace is not allowed");
+ return {};
+ }
+
+ if (Name.length() > 64)
+ {
+ ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name);
+ return {};
+ }
+
+ if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet))
+ {
+ ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name);
+ return {};
+ }
+
+ return ToLower(Name);
+ }
+
+ std::optional<std::string> GetValidBucketName(std::string_view Name)
+ {
+ if (Name.empty())
+ {
+ ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed");
+ return {};
+ }
+
+ if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet))
+ {
+ ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name);
+ return {};
+ }
+
+ return ToLower(Name);
+ }
+
+}} // namespace zen::
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+using namespace std::literals;
+
+std::optional<std::string>
+GetRpcRequestNamespace(const CbObjectView Params)
+{
+ CbFieldView NamespaceField = Params["Namespace"sv];
+ if (!NamespaceField)
+ {
+ return std::string(ZenCacheStore::DefaultNamespace);
+ }
+
+ if (NamespaceField.HasError())
+ {
+ return {};
+ }
+ if (!NamespaceField.IsString())
+ {
+ return {};
+ }
+ return GetValidNamespaceName(NamespaceField.AsString());
+}
+
+bool
+GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
+{
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ if (BucketField.HasError())
+ {
+ return false;
+ }
+ if (!BucketField.IsString())
+ {
+ return false;
+ }
+ std::optional<std::string> Bucket = GetValidBucketName(BucketField.AsString());
+ if (!Bucket.has_value())
+ {
+ return false;
+ }
+ CbFieldView HashField = KeyView["Hash"sv];
+ if (HashField.HasError())
+ {
+ return false;
+ }
+ if (!HashField.IsHash())
+ {
+ return false;
+ }
+ IoHash Hash = HashField.AsHash();
+ Key = CacheKey::Create(*Bucket, Hash);
+ return true;
+}
+
+namespace cache::detail {
+
+ struct RecordValue
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ uint64_t RawSize;
+ };
+
+ struct RecordBody
+ {
+ IoBuffer CacheValue;
+ std::vector<RecordValue> Values;
+ const UpstreamEndpointInfo* Source = nullptr;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool ValuesRead = false;
+ };
+
+ struct ChunkRequest
+ {
+ CacheChunkRequest* Key = nullptr;
+ RecordBody* Record = nullptr;
+ CompressedBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t RawSize = 0;
+ uint64_t RequestedSize = 0;
+ uint64_t RequestedOffset = 0;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool RawSizeKnown = false;
+ bool IsRecordRequest = false;
+ uint64_t ElapsedTimeUs = 0;
+ };
+
+} // namespace cache::detail
+
+struct PutRequestData
+{
+ std::string Namespace;
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+ CacheRequestContext Context;
+};
+
+CacheRecordPolicy
+LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
+{
+ OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
+ return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
+}
+
+CacheRpcHandler::CacheRpcHandler(LoggerRef InLog,
+ CacheStats& InCacheStats,
+ UpstreamCacheClient& InUpstreamCache,
+ ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ const DiskWriteBlocker* InDiskWriteBlocker)
+: m_Log(InLog)
+, m_CacheStats(InCacheStats)
+, m_UpstreamCache(InUpstreamCache)
+, m_CacheStore(InCacheStore)
+, m_CidStore(InCidStore)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
+{
+}
+
+CacheRpcHandler::~CacheRpcHandler()
+{
+}
+
+bool
+CacheRpcHandler::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
+CacheRpcHandler::RpcResponseCode
+CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
+{
+ ZEN_TRACE_CPU("Z$::HandleRpcRequest");
+
+ m_CacheStats.RpcRequests.fetch_add(1);
+
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ ObjectBuffer = LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ OutAcceptMagic = Object["Accept"sv].AsUInt32();
+ OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+ OutTargetProcessId = Object["Pid"sv].AsInt32(0);
+
+ const std::string_view Method = Object["Method"sv].AsString();
+
+ if (Method == "PutCacheRecords"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return RpcResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheRecords(Context, Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheRecords(Context, Object);
+ }
+ else if (Method == "PutCacheValues"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return RpcResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheValues(Context, Package);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheValues(Context, Object);
+ }
+ else if (Method == "GetCacheChunks"sv)
+ {
+ OutResultPackage = HandleRpcGetCacheChunks(Context, Object);
+ }
+ else
+ {
+ m_CacheStats.BadRequestCount++;
+ return RpcResponseCode::BadRequest;
+ }
+ return RpcResponseCode::OK;
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+
+ CbObjectView BatchObject = BatchRequest.GetObject();
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+
+ std::vector<bool> Results;
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
+ CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+ CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ PutRequestData PutRequest{.Namespace = *Namespace,
+ .Key = std::move(Key),
+ .RecordObject = RecordObject,
+ .Policy = std::move(Policy),
+ .Context = Context};
+
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+
+ if (Result == PutResult::Invalid)
+ {
+ return CbPackage{};
+ }
+ Results.push_back(Result == PutResult::Success);
+ }
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+}
+
+PutResult
+CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+{
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<IoHash> ReferencedAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ Stopwatch Timer;
+
+ Request.RecordObject.IterateAttachments(
+ [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &ReferencedAttachments, &Count, &TransferredSize](
+ CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ AttachmentsToStoreLocally.emplace_back(Attachment);
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(ZenContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ return PutResult::Invalid;
+ }
+
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+
+ ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)});
+ }
+ return PutResult::Success;
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcRecordRequests.fetch_add(1);
+
+ struct ValueRequestData
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ CompressedBuffer Payload;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool ReadFromUpstream = false;
+ };
+ struct RecordRequestData
+ {
+ CacheKeyRequest Upstream;
+ CbObjectView RecordObject;
+ IoBuffer RecordCacheValue;
+ CacheRecordPolicy DownstreamPolicy;
+ std::vector<ValueRequestData> Values;
+ bool Complete = false;
+ const UpstreamEndpointInfo* Source = nullptr;
+ uint64_t ElapsedTimeUs;
+ };
+
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ std::vector<RecordRequestData> Requests;
+ std::vector<size_t> UpstreamIndexes;
+
+ auto ParseValues = [](RecordRequestData& Request) {
+ CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
+ Request.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Request.Values.push_back({ValueId, RawHash});
+ Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
+ }
+ }
+ };
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Request");
+
+ m_CacheStats.RpcRecordBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+ RecordRequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ CacheKey& Key = Request.Upstream.Key;
+ if (!GetRpcRequestCacheKey(KeyObject, Key))
+ {
+ return CbPackage{};
+ }
+
+ Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
+
+ ZenCacheValue CacheValue;
+ bool NeedUpstreamAttachment = false;
+ bool FoundLocalInvalid = false;
+ ZenCacheValue RecordCacheValue;
+
+ if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
+ {
+ Request.RecordCacheValue = std::move(RecordCacheValue.Value);
+ if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
+ {
+ FoundLocalInvalid = true;
+ }
+ else
+ {
+ Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
+ ParseValues(Request);
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
+ {
+ // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ Request.Complete = false;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (m_CidStore.ContainsChunk(Value.ContentId))
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ {
+ if (Chunk.GetSize() > 0)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
+ Value.Exists = true;
+ continue;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
+ }
+ }
+ if (!Request.Complete)
+ {
+ bool NeedUpstreamRecord = HasUpstream && !Request.RecordObject && !FoundLocalInvalid &&
+ EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
+ if (NeedUpstreamRecord || NeedUpstreamAttachment)
+ {
+ UpstreamIndexes.push_back(Requests.size() - 1);
+ }
+ }
+ Request.ElapsedTimeUs = Timer.GetElapsedTimeUs();
+ }
+ if (Requests.empty())
+ {
+ return CbPackage{};
+ }
+
+ if (!UpstreamIndexes.empty())
+ {
+ std::vector<CacheKeyRequest*> UpstreamRequests;
+ UpstreamRequests.reserve(UpstreamIndexes.size());
+ for (size_t Index : UpstreamIndexes)
+ {
+ RecordRequestData& Request = Requests[Index];
+ UpstreamRequests.push_back(&Request.Upstream);
+
+ if (Request.Values.size())
+ {
+ // We will be returning the local object and know all the value Ids that exist in it
+ // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
+ CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
+ CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
+ for (ValueRequestData& Value : Request.Values)
+ {
+ CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
+ UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
+ Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
+ }
+ Request.Upstream.Policy = Builder.Build();
+ }
+ else
+ {
+ // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
+ // and convert the CacheRecordPolicy to an upstream policy
+ Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
+ }
+ }
+
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+
+ RecordRequestData& Request =
+ *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
+ Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ const CacheKey& Key = Request.Upstream.Key;
+ Stopwatch Timer;
+ auto TimeGuard = MakeGuard([&Timer, &Request]() { Request.ElapsedTimeUs += Timer.GetElapsedTimeUs(); });
+ if (!Request.RecordObject)
+ {
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
+ Request.RecordObject = ObjectBuffer;
+ bool StoreLocal =
+ EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore
+ .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+ }
+ ParseValues(Request);
+ Request.Source = Params.Source;
+ }
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ if (Value.Exists)
+ {
+ continue;
+ }
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ Request.Complete = false;
+ continue;
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ Request.Source = Params.Source;
+ Value.Exists = true;
+ if (StoreLocal)
+ {
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Value.Payload = Compressed;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
+ Value.ContentId,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash);
+ }
+ }
+ if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Request.Complete = false;
+ }
+ // Request.Complete does not need to be set to false for upstream SkipData attachments.
+ // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
+ // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
+ // any missing SkipData attachments.
+ }
+ Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ };
+
+ m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords::Response");
+ CbPackage ResponsePackage;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+ for (RecordRequestData& Request : Requests)
+ {
+ const CacheKey& Key = Request.Upstream.Key;
+ if (Request.Complete ||
+ (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
+ {
+ ResponseObject << Request.RecordObject;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
+ {
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, Value.ContentId));
+ }
+ }
+
+ ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Request.RecordCacheValue.Size()),
+ Request.Complete ? ""sv : " (PARTIAL)"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount += Request.Source ? 1 : 0;
+ }
+ else
+ {
+ ResponseObject.AddNull();
+
+ if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("GETCACHERECORD DISABLEDQUERY - '{}/{}/{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHERECORD MISS - '{}/{}/{}'{} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ Request.RecordObject ? ""sv : " (PARTIAL)"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+ }
+ ResponseObject.EndArray();
+ ResponsePackage.SetObject(ResponseObject.Save());
+ return ResponsePackage;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+
+ m_CacheStats.RpcValueRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Request");
+
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+
+ CacheKey Key;
+ if (!GetRpcRequestCacheKey(KeyView, Key))
+ {
+ return CbPackage{};
+ }
+
+ PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
+ uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ bool Succeeded = false;
+ uint64_t TransferredSize = 0;
+
+ if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
+ // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
+ Policy |= CachePolicy::StoreLocal;
+ }
+
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
+ {
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value.SetContentType(ZenContentType::kCompressedBinary);
+ if (RawSize == 0)
+ {
+ RawSize = Chunk.DecodeRawSize();
+ }
+ m_CacheStore
+ .Put(Context, *Namespace, Key.Bucket, Key.Hash, {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, {});
+ m_CacheStats.WriteCount++;
+ TransferredSize = Chunk.GetCompressedSize();
+ }
+ Succeeded = true;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
+ return CbPackage{};
+ }
+ }
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue ExistingValue;
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ {
+ Succeeded = true;
+ }
+ }
+ // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
+ // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
+
+ if (HasUpstream && Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
+ }
+ Results.push_back(Succeeded);
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}' in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(TransferredSize),
+ Succeeded ? "Added"sv : "Invalid",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+ if (Results.empty())
+ {
+ return CbPackage{};
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ return RpcResponse;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcValueRequests.fetch_add(1);
+
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return CbPackage{};
+ }
+
+ struct RequestData
+ {
+ CacheKey Key;
+ CachePolicy Policy;
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+ CompressedBuffer Result;
+ };
+ std::vector<RequestData> Requests;
+
+ std::vector<size_t> RemoteRequestIndexes;
+
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
+
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+
+ Stopwatch Timer;
+
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
+ {
+ return CbPackage{};
+ }
+
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
+
+ ZenCacheValue CacheValue;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
+ IsCompressedBinary(CacheValue.Value.GetContentType()))
+ {
+ Request.RawHash = CacheValue.RawHash;
+ Request.RawSize = CacheValue.RawSize;
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ }
+ }
+ if (Request.Result)
+ {
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ }
+ else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ RemoteRequestIndexes.push_back(Requests.size() - 1);
+ }
+ else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+
+ if (!RemoteRequestIndexes.empty())
+ {
+ std::vector<CacheValueRequest> RequestedRecordsData;
+ std::vector<CacheValueRequest*> CacheValueRequests;
+ RequestedRecordsData.reserve(RemoteRequestIndexes.size());
+ CacheValueRequests.reserve(RemoteRequestIndexes.size());
+ for (size_t Index : RemoteRequestIndexes)
+ {
+ RequestData& Request = Requests[Index];
+ RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)});
+ CacheValueRequests.push_back(&RequestedRecordsData.back());
+ }
+ Stopwatch Timer;
+ m_UpstreamCache.GetCacheValues(
+ *Namespace,
+ CacheValueRequests,
+ [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer, Context](
+ CacheValueGetCompleteParams&& Params) {
+ CacheValueRequest& ChunkRequest = Params.Request;
+ if (Params.RawHash != IoHash::Zero)
+ {
+ size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
+ size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
+ RequestData& Request = Requests[RequestIndex];
+ Request.RawHash = Params.RawHash;
+ Request.RawSize = Params.RawSize;
+ const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
+ const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool IsHit = SkipData || HasData;
+ if (IsHit)
+ {
+ if (HasData && !SkipData)
+ {
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
+ }
+
+ if (HasData && StoreData)
+ {
+ m_CacheStore.Put(Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {});
+ m_CacheStats.WriteCount++;
+ }
+
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ Params.Source ? Params.Source->Url : "UPSTREAM",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ return;
+ }
+ }
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
+ *Namespace,
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ Params.Source ? Params.Source->Url : "UPSTREAM",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ });
+ }
+
+ if (Requests.empty())
+ {
+ return CbPackage{};
+ }
+
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Response");
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (const RequestData& Request : Requests)
+ {
+ ResponseObject.BeginObject();
+ {
+ const CompressedBuffer& Result = Request.Result;
+ if (Result)
+ {
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
+ if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Result, Request.RawHash));
+ }
+ else
+ {
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
+ }
+ }
+ else if (Request.RawHash != IoHash::Zero)
+ {
+ ResponseObject.AddHash("RawHash"sv, Request.RawHash);
+ ResponseObject.AddInteger("RawSize"sv, Request.RawSize);
+ }
+ }
+ ResponseObject.EndObject();
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+ return RpcResponse;
+ }
+}
+
+CbPackage
+CacheRpcHandler::HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+ using namespace cache::detail;
+
+ std::string Namespace;
+ std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
+ std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
+ std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
+ std::vector<ChunkRequest> Requests; // Intermediate and result data about a ChunkRequest
+ std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
+ std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
+ std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
+
+ // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ {
+ return CbPackage{};
+ }
+
+ // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
+ // have it locally, and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheRecords(Context, Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
+
+ // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
+ GetLocalCacheValues(Context, Namespace, ValueRequests, UpstreamChunks);
+
+ // Call GetCacheChunks on the upstream for any payloads we do not have locally
+ GetUpstreamCacheChunks(Context, Namespace, UpstreamChunks, RequestKeys, Requests);
+
+ // Send the payload and descriptive data about each chunk to the client
+ return WriteGetCacheChunksResponse(Context, Namespace, Requests);
+}
+
+bool
+CacheRpcHandler::ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest)
+{
+ ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest");
+
+ using namespace cache::detail;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ m_CacheStats.RpcChunkRequests.fetch_add(1);
+
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+
+ std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
+ if (!NamespaceText)
+ {
+ ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
+ return false;
+ }
+ Namespace = *NamespaceText;
+
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+ // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
+ // we will need to change the pointers to indexes to handle reallocations.
+ RecordKeys.reserve(NumRequests);
+ Records.reserve(NumRequests);
+ RequestKeys.reserve(NumRequests);
+ Requests.reserve(NumRequests);
+ RecordRequests.reserve(NumRequests);
+ ValueRequests.reserve(NumRequests);
+
+ CacheKeyRequest* PreviousRecordKey = nullptr;
+ RecordBody* PreviousRecord = nullptr;
+
+ for (CbFieldView RequestView : ChunkRequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::ParseGetCacheChunksRequest::Request");
+
+ m_CacheStats.RpcChunkBatchRequests.fetch_add(1);
+
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CacheChunkRequest& RequestKey = RequestKeys.emplace_back();
+ ChunkRequest& Request = Requests.emplace_back();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+
+ Request.Key = &RequestKey;
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key))
+ {
+ ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
+ return false;
+ }
+
+ RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ Request.RequestedSize = RequestKey.RawSize;
+ Request.RequestedOffset = RequestKey.RawOffset;
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ Request.IsRecordRequest = (bool)RequestKey.ValueId;
+
+ if (!Request.IsRecordRequest)
+ {
+ ValueRequests.push_back(&Request);
+ }
+ else
+ {
+ RecordRequests.push_back(&Request);
+ CacheKeyRequest* RecordKey = nullptr;
+ RecordBody* Record = nullptr;
+
+ if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key)
+ {
+ RecordKey = &RecordKeys.emplace_back();
+ PreviousRecordKey = RecordKey;
+ Record = &Records.emplace_back();
+ PreviousRecord = Record;
+ RecordKey->Key = RequestKey.Key;
+ }
+ else if (RequestKey.Key == PreviousRecordKey->Key)
+ {
+ RecordKey = PreviousRecordKey;
+ Record = PreviousRecord;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ RequestKey.Key.Bucket,
+ RequestKey.Key.Hash,
+ PreviousRecordKey->Key.Bucket,
+ PreviousRecordKey->Key.Hash);
+ return false;
+ }
+ Request.Record = Record;
+ if (RequestKey.ChunkId == RequestKey.ChunkId.Zero)
+ {
+ Record->DownstreamPolicy =
+ Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy;
+ Record->HasRequest = true;
+ }
+ }
+ }
+ if (Requests.empty())
+ {
+ return false;
+ }
+ return true;
+}
+
+void
+CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords");
+
+ using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ std::vector<CacheKeyRequest*> UpstreamRecordRequests;
+ for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Record");
+
+ Stopwatch Timer;
+ CacheKeyRequest& RecordKey = RecordKeys[RecordIndex];
+ RecordBody& Record = Records[RecordIndex];
+ if (Record.HasRequest)
+ {
+ Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta;
+
+ if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Context, Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
+ {
+ Record.Exists = true;
+ Record.CacheValue = std::move(CacheValue.Value);
+ }
+ }
+ if (HasUpstream && !Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy));
+ UpstreamRecordRequests.push_back(&RecordKey);
+ }
+ RecordRequests[RecordIndex]->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ }
+
+ if (!UpstreamRecordRequests.empty())
+ {
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments);
+ m_CacheStats.WriteCount++;
+ }
+ };
+ m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
+ }
+
+ for (ChunkRequest* Request : RecordRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheRecords::Chunk");
+
+ Stopwatch Timer;
+ if (Request->Key->ChunkId == IoHash::Zero)
+ {
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ RecordBody& Record = *Request->Record;
+ if (!Record.ValuesRead)
+ {
+ Record.ValuesRead = true;
+ if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ {
+ CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ Record.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
+ }
+ }
+ }
+ }
+
+ for (const RecordValue& Value : Record.Values)
+ {
+ if (Value.ValueId == Request->Key->ValueId)
+ {
+ Request->Key->ChunkId = Value.ContentId;
+ Request->RawSize = Value.RawSize;
+ Request->RawSizeKnown = true;
+ break;
+ }
+ }
+ }
+
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Request->Key->ChunkId != IoHash::Zero)
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown)
+ {
+ if (m_CidStore.ContainsChunk(Request->Key->ChunkId))
+ {
+ Request->Exists = true;
+ }
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId))
+ {
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
+ if (Request->Value)
+ {
+ Request->Exists = true;
+ Request->RawSizeKnown = false;
+ }
+ }
+ else
+ {
+ IoHash _;
+ if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
+ {
+ Request->Exists = true;
+ Request->RawSizeKnown = true;
+ }
+ }
+ }
+ }
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy);
+ OutUpstreamChunks.push_back(Request->Key);
+ }
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+}
+
+void
+CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks)
+{
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues");
+
+ using namespace cache::detail;
+ const bool HasUpstream = m_UpstreamCache.IsActive();
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
+
+ Stopwatch Timer;
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ {
+ if (IsCompressedBinary(CacheValue.Value.GetContentType()))
+ {
+ Request->Key->ChunkId = CacheValue.RawHash;
+ Request->Exists = true;
+ Request->RawSize = CacheValue.RawSize;
+ Request->RawSizeKnown = true;
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ }
+ }
+ }
+ }
+ if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally
+ Request->Key->RawOffset = 0;
+ Request->Key->RawSize = UINT64_MAX;
+ }
+ OutUpstreamChunks.push_back(Request->Key);
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+}
+
+void
+CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ if (UpstreamChunks.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("Z$::GetUpstreamCacheChunks");
+
+ using namespace cache::detail;
+
+ const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests, Context](CacheChunkGetCompleteParams&& Params) {
+ if (Params.RawHash == Params.RawHash.Zero)
+ {
+ return;
+ }
+
+ CacheChunkRequest& Key = Params.Request;
+ size_t RequestIndex = std::distance(RequestKeys.data(), &Key);
+ ChunkRequest& Request = Requests[RequestIndex];
+ Request.ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Params.Value));
+ if (!Compressed)
+ {
+ return;
+ }
+
+ bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ if (Request.IsRecordRequest)
+ {
+ m_CidStore.AddChunk(Params.Value, Params.RawHash);
+ }
+ else
+ {
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {});
+ m_CacheStats.WriteCount++;
+ }
+ }
+ if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request.Value = std::move(Compressed);
+ }
+ }
+ Key.ChunkId = Params.RawHash;
+ Request.Exists = true;
+ Request.RawSize = Params.RawSize;
+ Request.RawSizeKnown = true;
+ Request.Source = Params.Source;
+
+ m_CacheStats.UpstreamHitCount++;
+ };
+
+ m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete));
+}
+
+CbPackage
+CacheRpcHandler::WriteGetCacheChunksResponse([[maybe_unused]] const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests)
+{
+ ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse");
+
+ using namespace cache::detail;
+
+ CbPackage RpcResponse;
+ CbObjectWriter Writer;
+
+ Writer.BeginArray("Result"sv);
+ for (ChunkRequest& Request : Requests)
+ {
+ ZEN_TRACE_CPU("Z$::WriteGetCacheChunksResponse::Request");
+
+ Writer.BeginObject();
+ {
+ if (Request.Exists)
+ {
+ Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
+ if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
+ }
+ else
+ {
+ Writer.AddInteger("RawSize"sv, Request.RawSize);
+ }
+
+ ZEN_DEBUG("GETCACHECHUNKS HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceBytes(Request.RawSize),
+ Request.IsRecordRequest ? "Record"sv : "Value"sv,
+ Request.Source ? Request.Source->Url : "LOCAL"sv,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.HitCount++;
+ }
+ else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
+ {
+ ZEN_DEBUG("GETCACHECHUNKS DISABLEDQUERY - '{}/{}/{}/{}' in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHECHUNKS MISS - '{}/{}/{}/{}' in {}",
+ Namespace,
+ Request.Key->Key.Bucket,
+ Request.Key->Key.Hash,
+ Request.Key->ValueId,
+ NiceLatencyNs(Request.ElapsedTimeUs * 1000));
+ m_CacheStats.MissCount++;
+ }
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ RpcResponse.SetObject(Writer.Save());
+ return RpcResponse;
+}
+
+} // namespace zen \ No newline at end of file
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index f92baaf0b..fd04af2a3 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "zenstore/structuredcachestore.h"
+#include "zenstore/cache/structuredcachestore.h"
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
diff --git a/src/zenstore/include/zenstore/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 6997a12e4..6997a12e4 100644
--- a/src/zenstore/include/zenstore/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
new file mode 100644
index 000000000..c57e2818c
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -0,0 +1,155 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iobuffer.h>
+#include <zencore/logging.h>
+#include <zenutil/cache/cache.h>
+
+#include <atomic>
+#include <string_view>
+#include <vector>
+
+namespace zen {
+
+namespace cache { namespace detail {
+ struct RecordBody;
+ struct ChunkRequest;
+}} // namespace cache::detail
+
+struct CacheChunkRequest;
+struct CacheKeyRequest;
+struct PutRequestData;
+
+class CbPackage;
+class CbObjectView;
+class CidStore;
+class DiskWriteBlocker;
+class HttpStructuredCacheService;
+class UpstreamCacheClient;
+class ZenCacheStore;
+
+enum class CachePolicy : uint32_t;
+enum class RpcAcceptOptions : uint16_t;
+
+struct AttachmentCount
+{
+ uint32_t New = 0;
+ uint32_t Valid = 0;
+ uint32_t Invalid = 0;
+ uint32_t Total = 0;
+};
+
+struct CacheStats
+{
+ std::atomic_uint64_t HitCount{};
+ std::atomic_uint64_t UpstreamHitCount{};
+ std::atomic_uint64_t MissCount{};
+ std::atomic_uint64_t WriteCount{};
+ std::atomic_uint64_t BadRequestCount{};
+ std::atomic_uint64_t RpcRequests{};
+ std::atomic_uint64_t RpcRecordRequests{};
+ std::atomic_uint64_t RpcRecordBatchRequests{};
+ std::atomic_uint64_t RpcValueRequests{};
+ std::atomic_uint64_t RpcValueBatchRequests{};
+ std::atomic_uint64_t RpcChunkRequests{};
+ std::atomic_uint64_t RpcChunkBatchRequests{};
+};
+
+enum class PutResult
+{
+ Success,
+ Fail,
+ Invalid,
+};
+
+/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
+
+ We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
+ */
+
+constexpr bool
+IsCompressedBinary(ZenContentType Type)
+{
+ return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary;
+}
+
+struct CacheRpcHandler
+{
+ CacheRpcHandler(LoggerRef InLog,
+ CacheStats& InCacheStats,
+ UpstreamCacheClient& InUpstreamCache,
+ ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ const DiskWriteBlocker* InDiskWriteBlocker);
+ ~CacheRpcHandler();
+
+ enum class RpcResponseCode
+ {
+ InsufficientStorage = 507,
+ BadRequest = 400,
+ OK = 200
+ };
+
+ RpcResponseCode HandleRpcRequest(const CacheRequestContext& Context,
+ const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
+
+private:
+ CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CacheRequestContext& Context, const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, CbObjectView BatchRequest);
+
+ PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+
+ /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ CbObjectView RpcRequest);
+ /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally.
+ */
+ void GetLocalCacheRecords(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
+ std::vector<cache::detail::RecordBody>& Records,
+ std::vector<cache::detail::ChunkRequest*>& RecordRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */
+ void GetLocalCacheValues(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+ std::vector<CacheChunkRequest*>& OutUpstreamChunks);
+ /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */
+ void GetUpstreamCacheChunks(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
+ std::vector<CacheChunkRequest>& RequestKeys,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+ /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
+ CbPackage WriteGetCacheChunksResponse(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests);
+
+ LoggerRef Log() { return m_Log; }
+ LoggerRef m_Log;
+ CacheStats& m_CacheStats;
+ UpstreamCacheClient& m_UpstreamCache;
+ ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+
+ bool AreDiskWritesAllowed() const;
+};
+
+} // namespace zen \ No newline at end of file
diff --git a/src/zenstore/include/zenstore/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index e3e8a2f84..e3e8a2f84 100644
--- a/src/zenstore/include/zenstore/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
diff --git a/src/zenstore/include/zenstore/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 5732735cf..296e4a431 100644
--- a/src/zenstore/include/zenstore/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -5,7 +5,7 @@
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
#include <zencore/stats.h>
-#include <zenstore/cachedisklayer.h>
+#include <zenstore/cache/cachedisklayer.h>
#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/statsreporter.h>
diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
new file mode 100644
index 000000000..152031c3a
--- /dev/null
+++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h
@@ -0,0 +1,119 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/stats.h>
+#include <zencore/zencore.h>
+#include <zenutil/cache/cache.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace zen {
+
+class CbObjectView;
+class CbPackage;
+
+struct UpstreamCacheRecord
+{
+ ZenContentType Type = ZenContentType::kBinary;
+ std::string Namespace;
+ CacheKey Key;
+ std::vector<IoHash> ValueContentIds;
+ CacheRequestContext Context;
+};
+
+struct UpstreamError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
+struct GetUpstreamCacheResult
+{
+ UpstreamError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct PutUpstreamCacheResult
+{
+ std::string Reason;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct CacheRecordGetCompleteParams
+{
+ CacheKeyRequest& Request;
+ const CbObjectView& Record;
+ const CbPackage& Package;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
+
+struct CacheValueGetCompleteParams
+{
+ CacheValueRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>;
+
+struct CacheChunkGetCompleteParams
+{
+ CacheChunkRequest& Request;
+ IoHash RawHash;
+ uint64_t RawSize;
+ IoBuffer Value;
+ double ElapsedSeconds{};
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>;
+
+class UpstreamCacheClient
+{
+public:
+ virtual ~UpstreamCacheClient() = default;
+
+ virtual bool IsActive() = 0;
+
+ virtual void GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
+
+ virtual void GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
+};
+
+} // namespace zen
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index b8f9d65ef..5e00e8852 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -425,7 +425,7 @@ RecordedRequestsSegmentWriter::EndWrite()
CbObjectWriter Cbo;
Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
Cbo << "segment_index" << m_SegmentIndex;
- Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest);
Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
Cbo.BeginObject("system_info");
@@ -446,7 +446,12 @@ RecordedRequestsSegmentWriter::EndWrite()
std::error_code Ec;
IndexFile.WriteAll(IndexBuffer, Ec);
IndexFile.Close();
- m_Entries.clear();
+
+ // note that simply calling `.reset()` here will *not* release backing memory
+ // and it's important that we do because on high traffic servers this will use
+ // a lot of memory otherwise
+ std::vector<RecordedRequest> EmptyEntries;
+ swap(m_Entries, EmptyEntries);
}
uint64_t
diff --git a/src/zenhttp/include/zenhttp/httpshared.h b/src/zenutil/include/zenutil/packageformat.h
index cf74b6b21..0b8495fbd 100644
--- a/src/zenhttp/include/zenhttp/httpshared.h
+++ b/src/zenutil/include/zenutil/packageformat.h
@@ -158,6 +158,6 @@ private:
IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
};
-void forcelink_httpshared();
+void forcelink_packageformat();
} // namespace zen
diff --git a/src/zenhttp/httpshared.cpp b/src/zenutil/packageformat.cpp
index ca014bf1c..015782283 100644
--- a/src/zenhttp/httpshared.cpp
+++ b/src/zenutil/packageformat.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenhttp/httpshared.h>
+#include <zenutil/packageformat.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -818,7 +818,7 @@ TEST_CASE("CbPackage.LocalRef")
}
void
-forcelink_httpshared()
+forcelink_packageformat()
{
}
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index d9d6c83a2..8544f3401 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -6,6 +6,7 @@
# include <zenutil/basicfile.h>
# include <zenutil/cache/rpcrecording.h>
+# include <zenutil/packageformat.h>
namespace zen {
@@ -14,6 +15,7 @@ zenutil_forcelinktests()
{
basicfile_forcelink();
cache::rpcrecord_forcelink();
+ forcelink_packageformat();
}
} // namespace zen