diff options
| author | mattpetersepic <[email protected]> | 2022-01-27 10:52:07 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-01-27 10:52:07 -0700 |
| commit | fb1afc87a436ff6374daeaef5f7682f098a85cd7 (patch) | |
| tree | 89810fc5d85ab81cce7fdc4e26849ad0d25cf2d1 | |
| parent | Add batched CacheRecord put rpc (#38) (diff) | |
| download | zen-fb1afc87a436ff6374daeaef5f7682f098a85cd7.tar.xz zen-fb1afc87a436ff6374daeaef5f7682f098a85cd7.zip | |
Rename Paylod to Value to match the client side. Rename PayloadId to ValueContentId where its a hash instead of an oid.
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 99 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 10 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 111 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 24 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 8 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 4 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 6 |
7 files changed, 134 insertions, 128 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index cb29b3645..c2cd679b1 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -142,13 +142,13 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); - if (Ref.PayloadId == IoHash::Zero) + if (Ref.ValueContentId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { - return HandleCachePayloadRequest(Request, Ref, PolicyFromURL); + return HandleCacheValueRequest(Request, Ref, PolicyFromURL); } return; @@ -511,8 +511,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -593,8 +594,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -606,16 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: - HandleGetCachePayload(Request, Ref, PolicyFromURL); + HandleGetCacheValue(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: - HandlePutCachePayload(Request, Ref, PolicyFromURL); + HandlePutCacheValue(Request, Ref, PolicyFromURL); break; default: break; @@ -623,16 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request } void -HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromURL; - const bool QueryUpstream = !Payload && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -646,9 +649,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } } - if (!Payload) + if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -656,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - ToString(Payload.GetContentType()), + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; @@ -673,12 +676,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } else { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void -HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) +HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromURL); @@ -699,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueId does not match attachment hash"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); @@ -709,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, + Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD"); @@ -738,13 +743,13 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } std::string_view HashSegment; - std::string_view PayloadSegment; + std::string_view ValueSegment; - std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return - if (PayloadSplitOffset == BucketSplitOffset) + if (ValueSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); @@ -752,8 +757,8 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& else { // Cache record + valueid lookup - HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); - PayloadSegment = Key.substr(PayloadSplitOffset + 1); + HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); + ValueSegment = Key.substr(ValueSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) @@ -761,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) + if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) { - const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); if (!IsOk) { @@ -772,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } else { - OutRef.PayloadId = IoHash::Zero; + OutRef.ValueContentId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); @@ -826,7 +831,7 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } else if (Method == "GetCacheValues"sv) { - HandleRpcGetCachePayloads(AsyncRequest, Object); + HandleRpcGetCacheValues(AsyncRequest, Object); } else { @@ -1211,9 +1216,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCachePayloads"); + ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); @@ -1228,7 +1233,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); - const Oid PayloadId = RequestObject["ValueId"sv].AsObjectId(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); std::string_view PolicyText = RequestObject["Policy"sv].AsString(); @@ -1237,7 +1242,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re // Note we could use emplace_back here but [Apple] LLVM-12's C++ library // can't infer a constructor like other platforms (or can't handle an // initializer list like others do). - ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, ChunkPolicy}); + ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy}); } if (ChunkRequests.empty()) @@ -1250,8 +1255,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) // is missing, load the cache record and try to find the raw hash from the ValueId. { - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { - if (PayloadId) + const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash { + if (ValueId) { // A valid ValueId indicates that the caller is searching for a Value in a Record // that was Put with ICacheStore::Put @@ -1260,7 +1265,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView ValueObject = ValueView.AsObjectView(); const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1270,7 +1275,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) { const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1281,7 +1286,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView AttachmentObject = AttachmentView.AsObjectView(); const Oid Id = AttachmentObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return AttachmentObject["RawHash"sv].AsHash(); } @@ -1319,7 +1324,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CurrentRecordBuffer) { - ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId); } } } @@ -1366,8 +1371,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (!UpstreamRequests.empty()) { - const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value))) { m_CidStore.AddChunk(Compressed); @@ -1375,11 +1380,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, - NiceBytes(Params.Payload.GetSize()), + NiceBytes(Params.Value.GetSize()), "UPSTREAM"); ZEN_ASSERT(Params.RequestIndex < Chunks.size()); - Chunks[Params.RequestIndex] = std::move(Params.Payload); + Chunks[Params.RequestIndex] = std::move(Params.Value); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -1391,7 +1396,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } }; - m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete)); } CbPackage RpcResponse; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 5b6b10898..88bf6cda1 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -74,7 +74,7 @@ private: { std::string BucketSegment; IoHash HashKey; - IoHash PayloadId; + IoHash ValueContentId; }; struct CacheStats @@ -94,13 +94,13 @@ private: void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl); void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); - void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleCacheValueRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); + void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 091406db3..657cfb729 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -270,14 +270,14 @@ namespace detail { return Result; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey&, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Horde::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheValue"); try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -301,11 +301,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Horde::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; @@ -325,7 +325,7 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); } - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); } return Result; @@ -333,11 +333,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Horde::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -371,30 +371,31 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { - for (const IoHash& PayloadId : PayloadIds) + const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { + for (const IoHash& ValueContentId : ValueContentIds) { - const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + const auto It = + std::find(std::begin(CacheRecord.ValueContentIds), std::end(CacheRecord.ValueContentIds), ValueContentId); - if (It == std::end(CacheRecord.PayloadIds)) + if (It == std::end(CacheRecord.ValueContentIds)) { - OutReason = fmt::format("value '{}' MISSING from local cache", PayloadId); + OutReason = fmt::format("value '{}' MISSING from local cache", ValueContentId); return false; } - const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + const size_t Idx = std::distance(std::begin(CacheRecord.ValueContentIds), It); CloudCacheResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { - BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult = Session.PutCompressedBlob(CacheRecord.ValueContentIds[Idx], Values[Idx]); } m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { - OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", PayloadId, BlobResult.Reason); + OutReason = fmt::format("upload value '{}' FAILED, reason '{}'", ValueContentId, BlobResult.Reason); return false; } @@ -685,14 +686,14 @@ namespace detail { return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::Zen::GetSingleCachePayload"); + ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheValue"); try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + const ZenCacheResult Result = Session.GetCacheValue(CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -716,11 +717,11 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::Zen::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); std::vector<size_t> IndexMap; IndexMap.reserve(RequestIndex.size()); @@ -744,7 +745,7 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - BatchRequest.AddObjectId("ValueId"sv, Request.PayloadId); + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); BatchRequest << "ChunkId"sv << Request.ChunkId; BatchRequest << "RawOffset"sv << Request.RawOffset; BatchRequest << "RawSize"sv << Request.RawSize; @@ -784,7 +785,7 @@ namespace detail { } } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); } return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; @@ -793,7 +794,7 @@ namespace detail { for (size_t Index : RequestIndex) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -801,11 +802,11 @@ namespace detail { virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, - std::span<IoBuffer const> Payloads) override + std::span<IoBuffer const> Values) override { ZEN_TRACE_CPU("Upstream::Zen::PutCacheRecord"); - ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + ZEN_ASSERT(CacheRecord.ValueContentIds.size() == Values.size()); const int32_t MaxAttempts = 3; try @@ -820,9 +821,9 @@ namespace detail { CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); - for (const IoBuffer& Payload : Payloads) + for (const IoBuffer& Value : Values) { - if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) + if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) { Package.AddAttachment(CbAttachment(AttachmentBuffer)); } @@ -848,15 +849,15 @@ namespace detail { } else { - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + for (size_t Idx = 0, Count = Values.size(); Idx < Count; Idx++) { Result.Success = false; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result = Session.PutCachePayload(CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - CacheRecord.PayloadIds[Idx], - Payloads[Idx]); + Result = Session.PutCacheValue(CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + CacheRecord.ValueContentIds[Idx], + Values[Idx]); } m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -866,7 +867,7 @@ namespace detail { if (!Result.Success) { - return {.Reason = "Failed to upload payload", + return {.Reason = "Failed to upload value", .Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = false}; @@ -1115,11 +1116,11 @@ public: } } - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) override final { - ZEN_TRACE_CPU("Upstream::GetCachePayloads"); + ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); @@ -1145,10 +1146,10 @@ public: { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) + Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.Value) { - OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); Stats.CacheHitCount.Increment(1); } @@ -1166,7 +1167,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1178,13 +1179,13 @@ public: for (size_t Index : RemainingKeys) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); } } - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) override { - ZEN_TRACE_CPU("Upstream::GetCachePayload"); + ZEN_TRACE_CPU("Upstream::GetCacheValue"); if (m_Options.ReadUpstream) { @@ -1200,7 +1201,7 @@ public: { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = Endpoint->GetCachePayload(CacheKey, PayloadId); + Result = Endpoint->GetCacheValue(CacheKey, ValueContentId); } Stats.CacheGetCount.Increment(1); @@ -1217,7 +1218,7 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); @@ -1302,18 +1303,18 @@ private: return; } - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) { Payloads.push_back(Payload); } else { - ZEN_WARN("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + ZEN_WARN("process upstream FAILED, '{}/{}/{}', ValueContentId doesn't exist in CAS", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, - PayloadId); + ValueContentId); return; } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 16d8c7929..2087b1fba 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -27,7 +27,7 @@ struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; CacheKey Key; - std::vector<IoHash> PayloadIds; + std::vector<IoHash> ValueContentIds; }; struct UpstreamCacheOptions @@ -73,14 +73,14 @@ struct CacheRecordGetCompleteParams using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; -struct CachePayloadGetCompleteParams +struct CacheValueGetCompleteParams { const CacheChunkRequest& Request; size_t RequestIndex{~size_t(0)}; - IoBuffer Payload; + IoBuffer Value; }; -using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; +using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; struct UpstreamEndpointStats { @@ -156,11 +156,11 @@ public: const CacheRecordPolicy& Policy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -190,11 +190,11 @@ public: const CacheRecordPolicy& RecordPolicy, OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCachePayloadGetComplete&& OnComplete) = 0; + virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index cd7f48334..a2666ac02 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -433,10 +433,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId) +ZenStructuredCacheSession::GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -486,10 +486,10 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas } ZenCacheResult -ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload) +ZenStructuredCacheSession::PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index c2be2165a..8cc4c121d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -123,9 +123,9 @@ public: ZenCacheResult CheckHealth(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); - ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); + ZenCacheResult GetCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); - ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); + ZenCacheResult PutCacheValue(std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, IoBuffer Payload); ZenCacheResult InvokeRpc(const CbObjectView& Request); private: diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h index fb36c7759..a0a83a883 100644 --- a/zenutil/include/zenutil/cache/cachekey.h +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -44,7 +44,7 @@ struct CacheChunkRequest { CacheKey Key; IoHash ChunkId; - Oid PayloadId; + Oid ValueId; uint64_t RawOffset = 0ull; uint64_t RawSize = ~uint64_t(0); CachePolicy Policy = CachePolicy::Default; @@ -69,11 +69,11 @@ operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) { return false; } - if (A.PayloadId < B.PayloadId) + if (A.ValueId < B.ValueId) { return true; } - if (B.PayloadId < A.PayloadId) + if (B.ValueId < A.ValueId) { return false; } |