diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-04 19:30:34 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-04 19:30:34 +0200 |
| commit | bbe46452530a98a0bd36c0024d4f3f914ae23604 (patch) | |
| tree | e9c901a6ec68d087bc7e746b38d1573b2999a0ef /src/zenstore/cache/cacherpc.cpp | |
| parent | Use a smaller thread pool for network operations when doing oplog import to r... (diff) | |
| download | zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip | |
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open
* add ability to control size limit for small chunk callback when iterating block
* Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 166 |
1 files changed, 111 insertions, 55 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index d28eda8c4..f6e5d16b3 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -985,69 +985,93 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasUpstream = m_UpstreamCache.IsActive(); - CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - for (CbFieldView RequestField : RequestsArray) + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + std::vector<ZenCacheValue> CacheValues; + const uint64_t RequestCount = RequestsArray.Num(); + CacheValues.reserve(RequestCount); { - ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request"); - - m_CacheStats.RpcValueBatchRequests.fetch_add(1); + std::unique_ptr<ZenCacheStore::GetBatch> Batch; + if (RequestCount > 1) + { + Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, *Namespace, CacheValues); + } + for (CbFieldView RequestField : RequestsArray) + { + ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request"); - Stopwatch Timer; + m_CacheStats.RpcValueBatchRequests.fetch_add(1); - RequestData& Request = Requests.emplace_back(); - CbObjectView RequestObject = RequestField.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + RequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) - { - return CbPackage{}; - } + if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) + { + return CbPackage{}; + } - PolicyText = RequestObject["Policy"sv].AsString(); - Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + PolicyText = RequestObject["Policy"sv].AsString(); + Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - CacheKey& Key = Request.Key; - CachePolicy Policy = Request.Policy; + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; - ZenCacheValue CacheValue; - if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + ZenCacheValue CacheValue; + if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + if (Batch) + { + m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, *Batch.get()); + } + else + { + CacheValues.push_back({}); + m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValues.back()); + } + } + else + { + CacheValues.push_back({}); + } + } + Batch.reset(); + ZEN_ASSERT(CacheValues.size() == RequestsArray.Num()); + } + for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++) + { + RequestData& Request = Requests[RequestIndex]; + ZenCacheValue& Value = CacheValues[RequestIndex]; + if (Value.Value) { - if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) && - IsCompressedBinary(CacheValue.Value.GetContentType())) + if (IsCompressedBinary(Value.Value.GetContentType())) { - Request.RawHash = CacheValue.RawHash; - Request.RawSize = CacheValue.RawSize; - Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); + Request.RawHash = Value.RawHash; + Request.RawSize = Value.RawSize; + Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(Value.Value)); } } if (Request.Result) { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({})", *Namespace, - Key.Bucket, - Key.Hash, + Request.Key.Bucket, + Request.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + "LOCAL"sv); m_CacheStats.HitCount++; } - else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + else if (HasUpstream && EnumHasAllFlags(Request.Policy, CachePolicy::QueryRemote)) { - RemoteRequestIndexes.push_back(Requests.size() - 1); + RemoteRequestIndexes.push_back(RequestIndex); } - else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) + else if (!EnumHasAnyFlags(Request.Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Request.Key.Bucket, Request.Key.Hash); } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - *Namespace, - Key.Bucket, - Key.Hash, - "LOCAL"sv, - NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({})", *Namespace, Request.Key.Bucket, Request.Key.Hash, "LOCAL"sv); m_CacheStats.MissCount++; } } @@ -1497,29 +1521,61 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); - for (ChunkRequest* Request : ValueRequests) + std::vector<ZenCacheValue> Chunks; + Chunks.reserve(ValueRequests.size()); { - ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value"); - - Stopwatch Timer; - if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) + std::unique_ptr<ZenCacheStore::GetBatch> Batch; + if (ValueRequests.size() > 1) { - ZenCacheValue CacheValue; - if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, Namespace, Chunks); + } + for (ChunkRequest* Request : ValueRequests) + { + Stopwatch Timer; + if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { - if (IsCompressedBinary(CacheValue.Value.GetContentType())) + if (Batch) { - Request->Key->ChunkId = CacheValue.RawHash; - Request->Exists = true; - Request->RawSize = CacheValue.RawSize; - Request->RawSizeKnown = true; - if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) - { - Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value)); - } + m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, *Batch.get()); + } + else + { + Chunks.push_back({}); + m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, Chunks.back()); + } + } + else + { + Chunks.push_back({}); + } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + } + for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++) + { + Stopwatch Timer; + ChunkRequest* Request = ValueRequests[RequestIndex]; + if (Chunks[RequestIndex].Value) + { + if (IsCompressedBinary(Chunks[RequestIndex].Value.GetContentType())) + { + Request->Key->ChunkId = Chunks[RequestIndex].RawHash; + Request->Exists = true; + Request->RawSize = Chunks[RequestIndex].RawSize; + Request->RawSizeKnown = true; + if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) + { + Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Chunks[RequestIndex].Value)); } } } + Request->ElapsedTimeUs += Timer.GetElapsedTimeUs(); + } + + for (ChunkRequest* Request : ValueRequests) + { + ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value"); + Stopwatch Timer; if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) |