diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-06 16:42:59 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-09-06 07:42:59 -0700 |
| commit | f9ddb13500f41549fa472f6aaffa096d64554145 (patch) | |
| tree | 24de689afc0cdb6cd7dd99ce85d66b7223630c64 /zenserver/cache/structuredcache.cpp | |
| parent | updated codeowners (#157) (diff) | |
| download | zen-f9ddb13500f41549fa472f6aaffa096d64554145.tar.xz zen-f9ddb13500f41549fa472f6aaffa096d64554145.zip | |
Implement proper GetCacheValues upstream (#155)
* Implement proper GetCacheValues upstream
* changelog
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 1f89e7362..f8cbfa55c 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1718,6 +1718,8 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { CacheKey Key; CachePolicy Policy; + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; CompressedBuffer Result; }; std::vector<RequestData> Requests; @@ -1788,35 +1790,45 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (!RemoteRequestIndexes.empty()) { - std::vector<CacheChunkRequest> RequestedRecordsData; - std::vector<CacheChunkRequest*> CacheChunkRequests; + std::vector<CacheValueRequest> RequestedRecordsData; + std::vector<CacheValueRequest*> CacheValueRequests; RequestedRecordsData.reserve(RemoteRequestIndexes.size()); - CacheChunkRequests.reserve(RemoteRequestIndexes.size()); + CacheValueRequests.reserve(RemoteRequestIndexes.size()); for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}}); - CacheChunkRequests.push_back(&RequestedRecordsData.back()); + RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); + CacheValueRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; m_UpstreamCache.GetCacheValues( *Namespace, - CacheChunkRequests, + CacheValueRequests, [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { - CacheChunkRequest& ChunkRequest = Params.Request; - if (Params.Value) + CacheValueRequest& ChunkRequest = Params.Request; + if (Params.RawHash != IoHash::Zero) { size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; RequestData& Request = Requests[RequestIndex]; - Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); - if (Request.Result && IsCompressedBinary(Params.Value.GetContentType())) + Request.RawHash = Params.RawHash; + Request.RawSize = Params.RawSize; + const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); + const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool IsHit = SkipData || HasData; + if (IsHit) { - // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement - // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive - // for us to keep the data only on the upstream server. - // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + if (HasData && !SkipData) + { + Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + } + + if (HasData && StoreData) + { + m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + } + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, ChunkRequest.Key.Bucket, @@ -1864,6 +1876,11 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); } } + else if (Request.RawHash != IoHash::Zero) + { + ResponseObject.AddHash("RawHash"sv, Request.RawHash); + ResponseObject.AddInteger("RawSize"sv, Request.RawSize); + } } ResponseObject.EndObject(); } @@ -2260,7 +2277,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { + const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -2304,7 +2321,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); } } |