about · summary · refs · log · tree · commit · diff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-12-17 14:20:48 +0100
committerGitHub Enterprise <[email protected]>2024-12-17 14:20:48 +0100
commit48657d2fb6c9c709ec29264a609350c7b4a541f9 (patch)
treea2c1997a99e9659ee038663e1a8da4b56ec01ad5 /src/zenstore/cache/cacherpc.cpp
parentremove all referenced attachments in op from pending chunk references (#267) (diff)
downloadzen-48657d2fb6c9c709ec29264a609350c7b4a541f9.tar.xz
zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.zip
batch fetch record cache values (#266)
- Improvement: Batch fetch record attachments when appropriate
- Improvement: Reduce memory buffer allocation in BlockStore::IterateBlock
- Improvement: Tweaked BlockStore::IterateBlock logic for when to use threaded work (at least 4 chunks requested)
- Bugfix: CasContainerStrategy::IterateChunks could give wrong payload/index when requesting 1 or 2 chunks
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp67
1 files changed, 65 insertions, 2 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index e6b6be525..0a1ed0e09 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -16,6 +16,7 @@
#include <zenstore/cidstore.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/packageformat.h>
+#include <zenutil/workerpools.h>
#include <zencore/memory/llm.h>
@@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
ParseValues(Request);
- Request.Complete = true;
+ Request.Complete = true;
+ size_t ValueCount = Request.Values.size();
+ std::vector<IoHash> CidHashes;
+ std::vector<size_t> RequestValueIndexes;
+ const bool DoBatch = ValueCount > 7;
+ if (DoBatch)
+ {
+ CidHashes.reserve(ValueCount);
+ RequestValueIndexes.reserve(ValueCount);
+ }
+ size_t ValueIndex = 0;
for (ValueRequestData& Value : Request.Values)
{
CachePolicy ValuePolicy = Value.DownstreamPolicy;
@@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Request.Complete = false;
}
}
+ else if (DoBatch)
+ {
+ CidHashes.push_back(Value.ContentId);
+ RequestValueIndexes.push_back(ValueIndex);
+ }
else
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
@@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
}
}
-
if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
NeedUpstreamAttachment = true;
@@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
Request.Complete = false;
}
+ ValueIndex++;
+ }
+ if (!RequestValueIndexes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidHashes,
+ [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool {
+ try
+ {
+ const size_t ValueIndex = RequestValueIndexes[Index];
+ ValueRequestData& Value = Request.Values[ValueIndex];
+ if (Payload)
+ {
+ Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned();
+ if (Value.Payload)
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what());
+ }
+ return true;
+ },
+ &GetSmallWorkerPool(EWorkloadType::Burst),
+ 64u * 1024u);
+
+ for (size_t Index : RequestValueIndexes)
+ {
+ ValueRequestData& Value = Request.Values[Index];
+ if (!Value.Exists)
+ {
+ const CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
+ }
}
}
}
@@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
+ // TODO: BatchGet records?
std::vector<CacheKeyRequest*> UpstreamRecordRequests;
for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex)
{