diff options
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 211 |
1 files changed, 125 insertions, 86 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index baed1b3b6..ac75285c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -15,6 +15,8 @@ #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/httpshared.h> + #include <zenstore/cidstore.h> #include <auth/authmgr.h> @@ -180,7 +182,9 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, + const CacheKey& CacheKey, + ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -274,21 +278,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -307,12 +310,14 @@ namespace detail { CbPackage Package; CbObject Record; + double ElapsedSeconds = 0.0; if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); CloudCacheResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); + ElapsedSeconds = RefResult.ElapsedSeconds; m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); @@ -340,15 +345,18 @@ namespace detail { } } - OnComplete({.Request = *Request, .Record = Record, .Package = Package}); + OnComplete( + {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); } return Result; } - virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey&, + const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); + ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheChunk"); try { @@ -360,21 +368,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -392,6 +399,7 @@ namespace detail { CacheChunkRequest& Request = *RequestPtr; IoBuffer Payload; + double ElapsedSeconds = 0.0; CompressedBuffer Compressed; if (!Result.Error) { @@ -400,7 +408,8 @@ namespace detail { Request.ChunkId == IoHash::Zero ? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId) : Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); - Payload = BlobResult.Response; + ElapsedSeconds = BlobResult.ElapsedSeconds; + Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -413,10 +422,12 @@ namespace detail { if (Compressed) { - OnComplete({.Request = Request, - .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), - .RawSize = Compressed.GetRawSize(), - .Value = Payload}); + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload, + .ElapsedSeconds = ElapsedSeconds, + .Source = &m_Info}); } else { @@ -441,6 +452,7 @@ namespace detail { CacheValueRequest& Request = *RequestPtr; IoBuffer Payload; + double ElapsedSeconds = 0.0; CompressedBuffer Compressed; if (!Result.Error) { @@ -448,7 +460,8 @@ namespace detail { IoHash PayloadHash; const CloudCacheResult BlobResult = Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); - Payload = BlobResult.Response; + ElapsedSeconds = BlobResult.ElapsedSeconds; + Payload = BlobResult.Response; AppendResult(BlobResult, Result); @@ -479,10 +492,12 @@ namespace detail { if (Compressed) { - OnComplete({.Request = Request, - .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), - .RawSize = Compressed.GetRawSize(), - .Value = Payload}); + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload, + .ElapsedSeconds = ElapsedSeconds, + .Source = &m_Info}); } else { @@ -803,7 +818,9 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, + const CacheKey& CacheKey, + ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); @@ -816,21 +833,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -876,7 +892,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -888,24 +903,33 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (Results.Num() != Requests.size()) { - ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Response results from Upstream."); } else { for (size_t Index = 0; CbFieldView Record : Results) { CacheKeyRequest* Request = Requests[Index++]; - OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse}); + OnComplete({.Request = *Request, + .Record = Record.AsObjectView(), + .Package = BatchResponse, + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid Response from Upstream."); + } } for (CacheKeyRequest* Request : Requests) @@ -916,36 +940,35 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk"); try { ZenStructuredCacheSession Session(GetClientRef()); - const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheChunk(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -990,7 +1013,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -1002,12 +1024,13 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheValueRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Response results from Upstream."); } else { @@ -1043,12 +1066,21 @@ namespace detail { RawHash = IoHash::Zero; } } - OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); + OnComplete({.Request = Request, + .RawHash = RawHash, + .RawSize = RawSize, + .Value = std::move(Payload), + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheValues invalid Response from Upstream."); + } } for (CacheValueRequest* RequestPtr : CacheValueRequests) @@ -1116,7 +1148,6 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; ZenCacheResult Result; { @@ -1128,12 +1159,13 @@ namespace detail { if (Result.Success) { - if (BatchResponse.TryLoad(Result.Response)) + CbPackage BatchResponse; + if (ParsePackageMessageWithLegacyFallback(Result.Response, BatchResponse)) { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheChunkRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Response results from Upstream."); } else { @@ -1169,12 +1201,21 @@ namespace detail { RawHash = IoHash::Zero; } } - OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); + OnComplete({.Request = Request, + .RawHash = RawHash, + .RawSize = RawSize, + .Value = std::move(Payload), + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } + else + { + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid Response from Upstream."); + } } for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -1464,7 +1505,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1479,31 +1520,29 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - GetUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); + Scope.Stop(); Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); + Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - if (Result.Success) + if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } - if (Result.Error) + if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + Result.Status.Error.Reason, + Result.Status.Error.ErrorCode); } } } @@ -1572,6 +1611,7 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheKeyRequest* Request : RemainingKeys) { OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); @@ -1629,7 +1669,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_WARN("get cache chunks(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1639,17 +1679,18 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheChunkRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::GetCacheValue"); + ZEN_TRACE_CPU("Upstream::GetCacheChunk"); if (m_Options.ReadUpstream) { @@ -1660,32 +1701,29 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - GetUpstreamCacheResult Result; - - { - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId); + Scope.Stop(); Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); + Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - if (Result.Success) + if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } - if (Result.Error) + if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); - ZEN_WARN("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + Result.Status.Error.Reason, + Result.Status.Error.ErrorCode); } } } @@ -1754,6 +1792,7 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheValueRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); |