diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-06 16:42:59 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-09-06 07:42:59 -0700 |
| commit | f9ddb13500f41549fa472f6aaffa096d64554145 (patch) | |
| tree | 24de689afc0cdb6cd7dd99ce85d66b7223630c64 | |
| parent | updated codeowners (#157) (diff) | |
| download | zen-f9ddb13500f41549fa472f6aaffa096d64554145.tar.xz zen-f9ddb13500f41549fa472f6aaffa096d64554145.zip | |
Implement proper GetCacheValues upstream (#155)
* Implement proper GetCacheValues upstream
* changelog
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | VERSION.txt | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 51 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 249 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 24 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 6 |
6 files changed, 298 insertions, 35 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d5a41975..b3378372e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Improvement: namespace/bucket validation now uses AsciiSet for more efficient validation - Improvement: Frontend: simplified content-type logic - Improvement: Improved message indicating no GC is scheduled +- Improvement: Implement proper GetCacheValues upstream path - Bugfix: Fixed issue in CbPackage marshaling of local reference - Bugfix: Fix crash when switching Zen upstream configured via DNS when one endpoint becomes unresposive - Bugfix: Fixed issue where projects would not be discovered via DiscoverProjects due to use of stem() vs filename() diff --git a/VERSION.txt b/VERSION.txt index 650599101..c812ecb80 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -0.1.4-pre24
\ No newline at end of file +0.1.4-pre26
\ No newline at end of file diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 1f89e7362..f8cbfa55c 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1718,6 +1718,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { CacheKey Key; CachePolicy Policy; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; CompressedBuffer Result; }; std::vector<RequestData> Requests; @@ -1788,35 +1790,45 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (!RemoteRequestIndexes.empty()) { - std::vector<CacheChunkRequest> RequestedRecordsData; - std::vector<CacheChunkRequest*> CacheChunkRequests; + std::vector<CacheValueRequest> RequestedRecordsData; + std::vector<CacheValueRequest*> CacheValueRequests; RequestedRecordsData.reserve(RemoteRequestIndexes.size()); - CacheChunkRequests.reserve(RemoteRequestIndexes.size()); + CacheValueRequests.reserve(RemoteRequestIndexes.size()); for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}}); - CacheChunkRequests.push_back(&RequestedRecordsData.back()); + RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); + CacheValueRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; m_UpstreamCache.GetCacheValues( *Namespace, - CacheChunkRequests, + CacheValueRequests, [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { - CacheChunkRequest& ChunkRequest = Params.Request; - if (Params.Value) + CacheValueRequest& ChunkRequest = Params.Request; + if (Params.RawHash != IoHash::Zero) { size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; RequestData& Request = Requests[RequestIndex]; - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (Request.Result && IsCompressedBinary(Params.Value.GetContentType())) + Request.RawHash = Params.RawHash; + Request.RawSize = Params.RawSize; + const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); + const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool IsHit = SkipData || HasData; + if (IsHit) { - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + if (HasData && !SkipData) + { + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + } + + if (HasData && StoreData) + { + m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + } + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, ChunkRequest.Key.Bucket, @@ -1864,6 +1876,11 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); } } + else if (Request.RawHash != IoHash::Zero) + { + ResponseObject.AddHash("RawHash"sv, Request.RawHash); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); + } } ResponseObject.EndObject(); } @@ -2260,7 +2277,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { + const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -2304,7 +2321,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); } } diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a897c21fd..7f5759e47 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -378,11 +378,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) override final + OnCacheChunksGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheChunks"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -424,6 +424,52 @@ namespace detail { return Result; } + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); + + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + CacheValueRequest& Request = *RequestPtr; + IoBuffer Payload; + + CompressedBuffer Compressed; + if (!Result.Error) + { + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + if (Payload && IsCompressedBinary(Payload.GetContentType())) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + } + } + + if (Compressed) + { + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload}); + } + else + { + OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + } + + return Result; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Values) override @@ -851,7 +897,7 @@ namespace detail { const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValue"); try { @@ -881,22 +927,130 @@ namespace detail { } virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, + std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); + ZEN_ASSERT(!CacheValueRequests.empty()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheValues"sv; + + BatchRequest.BeginObject("Params"sv); + { + CachePolicy DefaultPolicy = CacheValueRequests[0]->Policy; + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + BatchRequest << "Namespace"sv << Namespace; + + BatchRequest.BeginArray("Requests"sv); + { + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + const CacheValueRequest& Request = *RequestPtr; + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + if (Request.Policy != DefaultPolicy) + { + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + } + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(GetClientRef()); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (CacheValueRequests.size() != Results.Num()) + { + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + } + else + { + for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) + { + CacheValueRequest& Request = *CacheValueRequests[RequestIndex++]; + CbObjectView ChunkObject = ChunkField.AsObjectView(); + IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); + IoBuffer Payload; + uint64_t RawSize = 0; + if (RawHash != IoHash::Zero) + { + bool Success = false; + const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); + if (Attachment) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + RawSize = Compressed.GetRawSize(); + Success = true; + } + } + if (!Success) + { + CbFieldView RawSizeField = ChunkObject["RawSize"sv]; + RawSize = RawSizeField.AsUInt64(); + Success = !RawSizeField.HasError(); + } + if (!Success) + { + RawHash = IoHash::Zero; + } + } + OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + } + + for (CacheValueRequest* RequestPtr : CacheValueRequests) + { + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::Zen::GetCacheChunks"); ZEN_ASSERT(!CacheChunkRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv << "GetCacheChunks"sv; - BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); - BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginArray("ChunkRequests"sv); @@ -956,7 +1110,7 @@ namespace detail { CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); if (CacheChunkRequests.size() != Results.Num()) { - ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + ZEN_WARN("Upstream::Zen::GetCacheChunks invalid number of Requests from Upstream."); } else { @@ -1401,11 +1555,11 @@ public: } } - virtual void GetCacheValues(std::string_view Namespace, + virtual void GetCacheChunks(std::string_view Namespace, std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) override final + OnCacheChunksGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCacheValues"); + ZEN_TRACE_CPU("Upstream::GetCacheChunks"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); @@ -1431,10 +1585,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheChunks(Namespace, RemainingKeys, [&](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { - OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); + OnComplete(std::forward<CacheChunkGetCompleteParams>(Params)); Stats.CacheHitCount.Increment(1); } @@ -1462,7 +1616,7 @@ public: } } - for (CacheChunkRequest* RequestPtr : CacheChunkRequests) + for (CacheChunkRequest* RequestPtr : RemainingKeys) { OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } @@ -1516,6 +1670,73 @@ public: return {}; } + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) override final + { + ZEN_TRACE_CPU("Upstream::GetCacheValues"); + + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + std::vector<CacheValueRequest*> RemainingKeys(CacheValueRequests.begin(), CacheValueRequests.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (RemainingKeys.empty()) + { + break; + } + + if (Endpoint->GetState() != UpstreamEndpointState::kOk) + { + continue; + } + + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheValueRequest*> Missing; + GetUpstreamCacheResult Result; + { + metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); + + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.RawHash != Params.RawHash.Zero) + { + OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); + + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(&Params.Request); + } + }); + } + + Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); + + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + RemainingKeys = std::move(Missing); + } + } + + for (CacheValueRequest* RequestPtr : RemainingKeys) + { + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } + } + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 13548efc8..5b154a1b5 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -81,7 +81,7 @@ using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams struct CacheValueGetCompleteParams { - CacheChunkRequest& Request; + CacheValueRequest& Request; IoHash RawHash; uint64_t RawSize; IoBuffer Value; @@ -89,6 +89,16 @@ struct CacheValueGetCompleteParams using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; +struct CacheChunkGetCompleteParams +{ + CacheChunkRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; +}; + +using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>; + struct UpstreamEndpointStats { metrics::OperationTiming CacheGetRequestTiming; @@ -171,9 +181,13 @@ public: virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, + std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) = 0; @@ -207,9 +221,13 @@ public: virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; virtual void GetCacheValues(std::string_view Namespace, - std::span<CacheChunkRequest*> CacheChunkRequests, + std::span<CacheValueRequest*> CacheValueRequests, OnCacheValueGetComplete&& OnComplete) = 0; + virtual void GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index 9adde8fc7..741375946 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -47,6 +47,12 @@ struct CacheKeyRequest CacheRecordPolicy Policy; }; +struct CacheValueRequest +{ + CacheKey Key; + CachePolicy Policy = CachePolicy::Default; +}; + inline bool operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) { |