diff options
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 132 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 85 |
2 files changed, 117 insertions, 100 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 1aa195469..c1c0395e7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -180,7 +180,9 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, + const CacheKey& CacheKey, + ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -274,21 +276,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -342,13 +343,16 @@ namespace detail { } } - OnComplete({.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds}); + OnComplete( + {.Request = *Request, .Record = Record, .Package = Package, .ElapsedSeconds = ElapsedSeconds, .Source = &m_Info}); } return Result; } - virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey&, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheChunk"); @@ -362,21 +366,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -418,7 +421,8 @@ namespace detail { .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), .RawSize = Compressed.GetRawSize(), .Value = Payload, - .ElapsedSeconds = ElapsedSeconds}); + .ElapsedSeconds = ElapsedSeconds, + .Source = &m_Info}); } else { @@ -487,7 +491,8 @@ namespace detail { .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), .RawSize = Compressed.GetRawSize(), .Value = Payload, - .ElapsedSeconds = ElapsedSeconds}); + .ElapsedSeconds = ElapsedSeconds, + .Source = &m_Info}); } else { @@ -808,7 +813,9 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, + const CacheKey& CacheKey, + ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); @@ -821,21 +828,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -908,7 +914,8 @@ namespace detail { OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse, - .ElapsedSeconds = Result.ElapsedSeconds}); + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -924,9 +931,9 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunk"); @@ -939,21 +946,20 @@ namespace detail { if (Result.ErrorCode == 0) { - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + return {.Status = {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}, + .Value = Result.Response, + .Source = &m_Info}; } else { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } catch (std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); - return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + return {.Status = {.Error{.ErrorCode = -1, .Reason = Err.what()}}}; } } @@ -1055,7 +1061,8 @@ namespace detail { .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload), - .ElapsedSeconds = Result.ElapsedSeconds}); + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -1185,7 +1192,8 @@ namespace detail { .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload), - .ElapsedSeconds = Result.ElapsedSeconds}); + .ElapsedSeconds = Result.ElapsedSeconds, + .Source = &m_Info}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -1480,7 +1488,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1495,31 +1503,29 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - GetUpstreamCacheResult Result; - { - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + GetUpstreamCacheSingleResult Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); + Scope.Stop(); Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); + Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - if (Result.Success) + if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } - if (Result.Error) + if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + Result.Status.Error.Reason, + Result.Status.Error.ErrorCode); } } } @@ -1588,6 +1594,7 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheKeyRequest* Request : RemainingKeys) { OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); @@ -1655,15 +1662,16 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheChunkRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, - const CacheKey& CacheKey, - const IoHash& ValueContentId) override + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheChunk"); @@ -1676,32 +1684,29 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - GetUpstreamCacheResult Result; - - { - metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + GetUpstreamCacheSingleResult Result = Endpoint->GetCacheChunk(Namespace, CacheKey, ValueContentId); + Scope.Stop(); Stats.CacheGetCount.Increment(1); - Stats.CacheGetTotalBytes.Increment(Result.Bytes); + Stats.CacheGetTotalBytes.Increment(Result.Status.Bytes); - if (Result.Success) + if (Result.Status.Success) { Stats.CacheHitCount.Increment(1); return Result; } - if (Result.Error) + if (Result.Status.Error) { Stats.CacheErrorCount.Increment(1); ZEN_WARN("get cache chunk FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + Result.Status.Error.Reason, + Result.Status.Error.ErrorCode); } } } @@ -1770,6 +1775,7 @@ public: } } + const UpstreamEndpointInfo Info; for (CacheValueRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 108c097da..695c06b32 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -53,15 +53,27 @@ struct UpstreamError explicit operator bool() const { return ErrorCode != 0; } }; +struct UpstreamEndpointInfo +{ + std::string Name; + std::string Url; +}; + struct GetUpstreamCacheResult { - IoBuffer Value; UpstreamError Error{}; int64_t Bytes{}; double ElapsedSeconds{}; bool Success = false; }; +struct GetUpstreamCacheSingleResult +{ + GetUpstreamCacheResult Status; + IoBuffer Value; + const UpstreamEndpointInfo* Source = nullptr; +}; + struct PutUpstreamCacheResult { std::string Reason; @@ -72,32 +84,35 @@ struct PutUpstreamCacheResult struct CacheRecordGetCompleteParams { - CacheKeyRequest& Request; - const CbObjectView& Record; - const CbPackage& Package; - double ElapsedSeconds{}; + CacheKeyRequest& Request; + const CbObjectView& Record; + const CbPackage& Package; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; }; using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; struct CacheValueGetCompleteParams { - CacheValueRequest& Request; - IoHash RawHash; - uint64_t RawSize; - IoBuffer Value; - double ElapsedSeconds{}; + CacheValueRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; }; using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; struct CacheChunkGetCompleteParams { - CacheChunkRequest& Request; - IoHash RawHash; - uint64_t RawSize; - IoBuffer Value; - double ElapsedSeconds{}; + CacheChunkRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; }; using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>; @@ -156,12 +171,6 @@ struct UpstreamEndpointStatus UpstreamEndpointState State; }; -struct UpstreamEndpointInfo -{ - std::string Name; - std::string Url; -}; - /** * The upstream endpoint is responsible for handling upload/downloading of cache records. */ @@ -177,19 +186,19 @@ public: virtual UpstreamEndpointState GetState() = 0; virtual UpstreamEndpointStatus GetStatus() = 0; - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, - std::span<CacheKeyRequest*> Requests, - OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheChunksGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -217,19 +226,21 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; - virtual void GetCacheRecords(std::string_view Namespace, - std::span<CacheKeyRequest*> Requests, - OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; virtual void GetCacheValues(std::string_view Namespace, std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCacheChunks(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheChunksGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) = 0; + virtual void GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; |