diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-06 16:42:59 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-09-06 07:42:59 -0700 |
| commit | f9ddb13500f41549fa472f6aaffa096d64554145 (patch) | |
| tree | 24de689afc0cdb6cd7dd99ce85d66b7223630c64 /zenserver/upstream/upstreamcache.cpp | |
| parent | updated codeowners (#157) (diff) | |
| download | zen-f9ddb13500f41549fa472f6aaffa096d64554145.tar.xz zen-f9ddb13500f41549fa472f6aaffa096d64554145.zip | |
Implement proper GetCacheValues upstream (#155)
* Implement proper GetCacheValues upstream
* changelog
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 249 |
1 files changed, 235 insertions, 14 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a897c21fd..7f5759e47 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -378,11 +378,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) override final + OnCacheChunksGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheChunks"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -424,6 +424,52 @@ namespace detail { return Result; } + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); + + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + CacheValueRequest& Request = *RequestPtr; + IoBuffer Payload; + + CompressedBuffer Compressed; + if (!Result.Error) + { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + if (Payload && IsCompressedBinary(Payload.GetContentType())) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + } + } + + if (Compressed) + { + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload}); + } + else + { + OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + } + + return Result; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Values) override @@ -851,7 +897,7 @@ namespace detail { const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue"); try { @@ -881,22 +927,130 @@ namespace detail { } virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, + std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); + ZEN_ASSERT(!CacheValueRequests.empty()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheValues"sv; + + BatchRequest.BeginObject("Params"sv); + { + CachePolicy DefaultPolicy = CacheValueRequests[0]->Policy; + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + BatchRequest << "Namespace"sv << Namespace; + + BatchRequest.BeginArray("Requests"sv); + { + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + const CacheValueRequest& Request = *RequestPtr; + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + if (Request.Policy != DefaultPolicy) + { + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + } + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(GetClientRef()); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (CacheValueRequests.size() != Results.Num()) + { + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + } + else + { + for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) + { + CacheValueRequest& Request = *CacheValueRequests[RequestIndex++]; + CbObjectView ChunkObject = ChunkField.AsObjectView(); + IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); + IoBuffer Payload; + uint64_t RawSize = 0; + if (RawHash != IoHash::Zero) + { + bool Success = false; + const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); + if (Attachment) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + RawSize = Compressed.GetRawSize(); + Success = true; + } + } + if (!Success) + { + CbFieldView RawSizeField = ChunkObject["RawSize"sv]; + RawSize = RawSizeField.AsUInt64(); + Success = !RawSizeField.HasError(); + } + if (!Success) + { + RawHash = IoHash::Zero; + } + } + OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + } + + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks"); ZEN_ASSERT(!CacheChunkRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheChunks"sv; - BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); - BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginArray("ChunkRequests"sv); @@ -956,7 +1110,7 @@ namespace detail { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheChunkRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream."); } else { @@ -1401,11 +1555,11 @@ public: } } - virtual void GetCacheValues(std::string_view Namespace, + virtual void GetCacheChunks(std::string_view Namespace, std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) override final + OnCacheChunksGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCacheValues"); + ZEN_TRACE_CPU("Upstream::GetCacheChunks"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); @@ -1431,10 +1585,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { - OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); + OnComplete(std::forward<CacheChunkGetCompleteParams>(Params)); Stats.CacheHitCount.Increment(1); } @@ -1462,7 +1616,7 @@ public: } } - for (CacheChunkRequest* RequestPtr : CacheChunkRequests) + for (CacheChunkRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } @@ -1516,6 +1670,73 @@ public: return {}; } + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::GetCacheValues"); + + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + std::vector<CacheValueRequest*> RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (RemainingKeys.empty()) + { + break; + } + + if (Endpoint->GetState() != UpstreamEndpointState::kOk) + { + continue; + } + + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheValueRequest*> Missing; + GetUpstreamCacheResult Result; + { + metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); + + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.RawHash != Params.RawHash.Zero) + { + OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); + + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(&Params.Request); + } + }); + } + + Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); + + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + RemainingKeys = std::move(Missing); + } + } + + for (CacheValueRequest* RequestPtr : RemainingKeys) + { + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + } + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) |