diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-19 11:37:25 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-19 11:37:25 +0200 |
| commit | a9130d34b5318b0da5d3547c432a8734213fbe9b (patch) | |
| tree | 2cdb96f85e221cc24227b410d4d5f8f4e4af7a41 /zenserver | |
| parent | Merge pull request #98 from EpicGames/de/fix-bucket-name-rules (diff) | |
| download | zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.tar.xz zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.zip | |
Keep Namespace out of CacheKey and store it on request level
RPC requests now has a Namespace field under Params instead of one Namespace per cache key
Fall back to legacy upstream HTTP URI format if default namespace is requested
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 221 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 17 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 103 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 26 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 47 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 16 |
6 files changed, 272 insertions, 158 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index bc6f31dd3..a349f13e1 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -70,6 +70,7 @@ struct AttachmentCount struct PutRequestData { + std::string Namespace; CacheKey Key; CbObjectView RecordObject; CacheRecordPolicy Policy; @@ -244,30 +245,27 @@ namespace { } } - bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params) { - CbFieldView NamespaceField = KeyView["Namespace"sv]; - std::optional<std::string> Namespace; + CbFieldView NamespaceField = Params["Namespace"sv]; if (!NamespaceField) { - Namespace = ZenCacheStore::DefaultNamespace; + return std::string(ZenCacheStore::DefaultNamespace); } - else + + if (NamespaceField.HasError()) { - if (NamespaceField.HasError()) - { - return false; - } - if (!NamespaceField.IsString()) - { - return false; - } - Namespace = GetValidNamespaceName(NamespaceField.AsString()); + return {}; } - if (!Namespace.has_value()) + if (!NamespaceField.IsString()) { - return false; + return {}; } + return GetValidNamespaceName(NamespaceField.AsString()); + } + + bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) + { CbFieldView BucketField = KeyView["Bucket"sv]; if (BucketField.HasError()) { @@ -292,7 +290,7 @@ namespace { return false; } IoHash Hash = HashField.AsHash(); - Key = CacheKey::Create(*Namespace, *Bucket, Hash); + Key = CacheKey::Create(*Bucket, Hash); return true; } @@ -596,7 +594,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); if (GetUpstreamCacheResult UpstreamResult = - m_UpstreamCache.GetCacheRecord({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, AcceptType); + m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; @@ -769,7 +767,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -819,7 +817,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -904,7 +903,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } @@ -946,7 +946,7 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) @@ -1124,8 +1124,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { @@ -1139,7 +1144,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); - PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); @@ -1203,7 +1208,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack else { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Request.Key.Namespace, + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), @@ -1225,7 +1230,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack } ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", - Request.Key.Namespace, + Request.Namespace, Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), @@ -1237,14 +1242,16 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); + m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}); } return PutResult::Success; } @@ -1277,8 +1284,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt bool UsedUpstream = false; }; - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<RecordRequestData> Requests; std::vector<size_t> UpstreamIndexes; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); @@ -1322,7 +1334,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ZenCacheValue RecordCacheValue; if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && - m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) + m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) @@ -1436,7 +1448,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } } - const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -1453,7 +1465,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt Request.RecordObject = ObjectBuffer; if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } ParseValues(Request); Request.UsedUpstream = true; @@ -1493,7 +1505,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt { ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'", Value.ContentId, - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash); } @@ -1510,7 +1522,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } }; - m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete)); } CbPackage ResponsePackage; @@ -1533,7 +1545,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt } ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(Request.RecordCacheValue.Size()), @@ -1549,11 +1561,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { - ZEN_DEBUG("MISS - '{}/{}/{}' {}", Key.Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); + ZEN_DEBUG("MISS - '{}/{}/{}' {}", *Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); m_CacheStats.MissCount++; } } @@ -1579,8 +1591,13 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); - std::string_view PolicyText = Params["DefaultPolicy"].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } std::vector<bool> Results; for (CbFieldView RequestField : Params["Requests"sv]) { @@ -1615,21 +1632,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Value}); + m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { - ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", Key.Namespace, Key.Bucket, Key.Hash, RawHash); + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; - if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, ExistingValue) && + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; @@ -1640,11 +1657,11 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(TransferredSize), @@ -1679,9 +1696,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::optional<std::string> Namespace = GetRpcRequestNamespace(Params); + if (!Namespace) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + struct RequestData { CacheKey Key; @@ -1717,7 +1740,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { - if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) + if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); } @@ -1725,7 +1748,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), @@ -1740,12 +1763,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss - ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash); + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - Key.Namespace, + *Namespace, Key.Bucket, Key.Hash, "LOCAL"sv, @@ -1763,13 +1786,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; - RequestedRecordsData.push_back({{Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash}}); + RequestedRecordsData.push_back({Request.Key.Bucket, Request.Key.Hash}); CacheChunkRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; m_UpstreamCache.GetCacheValues( + *Namespace, CacheChunkRequests, - [this, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { + [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { CacheChunkRequest& ChunkRequest = Params.Request; if (Params.Value) { @@ -1783,9 +1807,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive // for us to keep the data only on the upstream server. // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) - m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); + m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", - ChunkRequest.Key.Namespace, + *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), @@ -1797,7 +1821,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http } } ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", - ChunkRequest.Key.Namespace, + *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, "UPSTREAM"sv, @@ -1888,6 +1912,7 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); + std::string Namespace; std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream @@ -1897,27 +1922,28 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests - if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) + if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); + GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks - GetLocalCacheValues(ValueRequests, UpstreamChunks); + GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally - GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); + GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(Requests, HttpRequest); + WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); } bool -HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -1929,11 +1955,20 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); - CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; - CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); - size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + + std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params); + if (!NamespaceText) + { + ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); + return false; + } + Namespace = *NamespaceText; + + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num()); // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, // we will need to change the pointers to indexes to handle reallocations. @@ -1996,11 +2031,9 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } else { - ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{}/{} came after {}/{}/{}.", - RequestKey.Key.Namespace, + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", RequestKey.Key.Bucket, RequestKey.Key.Hash, - PreviousRecordKey->Key.Namespace, PreviousRecordKey->Key.Bucket, PreviousRecordKey->Key.Hash); return false; @@ -2022,7 +2055,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque } void -HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, +HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) @@ -2041,7 +2075,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(RecordKey.Key.Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); @@ -2058,7 +2092,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -2076,10 +2110,10 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { - m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); + m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; - m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } std::vector<CacheChunkRequest*> UpstreamPayloadRequests; @@ -2163,7 +2197,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& } void -HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, +HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks) { using namespace cache::detail; @@ -2173,7 +2208,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; - if (m_CacheStore.Get(Request->Key->Key.Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) + if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { @@ -2207,7 +2242,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk } void -HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, +HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests) { @@ -2215,7 +2251,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest if (!UpstreamChunks.empty()) { - const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { + const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; @@ -2242,7 +2278,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest } else { - m_CacheStore.Put(Key.Key.Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); + m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) @@ -2259,12 +2295,13 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void -HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, +HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; @@ -2290,7 +2327,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})", - Request.Key->Key.Namespace, + Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, @@ -2301,19 +2338,11 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { - ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", - Request.Key->Key.Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId); + ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { - ZEN_DEBUG("MISS - '{}/{}/{}/{}'", - Request.Key->Key.Namespace, - Request.Key->Key.Bucket, - Request.Key->Key.Hash, - Request.Key->ValueId); + ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 5f248edd1..890a2ebab 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -118,7 +118,8 @@ private: PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ - bool ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys, + bool ParseGetCacheChunksRequest(std::string& Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests, @@ -126,18 +127,24 @@ private: std::vector<cache::detail::ChunkRequest*>& ValueRequests, CbObjectView RpcRequest); /** HandleRpcGetCacheChunks Helper: Load records to get ContentId for RecordRequests, and load their payloads if they exist locally. */ - void GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys, + void GetLocalCacheRecords(std::string_view Namespace, + std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, std::vector<cache::detail::ChunkRequest*>& RecordRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: For ValueRequests, load their payloads if they exist locally. */ - void GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests, std::vector<CacheChunkRequest*>& OutUpstreamChunks); + void GetLocalCacheValues(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest*>& ValueRequests, + std::vector<CacheChunkRequest*>& OutUpstreamChunks); /** HandleRpcGetCacheChunks Helper: Load payloads from upstream that did not exist locally. */ - void GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks, + void GetUpstreamCacheChunks(std::string_view Namespace, + std::vector<CacheChunkRequest*>& UpstreamChunks, std::vector<CacheChunkRequest>& RequestKeys, std::vector<cache::detail::ChunkRequest>& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ - void WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); + void WriteGetCacheChunksResponse(std::string_view Namespace, + std::vector<cache::detail::ChunkRequest>& Requests, + zen::HttpServerRequest& HttpRequest); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 52513abe9..98b4439c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -182,7 +182,7 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -191,11 +191,11 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace); Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) @@ -299,7 +299,9 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); @@ -314,7 +316,7 @@ namespace detail { if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); CloudCacheResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); @@ -351,14 +353,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -383,7 +385,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); @@ -399,7 +402,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); Payload = BlobResult.Response; @@ -446,7 +449,7 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); if (m_UseLegacyDdc) { Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); @@ -484,6 +487,7 @@ namespace detail { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, @@ -503,6 +507,7 @@ namespace detail { { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, @@ -548,6 +553,7 @@ namespace detail { PutUpstreamCacheResult PerformStructuredPut( CloudCacheSession& Session, + std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, @@ -556,7 +562,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -738,14 +744,14 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -769,20 +775,24 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheRecords"; + << "GetCacheRecords"sv; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { @@ -791,7 +801,6 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { - BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -848,14 +857,16 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -879,7 +890,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); @@ -887,12 +899,16 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheChunks"; + << "GetCacheChunks"sv; + BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -902,7 +918,6 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); - BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,7 +1057,11 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + PackagePayload, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1061,12 +1080,14 @@ namespace detail { CbPackage BatchPackage; CbObjectWriter BatchWriter; BatchWriter << "Method"sv - << "PutCacheValues"; + << "PutCacheValues"sv; BatchWriter.BeginObject("Params"sv); { // DefaultPolicy unspecified and expected to be Default + BatchWriter << "Namespace"sv << CacheRecord.Namespace; + BatchWriter.BeginArray("Requests"sv); { BatchWriter.BeginObject(); @@ -1074,7 +1095,6 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { - BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1108,7 +1128,8 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + Result = Session.PutCacheValue(CacheRecord.Namespace, + CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.ValueContentIds[Idx], Values[Idx]); @@ -1131,7 +1152,11 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1259,7 +1284,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1278,7 +1303,7 @@ public: GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(CacheKey, Type); + Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); } Stats.CacheGetCount.Increment(1); @@ -1306,7 +1331,9 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1334,7 +1361,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); @@ -1371,7 +1398,9 @@ public: } } - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); @@ -1399,7 +1428,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1436,7 +1465,9 @@ public: } } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheValue"); @@ -1454,7 +1485,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); + Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1550,7 +1581,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1565,7 +1596,7 @@ private: else { ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", - CacheRecord.Key.Namespace, + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 6f18b3119..13548efc8 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -32,6 +32,7 @@ struct ZenStructuredCacheClientOptions; struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; + std::string Namespace; CacheKey Key; std::vector<IoHash> ValueContentIds; }; @@ -163,12 +164,15 @@ public: virtual UpstreamEndpointState GetState() = 0; virtual UpstreamEndpointStatus GetStatus() = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, - OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -196,11 +200,15 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 1ac4afe5c..efc75b5b4 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -408,10 +408,15 @@ ZenStructuredCacheSession::CheckHealth() } ZenCacheResult -ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) +ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -432,10 +437,18 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) +ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -457,10 +470,19 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash } ZenCacheResult -ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) +ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -485,10 +507,19 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/"; + if (Namespace != ZenCacheStore::DefaultNamespace) + { + Uri << Namespace << "/"; + } + Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index f70d9d06f..e8590f940 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -128,10 +128,18 @@ public: ~ZenStructuredCacheSession(); ZenCacheResult CheckHealth(); - ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); - ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); + ZenCacheResult GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type); + ZenCacheResult GetCacheValue(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); + ZenCacheResult PutCacheRecord(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoBuffer Value, + ZenContentType Type); + ZenCacheResult PutCacheValue(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + const IoHash& ValueContentId, + IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); ZenCacheResult InvokeRpc(const CbPackage& Package); |