aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-01-29 15:08:03 +0100
committerStefan Boberg <[email protected]>2025-01-29 15:08:03 +0100
commite64c8727ecb073ca03e2c7d4b3972c375c1b6315 (patch)
tree04a1a7c178c43666de7f7f9b472ed156f6373da5 /src/zenstore/cache/cacherpc.cpp
parentMerge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zen (diff)
parenthandle special backslash followed by quote for paths (#279) (diff)
downloadzen-sb/cleanup-main.tar.xz
zen-sb/cleanup-main.zip
Merge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zensb/cleanup-main
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp69
1 files changed, 66 insertions, 3 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index e6b6be525..cca51e63e 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -10,12 +10,13 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/cache/cacheshared.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenstore/cache/upstreamcacheclient.h>
#include <zenstore/cidstore.h>
#include <zenutil/cache/cacherequests.h>
-#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include <zencore/memory/llm.h>
@@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
ParseValues(Request);
- Request.Complete = true;
+ Request.Complete = true;
+ size_t ValueCount = Request.Values.size();
+ std::vector<IoHash> CidHashes;
+ std::vector<size_t> RequestValueIndexes;
+ const bool DoBatch = ValueCount > 7;
+ if (DoBatch)
+ {
+ CidHashes.reserve(ValueCount);
+ RequestValueIndexes.reserve(ValueCount);
+ }
+ size_t ValueIndex = 0;
for (ValueRequestData& Value : Request.Values)
{
CachePolicy ValuePolicy = Value.DownstreamPolicy;
@@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.Complete = false;
}
}
+ else if (DoBatch)
+ {
+ CidHashes.push_back(Value.ContentId);
+ RequestValueIndexes.push_back(ValueIndex);
+ }
else
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
@@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
}
}
-
if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
NeedUpstreamAttachment = true;
@@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = false;
}
+ ValueIndex++;
+ }
+ if (!RequestValueIndexes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidHashes,
+ [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ try
+ {
+ const size_t ValueIndex = RequestValueIndexes[Index];
+ ValueRequestData& Value = Request.Values[ValueIndex];
+ if (Payload)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned();
+ if (Value.Payload)
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 64u * 1024u);
+
+ for (size_t Index : RequestValueIndexes)
+ {
+ ValueRequestData& Value = Request.Values[Index];
+ if (!Value.Exists)
+ {
+ const CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
}
}
}
@@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ // TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{