aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-04 19:30:34 +0200
committerGitHub Enterprise <[email protected]>2024-06-04 19:30:34 +0200
commitbbe46452530a98a0bd36c0024d4f3f914ae23604 (patch)
treee9c901a6ec68d087bc7e746b38d1573b2999a0ef /src/zenstore/cache/cacherpc.cpp
parentUse a smaller thread pool for network operations when doing oplog import to r... (diff)
downloadzen-bbe46452530a98a0bd36c0024d4f3f914ae23604.tar.xz
zen-bbe46452530a98a0bd36c0024d4f3f914ae23604.zip
add batching of CacheStore requests for GetCacheValues/GetCacheChunks (#90)
* cache file size of block on open * add ability to control size limit for small chunk callback when iterating block * Add batch fetch of cache values in the GetCacheValues request
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp166
1 files changed, 111 insertions, 55 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index d28eda8c4..f6e5d16b3 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -985,69 +985,93 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasUpstream = m_UpstreamCache.IsActive();
- CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- for (CbFieldView RequestField : RequestsArray)
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ std::vector<ZenCacheValue> CacheValues;
+ const uint64_t RequestCount = RequestsArray.Num();
+ CacheValues.reserve(RequestCount);
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
-
- m_CacheStats.RpcValueBatchRequests.fetch_add(1);
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (RequestCount > 1)
+ {
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, *Namespace, CacheValues);
+ }
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues::Request");
- Stopwatch Timer;
+ m_CacheStats.RpcValueBatchRequests.fetch_add(1);
- RequestData& Request = Requests.emplace_back();
- CbObjectView RequestObject = RequestField.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
- {
- return CbPackage{};
- }
+ if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
+ {
+ return CbPackage{};
+ }
- PolicyText = RequestObject["Policy"sv].AsString();
- Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- CacheKey& Key = Request.Key;
- CachePolicy Policy = Request.Policy;
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
- ZenCacheValue CacheValue;
- if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ ZenCacheValue CacheValue;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (Batch)
+ {
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, *Batch.get());
+ }
+ else
+ {
+ CacheValues.push_back({});
+ m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValues.back());
+ }
+ }
+ else
+ {
+ CacheValues.push_back({});
+ }
+ }
+ Batch.reset();
+ ZEN_ASSERT(CacheValues.size() == RequestsArray.Num());
+ }
+ for (size_t RequestIndex = 0; RequestIndex < CacheValues.size(); RequestIndex++)
+ {
+ RequestData& Request = Requests[RequestIndex];
+ ZenCacheValue& Value = CacheValues[RequestIndex];
+ if (Value.Value)
{
- if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, CacheValue) &&
- IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (IsCompressedBinary(Value.Value.GetContentType()))
{
- Request.RawHash = CacheValue.RawHash;
- Request.RawSize = CacheValue.RawSize;
- Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
+ Request.RawHash = Value.RawHash;
+ Request.RawSize = Value.RawSize;
+ Request.Result = CompressedBuffer::FromCompressedNoValidate(std::move(Value.Value));
}
}
if (Request.Result)
{
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({})",
*Namespace,
- Key.Bucket,
- Key.Hash,
+ Request.Key.Bucket,
+ Request.Key.Hash,
NiceBytes(Request.Result.GetCompressed().GetSize()),
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ "LOCAL"sv);
m_CacheStats.HitCount++;
}
- else if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ else if (HasUpstream && EnumHasAllFlags(Request.Policy, CachePolicy::QueryRemote))
{
- RemoteRequestIndexes.push_back(Requests.size() - 1);
+ RemoteRequestIndexes.push_back(RequestIndex);
}
- else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ else if (!EnumHasAnyFlags(Request.Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Request.Key.Bucket, Request.Key.Hash);
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- *Namespace,
- Key.Bucket,
- Key.Hash,
- "LOCAL"sv,
- NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({})", *Namespace, Request.Key.Bucket, Request.Key.Hash, "LOCAL"sv);
m_CacheStats.MissCount++;
}
}
@@ -1497,29 +1521,61 @@ CacheRpcHandler::GetLocalCacheValues(const CacheRequestContext& Context,
using namespace cache::detail;
const bool HasUpstream = m_UpstreamCache.IsActive();
- for (ChunkRequest* Request : ValueRequests)
+ std::vector<ZenCacheValue> Chunks;
+ Chunks.reserve(ValueRequests.size());
{
- ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
-
- Stopwatch Timer;
- if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
+ std::unique_ptr<ZenCacheStore::GetBatch> Batch;
+ if (ValueRequests.size() > 1)
{
- ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ Batch = std::make_unique<ZenCacheStore::GetBatch>(m_CacheStore, Namespace, Chunks);
+ }
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ Stopwatch Timer;
+ if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
- if (IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (Batch)
{
- Request->Key->ChunkId = CacheValue.RawHash;
- Request->Exists = true;
- Request->RawSize = CacheValue.RawSize;
- Request->RawSizeKnown = true;
- if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
- {
- Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(CacheValue.Value));
- }
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, *Batch.get());
+ }
+ else
+ {
+ Chunks.push_back({});
+ m_CacheStore.Get(Context, Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, Chunks.back());
+ }
+ }
+ else
+ {
+ Chunks.push_back({});
+ }
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+ }
+ for (size_t RequestIndex = 0; RequestIndex < ValueRequests.size(); RequestIndex++)
+ {
+ Stopwatch Timer;
+ ChunkRequest* Request = ValueRequests[RequestIndex];
+ if (Chunks[RequestIndex].Value)
+ {
+ if (IsCompressedBinary(Chunks[RequestIndex].Value.GetContentType()))
+ {
+ Request->Key->ChunkId = Chunks[RequestIndex].RawHash;
+ Request->Exists = true;
+ Request->RawSize = Chunks[RequestIndex].RawSize;
+ Request->RawSizeKnown = true;
+ if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Request->Value = CompressedBuffer::FromCompressedNoValidate(std::move(Chunks[RequestIndex].Value));
}
}
}
+ Request->ElapsedTimeUs += Timer.GetElapsedTimeUs();
+ }
+
+ for (ChunkRequest* Request : ValueRequests)
+ {
+ ZEN_TRACE_CPU("Z$::GetLocalCacheValues::Value");
+ Stopwatch Timer;
if (HasUpstream && !Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote))
{
if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal))