diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-19 11:37:25 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-19 11:37:25 +0200 |
| commit | a9130d34b5318b0da5d3547c432a8734213fbe9b (patch) | |
| tree | 2cdb96f85e221cc24227b410d4d5f8f4e4af7a41 /zenserver/upstream/upstreamcache.cpp | |
| parent | Merge pull request #98 from EpicGames/de/fix-bucket-name-rules (diff) | |
| download | zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.tar.xz zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.zip | |
Keep Namespace out of CacheKey and store it on request level
RPC requests now has a Namespace field under Params instead of one Namespace per cache key
Fall back to legacy upstream HTTP URI format if default namespace is requested
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 103 |
1 files changed, 67 insertions, 36 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 52513abe9..98b4439c7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -182,7 +182,7 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -191,11 +191,11 @@ namespace detail { CloudCacheSession Session(m_Client); CloudCacheResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { - std::string_view DdcNamespace = GetActualDdcNamespace(Session, CacheKey.Namespace); + std::string_view DdcNamespace = GetActualDdcNamespace(Session, Namespace); Result = Session.GetDerivedData(DdcNamespace, CacheKey.Bucket, CacheKey.Hash); } else if (Type == ZenContentType::kCompressedBinary) @@ -299,7 +299,9 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); @@ -314,7 +316,7 @@ namespace detail { if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); CloudCacheResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); @@ -351,14 +353,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, const CacheKey&, const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheKey.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -383,7 +385,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); @@ -399,7 +402,7 @@ namespace detail { CompressedBuffer Compressed; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Request.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); Payload = BlobResult.Response; @@ -446,7 +449,7 @@ namespace detail { CloudCacheResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); if (m_UseLegacyDdc) { Result = Session.PutDerivedData(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue); @@ -484,6 +487,7 @@ namespace detail { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, ReferencingObject.Save().GetBuffer().AsIoBuffer(), MaxAttempts, @@ -503,6 +507,7 @@ namespace detail { { return PerformStructuredPut( Session, + CacheRecord.Namespace, CacheRecord.Key, RecordValue, MaxAttempts, @@ -548,6 +553,7 @@ namespace detail { PutUpstreamCacheResult PerformStructuredPut( CloudCacheSession& Session, + std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, const int32_t MaxAttempts, @@ -556,7 +562,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Key.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -738,14 +744,14 @@ namespace detail { virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -769,20 +775,24 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheRecords"; + << "GetCacheRecords"sv; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("Requests"sv); for (CacheKeyRequest* Request : Requests) { @@ -791,7 +801,6 @@ namespace detail { const CacheKey& Key = Request->Key; BatchRequest.BeginObject("Key"sv); { - BatchRequest << "Namespace"sv << Key.Namespace; BatchRequest << "Bucket"sv << Key.Bucket; BatchRequest << "Hash"sv << Key.Hash; } @@ -848,14 +857,16 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); + const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -879,7 +890,8 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); @@ -887,12 +899,16 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheChunks"; + << "GetCacheChunks"sv; + BatchRequest << "Namespace"sv << Namespace; BatchRequest.BeginObject("Params"sv); { CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); + + BatchRequest << "Namespace"sv << Namespace; + BatchRequest.BeginArray("ChunkRequests"sv); { for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -902,7 +918,6 @@ namespace detail { BatchRequest.BeginObject(); { BatchRequest.BeginObject("Key"sv); - BatchRequest << "Namespace"sv << Request.Key.Namespace; BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); @@ -1042,7 +1057,11 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + PackagePayload, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1061,12 +1080,14 @@ namespace detail { CbPackage BatchPackage; CbObjectWriter BatchWriter; BatchWriter << "Method"sv - << "PutCacheValues"; + << "PutCacheValues"sv; BatchWriter.BeginObject("Params"sv); { // DefaultPolicy unspecified and expected to be Default + BatchWriter << "Namespace"sv << CacheRecord.Namespace; + BatchWriter.BeginArray("Requests"sv); { BatchWriter.BeginObject(); @@ -1074,7 +1095,6 @@ namespace detail { const CacheKey& Key = CacheRecord.Key; BatchWriter.BeginObject("Key"sv); { - BatchWriter << "Namespace"sv << Key.Namespace; BatchWriter << "Bucket"sv << Key.Bucket; BatchWriter << "Hash"sv << Key.Hash; } @@ -1108,7 +1128,8 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + Result = Session.PutCacheValue(CacheRecord.Namespace, + CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheRecord.ValueContentIds[Idx], Values[Idx]); @@ -1131,7 +1152,11 @@ namespace detail { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); + Result = Session.PutCacheRecord(CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + RecordValue, + CacheRecord.Type); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -1259,7 +1284,7 @@ public: } } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); @@ -1278,7 +1303,7 @@ public: GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecord(CacheKey, Type); + Result = Endpoint->GetCacheRecord(Namespace, CacheKey, Type); } Stats.CacheGetCount.Increment(1); @@ -1306,7 +1331,9 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1334,7 +1361,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(Namespace, RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); @@ -1371,7 +1398,9 @@ public: } } - virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); @@ -1399,7 +1428,7 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + Result = Endpoint->GetCacheValues(Namespace, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1436,7 +1465,9 @@ public: } } - virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override + virtual GetUpstreamCacheResult GetCacheValue(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) override { ZEN_TRACE_CPU("Upstream::GetCacheValue"); @@ -1454,7 +1485,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); + Result = Endpoint->GetCacheValue(Namespace, CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1550,7 +1581,7 @@ private: ZenCacheValue CacheValue; std::vector<IoBuffer> Payloads; - if (!m_CacheStore.Get(CacheRecord.Key.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) + if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); return; @@ -1565,7 +1596,7 @@ private: else { ZEN_WARN("process upstream FAILED, '{}/{}/{}/{}', ValueContentId doesn't exist in CAS", - CacheRecord.Key.Namespace, + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, ValueContentId); |