diff options
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 69 |
1 files changed, 66 insertions, 3 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6b6be525..cca51e63e 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -10,12 +10,13 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/cacheshared.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/cache/upstreamcacheclient.h> #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> -#include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include <zencore/memory/llm.h> @@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { |