diff options
| author | Dan Engelbrecht <[email protected]> | 2024-12-17 14:20:48 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-12-17 14:20:48 +0100 |
| commit | 48657d2fb6c9c709ec29264a609350c7b4a541f9 (patch) | |
| tree | a2c1997a99e9659ee038663e1a8da4b56ec01ad5 /src/zenstore/cache/cacherpc.cpp | |
| parent | remove all referenced attachments in op from pending chunk references (#267) (diff) | |
| download | zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.tar.xz zen-48657d2fb6c9c709ec29264a609350c7b4a541f9.zip | |
batch fetch record cache values (#266)
- Improvement: Batch fetch record attachments when appropriate
- Improvement: Reduce memory buffer allocation in BlockStore::IterateBlock
- Improvement: Tweaked BlockStore::IterateBlock logic when to use threaded work (at least 4 chunks requested)
- Bugfix: CasContainerStrategy::IterateChunks could give wrong payload/index when requesting 1 or 2 chunks
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 67 |
1 files changed, 65 insertions, 2 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6b6be525..0a1ed0e09 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -16,6 +16,7 @@ #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include <zencore/memory/llm.h> @@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { |