diff options
| author | mattpetersepic <[email protected]> | 2022-02-01 08:06:36 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-02-01 08:06:36 -0700 |
| commit | 154743f2d2ff2b7163bcf8d7b76eea3e3579aaba (patch) | |
| tree | aef417b5c9a0d5502c7afdb01c4cc598071e956d /zenserver/upstream/upstreamcache.cpp | |
| parent | Tweaked remote_build.py TTY output (diff) | |
| download | zen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.tar.xz zen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.zip | |
Cache policy support (#47)
Add HandleRpc methods for the remaining ICacheStore requests from unreal: PutCacheValues/GetCacheValues. We now have batched versions for PutCacheRecords,GetCacheRecords,PutCacheValues,GetCacheValues,GetCacheChunks. Add support for CachePolicy flags to all of these batched methods.
* Add Batched PutCacheValues/GetCacheValues. Rename old GetCacheValues to GetCacheChunks.
* HandleRpcGetCacheRecords: Receive a CacheRecordPolicy with each key, and skipdata on attachments we already have.
* Changes to CachePolicy copied from Release-5.0 depot. Change serialization to use the key BasePolicy instead of DefaultValuePolicy.
* GetChunks: Read CacheRecords from remote if necessary to find ContentId. Implement QueryLocal, StoreLocal, and SkipData.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 261 |
1 files changed, 159 insertions, 102 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 657cfb729..9d3ed2f94 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -18,6 +18,7 @@ #include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "diag/logging.h" @@ -215,21 +216,16 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); - ZEN_UNUSED(Policy); - CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; - for (size_t Index : KeyIndex) + for (CacheKeyRequest* Request : Requests) { - const CacheKey& CacheKey = CacheKeys[Index]; + const CacheKey& CacheKey = Request->Key; CbPackage Package; CbObject Record; @@ -264,7 +260,7 @@ namespace detail { } } - OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + OnComplete({.Request = *Request, .Record = Record, .Package = Package}); } return Result; @@ -301,20 +297,20 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - const CacheChunkRequest& Request = CacheChunkRequests[Index]; - IoBuffer Payload; + CacheChunkRequest& Request = *RequestPtr; + IoBuffer Payload; + CompressedBuffer Compressed; if (!Result.Error) { const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); @@ -323,9 +319,23 @@ namespace detail { AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + if (Payload && IsCompressedBinary(Payload.GetContentType())) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + } } - OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); + if (Compressed) + { + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload}); + } + else + { + OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } } return Result; @@ -620,15 +630,10 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); - - std::vector<size_t> IndexMap; - IndexMap.reserve(KeyIndex.size()); + ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv @@ -636,21 +641,30 @@ namespace detail { BatchRequest.BeginObject("Params"sv); { - BatchRequest.BeginArray("CacheKeys"sv); - for (size_t Index : KeyIndex) - { - const CacheKey& Key = CacheKeys[Index]; - IndexMap.push_back(Index); + CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest.BeginArray("Requests"sv); + for (CacheKeyRequest* Request : Requests) + { BatchRequest.BeginObject(); - BatchRequest << "Bucket"sv << Key.Bucket; - BatchRequest << "Hash"sv << Key.Hash; + { + const CacheKey& Key = Request->Key; + BatchRequest.BeginObject("Key"sv); + { + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + } + BatchRequest.EndObject(); + if (!Request->Policy.IsUniform() || Request->Policy.GetRecordPolicy() != DefaultPolicy) + { + BatchRequest.SetName("Policy"sv); + Request->Policy.Save(BatchRequest); + } + } BatchRequest.EndObject(); } BatchRequest.EndArray(); - - BatchRequest.SetName("Policy"sv); - Policy.Save(BatchRequest); } BatchRequest.EndObject(); @@ -668,19 +682,27 @@ namespace detail { { if (BatchResponse.TryLoad(Result.Response)) { - for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (Results.Num() != Requests.size()) { - const size_t Index = IndexMap[LocalIndex++]; - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream."); } + else + { + for (size_t Index = 0; CbFieldView Record : Results) + { + CacheKeyRequest* Request = Requests[Index++]; + OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } } } - for (size_t Index : KeyIndex) + for (CacheKeyRequest* Request : Requests) { - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -717,27 +739,28 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); - - std::vector<size_t> IndexMap; - IndexMap.reserve(RequestIndex.size()); + ZEN_ASSERT(!CacheChunkRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheValues"; + << "GetCacheChunks"; +#if BACKWARDS_COMPATABILITY_JAN2022 + BatchRequest.AddInteger("MethodVersion"sv, 1); +#endif BatchRequest.BeginObject("Params"sv); { + CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); BatchRequest.BeginArray("ChunkRequests"sv); { - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - const CacheChunkRequest& Request = CacheChunkRequests[Index]; - IndexMap.push_back(Index); + const CacheChunkRequest& Request = *RequestPtr; BatchRequest.BeginObject(); { @@ -745,11 +768,26 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); - BatchRequest << "ChunkId"sv << Request.ChunkId; - BatchRequest << "RawOffset"sv << Request.RawOffset; - BatchRequest << "RawSize"sv << Request.RawSize; - BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + if (Request.ValueId) + { + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); + } + if (Request.ChunkId != Request.ChunkId.Zero) + { + BatchRequest << "ChunkId"sv << Request.ChunkId; + } + if (Request.RawOffset != 0) + { + BatchRequest << "RawOffset"sv << Request.RawOffset; + } + if (Request.RawSize != UINT64_MAX) + { + BatchRequest << "RawSize"sv << Request.RawSize; + } + if (Request.Policy != DefaultPolicy) + { + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + } } BatchRequest.EndObject(); } @@ -772,29 +810,56 @@ namespace detail { { if (BatchResponse.TryLoad(Result.Response)) { - for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (CacheChunkRequests.size() != Results.Num()) { - const size_t Index = IndexMap[LocalIndex++]; - IoBuffer Payload; - - if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + } + else + { + for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + CacheChunkRequest& Request = *CacheChunkRequests[RequestIndex++]; + CbObjectView ChunkObject = ChunkField.AsObjectView(); + IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); + IoBuffer Payload; + uint64_t RawSize = 0; + if (RawHash != IoHash::Zero) { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + bool Success = false; + const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); + if (Attachment) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + RawSize = Compressed.GetRawSize(); + Success = true; + } + } + if (!Success) + { + CbFieldView RawSizeField = ChunkObject["RawSize"sv]; + RawSize = RawSizeField.AsUInt64(); + Success = !RawSizeField.HasError(); + } + if (!Success) + { + RawHash = IoHash::Zero; + } } + OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } - - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -1045,21 +1110,16 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& DownstreamPolicy, - OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - std::vector<size_t> RemainingKeys(KeyIndex.begin(), KeyIndex.end()); + std::vector<CacheKeyRequest*> RemainingKeys(Requests.begin(), Requests.end()); if (m_Options.ReadUpstream) { - CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); - for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1072,25 +1132,24 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<size_t> Missing; - GetUpstreamCacheResult Result; + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheKeyRequest*> Missing; + GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = - Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(&Params.Request); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); @@ -1110,21 +1169,19 @@ public: } } - for (size_t Index : RemainingKeys) + for (CacheKeyRequest* Request : RemainingKeys) { - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } } - virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - std::vector<size_t> RemainingKeys(RequestIndex.begin(), RequestIndex.end()); + std::vector<CacheChunkRequest*> RemainingKeys(CacheChunkRequests.begin(), CacheChunkRequests.end()); if (m_Options.ReadUpstream) { @@ -1140,14 +1197,14 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<size_t> Missing; - GetUpstreamCacheResult Result; + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheChunkRequest*> Missing; + GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { - if (Params.Value) + Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1155,7 +1212,7 @@ public: } else { - Missing.push_back(Params.RequestIndex); + Missing.push_back(&Params.Request); } }); } @@ -1177,9 +1234,9 @@ public: } } - for (size_t Index : RemainingKeys) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } |