From bd85a74a9d15fd676a6677fbd4d5ab4e3dcb0d42 Mon Sep 17 00:00:00 2001 From: mattpetersepic <58296718+mattpetersepic@users.noreply.github.com> Date: Tue, 25 Jan 2022 06:57:47 -0700 Subject: Cachepolicy (#36) * Copy CachePolicy implementation from UE5/Release-5.0. Add backwards compatability for clients and upstreams that are using the old protocol. * Add RefPtr templated move operator and constructor, so that RefPtr A = std::move(RefPtr()) will do a move. * Fix broken CachePolicy tests and add tests for new Save/Load. * Remove TODO comments * CachePolicy Save/Load Fixes from codereview * Fix comment to match code change. * Remove backwards compatibility for CachePolicy change. Convert policy string tokens to PascalCase. Fix tests for new policy text. Change ParseCachePolicy to assert string is non-empty and always succeed. * Fix release build: use ZEN_WITH_TESTS define --- zenserver/upstream/upstreamcache.cpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 3d6641a4f..b0343a61b 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -378,7 +378,7 @@ namespace detail { if (It == std::end(CacheRecord.PayloadIds)) { - OutReason = fmt::format("payload '{}' MISSING from local cache", PayloadId); + OutReason = fmt::format("value '{}' MISSING from local cache", PayloadId); return false; } @@ -394,7 +394,7 @@ namespace detail { if (!BlobResult.Success) { - OutReason = fmt::format("upload payload '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); return false; } @@ -473,7 +473,7 @@ namespace detail { Sb << MissingHash.ToHexString() << ","; } - return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs payload(s) '{}'", + return {.Reason = fmt::format("finalize '{}/{}' FAILED, still needs value(s) '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Sb.ToString()), @@ -646,9 +646,8 @@ namespace detail { } BatchRequest.EndArray(); - BatchRequest.BeginObject("Policy"sv); - CacheRecordPolicy::Save(Policy, BatchRequest); - BatchRequest.EndObject(); + BatchRequest.SetName("Policy"sv); + Policy.Save(BatchRequest); } BatchRequest.EndObject(); @@ -726,7 +725,7 @@ namespace detail { CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCachePayloads"; + << "GetCacheValues"; BatchRequest.BeginObject("Params"sv); { @@ -743,12 +742,11 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - - BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest.AddObjectId("ValueId"sv, Request.PayloadId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; - BatchRequest << "Policy"sv << static_cast(Request.Policy); + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); } BatchRequest.EndObject(); } @@ -828,7 +826,7 @@ namespace detail { } else { - return {.Reason = std::string("invalid payload buffer"), .Success = false}; + return {.Reason = std::string("invalid value buffer"), .Success = false}; } } -- cgit v1.2.3 From 5291894278e160b9200b6ce261c6ab5437e45ccc Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 26 Jan 2022 09:00:16 +0100 Subject: Fixed issue with missing endpoint name when configuring upstream cache from Lua. --- zenserver/upstream/upstreamcache.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index b0343a61b..8b02a437a 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -535,10 +535,12 @@ namespace detail { public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) - , m_Info({.Name = std::string(Options.Name)}) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { + ZEN_ASSERT(!Options.Name.empty()); + m_Info.Name = Options.Name; + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); -- cgit v1.2.3 From 09f2ac4d9aaea0107af8fbd6a41c2d3fe3e450ba Mon Sep 17 00:00:00 2001 From: mattpetersepic <58296718+mattpetersepic@users.noreply.github.com> Date: Wed, 26 Jan 2022 18:45:04 -0700 Subject: Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords (#41) * Implement SkipData,QueryLocal,StoreLocal for HandleRpcGetCacheRecords. --- zenserver/upstream/upstreamcache.cpp | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 8b02a437a..091406db3 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -1046,7 +1046,7 @@ public: virtual void GetCacheRecords(std::span CacheKeys, std::span KeyIndex, - const CacheRecordPolicy& Policy, + const CacheRecordPolicy& DownstreamPolicy, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); @@ -1057,6 +1057,8 @@ public: if (m_Options.ReadUpstream) { + CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); + for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1075,18 +1077,19 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward(Params)); + Result = + Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); -- cgit v1.2.3 From fb1afc87a436ff6374daeaef5f7682f098a85cd7 Mon Sep 17 00:00:00 2001 From: mattpetersepic <58296718+mattpetersepic@users.noreply.github.com> Date: Thu, 27 Jan 2022 10:52:07 -0700 Subject: Rename Paylod to Value to match the client side. Rename PayloadId to ValueContentId where its a hash instead of an oid. --- zenserver/upstream/upstreamcache.cpp | 111 ++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 55 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 091406db3..657cfb729 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -270,14 +270,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -301,11 +301,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, - std::span RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span CacheChunkRequests, + std::span RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -325,7 +325,7 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); } - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); } return Result; @@ -333,11 +333,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span Payloads) override + std::span Values) override { ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -371,30 +371,31 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span PayloadIds, std::string& OutReason) -> bool { - for (const IoHash& PayloadId : PayloadIds) + const auto PutBlobs = [&](std::span ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) { - const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + const auto It = + std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); - if (It == std::end(CacheRecord.PayloadIds)) + if (It == std::end(CacheRecord.ValueContentIds)) { - OutReason = fmt::format("value '{}' MISSING from local cache", PayloadId); + OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); return false; } - const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); return false; } @@ -685,14 +686,14 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -716,11 +717,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, - std::span RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span CacheChunkRequests, + std::span RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); std::vector IndexMap; IndexMap.reserve(RequestIndex.size()); @@ -744,7 +745,7 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - BatchRequest.AddObjectId("ValueId"sv, Request.PayloadId); + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; @@ -784,7 +785,7 @@ namespace detail { } } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -793,7 +794,7 @@ namespace detail { for (size_t Index : RequestIndex) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -801,11 +802,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span Payloads) override + std::span Values) override { ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -820,9 +821,9 @@ namespace detail { CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); - for (const IoBuffer& Payload : Payloads) + for (const IoBuffer& Value : Values) { - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } @@ -848,15 +849,15 @@ namespace detail { } else { - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCachePayload(CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + CacheRecord.ValueContentIds[Idx], + Values[Idx]); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -866,7 +867,7 @@ namespace detail { if (!Result.Success) { - return {.Reason = "Failed to upload payload", + return {.Reason = "Failed to upload value", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; @@ -1115,11 +1116,11 @@ public: } } - virtual void GetCachePayloads(std::span CacheChunkRequests, - std::span RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span CacheChunkRequests, + std::span RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock _(m_EndpointsMutex); @@ -1145,10 +1146,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) + Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.Value) { - OnComplete(std::forward(Params)); + OnComplete(std::forward(Params)); Stats.CacheHitCount.Increment(1); } @@ -1166,7 +1167,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1178,13 +1179,13 @@ public: for (size_t Index : RemainingKeys) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::GetCachePayload"); + ZEN_TRACE_CPU("Upstream::GetCacheValue"); if (m_Options.ReadUpstream) { @@ -1200,7 +1201,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCachePayload(CacheKey, PayloadId); + Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1217,7 +1218,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1302,18 +1303,18 @@ private: return; } - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) { Payloads.push_back(Payload); } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, - PayloadId); + ValueContentId); return; } } -- cgit v1.2.3