aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp148
1 files changed, 101 insertions, 47 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 8ae531720..8daf08bff 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -650,42 +650,52 @@ HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request,
void
HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
+ Stopwatch Timer;
+
IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
bool InUpstreamCache = false;
CachePolicy Policy = PolicyFromURL;
- const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
-
- if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
- UpstreamResult.Success)
+ const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+
+ if (QueryUpstream)
{
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
- {
- m_CidStore.AddChunk(Compressed);
- InUpstreamCache = true;
- }
- else
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Success)
{
- ZEN_WARN("got uncompressed upstream cache value");
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ {
+ m_CidStore.AddChunk(Compressed);
+ InUpstreamCache = true;
+ }
+ else
+ {
+ ZEN_WARN("got uncompressed upstream cache value");
+ }
}
}
}
if (!Value)
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}' in {}",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.ValueContentId,
+ ToString(Request.AcceptContentType()),
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({}) in {}",
Ref.BucketSegment,
Ref.HashKey,
Ref.ValueContentId,
NiceBytes(Value.Size()),
ToString(Value.GetContentType()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
+ InUpstreamCache ? "UPSTREAM" : "LOCAL",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
if (InUpstreamCache)
@@ -709,6 +719,8 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request,
// Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
ZEN_UNUSED(PolicyFromURL);
+ Stopwatch Timer;
+
IoBuffer Body = Request.ReadPayload();
if (!Body || Body.Size() == 0)
@@ -734,13 +746,14 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request,
CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
- ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
+ ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({}) in {}",
Ref.BucketSegment,
Ref.HashKey,
Ref.ValueContentId,
NiceBytes(Body.Size()),
ToString(Body.GetContentType()),
- Result.New ? "NEW" : "OLD");
+ Result.New ? "NEW" : "OLD",
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
@@ -1444,8 +1457,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+ std::vector<size_t> RemoteRequestIndexes;
+
for (CbFieldView RequestField : Params["Requests"sv])
{
+ Stopwatch Timer;
+
RequestData& Request = Requests.emplace_back();
CbObjectView RequestObject = RequestField.AsObjectView();
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
@@ -1463,46 +1480,28 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
CachePolicy Policy = Request.Policy;
CompressedBuffer& Result = Request.Result;
- ZenCacheValue CacheValue;
- std::string_view Source;
+ ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
- if (Result)
- {
- Source = "LOCAL"sv;
- }
- }
- }
- if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
- {
- GetUpstreamCacheResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary);
- if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
- {
- Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
- if (Result)
- {
- UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary);
- Source = "UPSTREAM"sv;
- // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
- // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
- // for us to keep the data only on the upstream server.
- // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- {
- m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value});
- }
- }
}
}
-
if (Result)
{
- ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source);
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Result.GetCompressed().GetSize()),
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.HitCount++;
}
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ RemoteRequestIndexes.push_back(Requests.size() - 1);
+ }
else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
@@ -1510,10 +1509,65 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
}
else
{
- ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}",
+ Key.Bucket,
+ Key.Hash,
+ "LOCAL"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
m_CacheStats.MissCount++;
}
}
+
+ if (!RemoteRequestIndexes.empty())
+ {
+ std::vector<CacheChunkRequest> RequestedRecordsData;
+ std::vector<CacheChunkRequest*> CacheChunkRequests;
+ RequestedRecordsData.reserve(RemoteRequestIndexes.size());
+ CacheChunkRequests.reserve(RemoteRequestIndexes.size());
+ for (size_t Index : RemoteRequestIndexes)
+ {
+ RequestData& Request = Requests[Index];
+ RequestedRecordsData.push_back({Request.Key.Bucket, Request.Key.Hash});
+ CacheChunkRequests.push_back(&RequestedRecordsData.back());
+ }
+ Stopwatch Timer;
+ m_UpstreamCache.GetCacheValues(
+ CacheChunkRequests,
+ [this, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
+ CacheChunkRequest& ChunkRequest = Params.Request;
+ if (Params.Value)
+ {
+ size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest);
+ size_t RequestIndex = RemoteRequestIndexes[RequestOffset];
+ RequestData& Request = Requests[RequestIndex];
+ Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ if (Request.Result && IsCompressedBinary(Params.Value.GetContentType()))
+ {
+ // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
+ // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
+ // for us to keep the data only on the upstream server.
+ // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
+ m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}",
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ NiceBytes(Request.Result.GetCompressed().GetSize()),
+ "UPSTREAM"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ return;
+ }
+ }
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}",
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ "UPSTREAM"sv,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ m_CacheStats.MissCount++;
+ });
+ }
+
if (Requests.empty())
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);