diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-26 23:18:44 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-26 23:35:37 +0200 |
| commit | 4e538014c4e5089f10aca69657a948e4d912292d (patch) | |
| tree | 50e6f0b5fd85171ba077aec1b92014c6da2e661d /zenserver/cache/structuredcache.cpp | |
| parent | Compute tweaks (#78) (diff) | |
| download | zen-4e538014c4e5089f10aca69657a948e4d912292d.tar.xz zen-4e538014c4e5089f10aca69657a948e4d912292d.zip | |
Use GetCacheValues when checking upstream to reduce number of calls to upstream
Added some timing info to debug logs
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 148 |
1 files changed, 101 insertions, 47 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 8ae531720..8daf08bff 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -650,42 +650,52 @@ HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, void HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { + Stopwatch Timer; + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromURL; - const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); - - if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); - UpstreamResult.Success) + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + + if (QueryUpstream) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) - { - m_CidStore.AddChunk(Compressed); - InUpstreamCache = true; - } - else + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { - ZEN_WARN("got uncompressed upstream cache value"); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + { + m_CidStore.AddChunk(Compressed); + InUpstreamCache = true; + } + else + { + ZEN_WARN("got uncompressed upstream cache value"); + } } } } if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}' in {}", + Ref.BucketSegment, + Ref.HashKey, + Ref.ValueContentId, + ToString(Request.AcceptContentType()), + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({}) in {}", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Value.Size()), ToString(Value.GetContentType()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); + InUpstreamCache ? "UPSTREAM" : "LOCAL", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; if (InUpstreamCache) @@ -709,6 +719,8 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromURL); + Stopwatch Timer; + IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) @@ -734,13 +746,14 @@ HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", + ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({}) in {}", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), - Result.New ? "NEW" : "OLD"); + Result.New ? "NEW" : "OLD", + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; @@ -1444,8 +1457,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); + std::vector<size_t> RemoteRequestIndexes; + for (CbFieldView RequestField : Params["Requests"sv]) { + Stopwatch Timer; + RequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); @@ -1463,46 +1480,28 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http CachePolicy Policy = Request.Policy; CompressedBuffer& Result = Request.Result; - ZenCacheValue CacheValue; - std::string_view Source; + ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); - if (Result) - { - Source = "LOCAL"sv; - } - } - } - if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) - { - GetUpstreamCacheResult UpstreamResult = - m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); - if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) - { - Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); - if (Result) - { - UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); - Source = "UPSTREAM"sv; - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - { - m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); - } - } } } - if (Result) { - ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source); + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}", + Key.Bucket, + Key.Hash, + NiceBytes(Result.GetCompressed().GetSize()), + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; } + else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + { + RemoteRequestIndexes.push_back(Requests.size() - 1); + } else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss @@ -1510,10 +1509,65 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } else { - ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}", + Key.Bucket, + Key.Hash, + "LOCAL"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; } } + + if (!RemoteRequestIndexes.empty()) + { + std::vector<CacheChunkRequest> RequestedRecordsData; + std::vector<CacheChunkRequest*> CacheChunkRequests; + RequestedRecordsData.reserve(RemoteRequestIndexes.size()); + CacheChunkRequests.reserve(RemoteRequestIndexes.size()); + for (size_t Index : RemoteRequestIndexes) + { + RequestData& Request = Requests[Index]; + RequestedRecordsData.push_back({Request.Key.Bucket, Request.Key.Hash}); + CacheChunkRequests.push_back(&RequestedRecordsData.back()); + } + Stopwatch Timer; + m_UpstreamCache.GetCacheValues( + CacheChunkRequests, + [this, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + CacheChunkRequest& ChunkRequest = Params.Request; + if (Params.Value) + { + size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); + size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; + RequestData& Request = Requests[RequestIndex]; + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + if (Request.Result && IsCompressedBinary(Params.Value.GetContentType())) + { + // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement + // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive + // for us to keep the data only on the upstream server. + // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({}) in {}", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + NiceBytes(Request.Result.GetCompressed().GetSize()), + "UPSTREAM"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + return; + } + } + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}' ({}) in {}", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + "UPSTREAM"sv, + NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + m_CacheStats.MissCount++; + }); + } + if (Requests.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); |