diff options
| author | Per Larsson <[email protected]> | 2021-11-11 11:19:17 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-11 11:19:17 +0100 |
| commit | 2c0e2ab5de21b13dcd25758ca3b96af889db7137 (patch) | |
| tree | 7ea6325cbbd7b30b996635522975c37b144664f5 | |
| parent | Honor cache policy. (diff) | |
| download | zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.tar.xz zen-2c0e2ab5de21b13dcd25758ca3b96af889db7137.zip | |
Added batch API to upstream endpoints.
| -rw-r--r-- | zenserver/cache/cachekey.cpp | 21 | ||||
| -rw-r--r-- | zenserver/cache/cachekey.h | 4 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 93 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 346 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 65 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 29 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 3 |
7 files changed, 413 insertions, 148 deletions
diff --git a/zenserver/cache/cachekey.cpp b/zenserver/cache/cachekey.cpp index eca2d95d5..2ead9ac58 100644 --- a/zenserver/cache/cachekey.cpp +++ b/zenserver/cache/cachekey.cpp @@ -3,6 +3,7 @@ #include "cachekey.h" #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/string.h> namespace zen { @@ -113,7 +114,7 @@ CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const } bool -CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) +CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) { using namespace std::literals; @@ -139,6 +140,24 @@ CacheRecordPolicy::FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecor return true; } +void +CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer) +{ + Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy()); + Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy()); + + if (!Policy.m_PayloadPolicies.empty()) + { + Writer.BeginArray("PayloadPolicies"sv); + for (const auto& Kv : Policy.m_PayloadPolicies) + { + Writer.AddObjectId("Id"sv, Kv.first); + Writer << "Policy"sv << static_cast<uint32_t>(Kv.second); + } + Writer.EndArray(); + } +} + const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; } // namespace zen diff --git a/zenserver/cache/cachekey.h b/zenserver/cache/cachekey.h index 6ce5d3aab..c32f7ed87 100644 --- a/zenserver/cache/cachekey.h +++ b/zenserver/cache/cachekey.h @@ -12,6 +12,7 @@ namespace zen { class CbObjectView; +class CbWriter; enum class CachePolicy : uint8_t { @@ -50,7 +51,8 @@ public: CachePolicy GetPayloadPolicy(const Oid& PayloadId) const; CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; } - static bool FromCompactBinary(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); + static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); + static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer); private: using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index de2fcb27c..721942cc8 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -618,7 +618,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) @@ -830,7 +830,7 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R CbObjectView Params = BatchRequest["Params"sv].AsObjectView(); CacheRecordPolicy Policy; - CacheRecordPolicy::FromCompactBinary(Params["Policy"sv].AsObjectView(), Policy); + CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); const bool SkipAttachments = (Policy.GetRecordPolicy() & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; const bool QueryRemote = m_UpstreamCache && ((Policy.GetRecordPolicy() & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); @@ -899,18 +899,12 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R if (!UpstreamRequests.empty() && m_UpstreamCache) { - auto UpstreamResult = m_UpstreamCache->GetCacheRecords( - CacheKeys, - UpstreamRequests, - [this, &CacheKeys, &CacheValues, &Payloads, SkipAttachments](size_t KeyIndex, IoBuffer UpstreamValue) { - const CacheKey& Key = CacheKeys[KeyIndex]; - CbPackage UpstreamPackage; - if (UpstreamValue && UpstreamPackage.TryLoad(UpstreamValue)) + const auto OnCacheRecordGetComplete = + [this, &CacheKeys, &CacheValues, &Payloads, &Missing, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) { - CbObjectView CacheRecord = UpstreamPackage.GetObject(); - - CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = UpstreamPackage.FindAttachment(AttachmentHash.AsHash())) + Params.Record.IterateAttachments([&](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(AttachmentHash.AsHash())) { if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) { @@ -925,20 +919,21 @@ HttpStructuredCacheService::HandleBatchGetCacheRecords(zen::HttpServerRequest& R }); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", - Key.Bucket, - Key.Hash, - NiceBytes(UpstreamValue.Size()), - ToString(UpstreamValue.GetContentType())); + Params.CacheKey.Bucket, + Params.CacheKey.Hash, + NiceBytes(Params.Record.GetView().GetSize()), + ToString(HttpContentType::kCbObject)); - CacheValues[KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(CacheRecord.GetView()); + CacheValues[Params.KeyIndex] = IoBufferBuilder::MakeCloneFromMemory(Params.Record.GetView()); m_CacheStats.UpstreamHitCount++; } - }); + else + { + Missing.push_back(Params.KeyIndex); + } + }; - for (size_t MissingUpstream : UpstreamResult.Missing) - { - Missing.push_back(MissingUpstream); - } + m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); } for (size_t KeyIndex : Missing) @@ -1104,40 +1099,32 @@ HttpStructuredCacheService::HandleBatchGetCachePayloads(zen::HttpServerRequest& if (!UpstreamRequests.empty() && m_UpstreamCache) { - auto UpstreamResult = m_UpstreamCache->GetCachePayloads( - ChunkRequests, - UpstreamRequests, - [this, &ChunkRequests, &Chunks](size_t ChunkIndex, IoBuffer UpstreamValue) { - const CacheChunkRequest& ChunkRequest = ChunkRequests[ChunkIndex]; - - if (CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamValue))) - { - auto InsertResult = m_CidStore.AddChunk(CompressedChunk); - IoBuffer Chunk = CompressedChunk.GetCompressed().Flatten().AsIoBuffer(); + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks, &Missing](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)); + auto InsertResult = m_CidStore.AddChunk(Compressed); - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - ChunkRequest.ChunkId, - NiceBytes(Chunk.GetSize()), - ToString(Chunk.GetContentType()), - "UPSTREAM"); + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + Params.Request.Key.Bucket, + Params.Request.Key.Hash, + Params.Request.ChunkId, + NiceBytes(Params.Payload.GetSize()), + ToString(Params.Payload.GetContentType()), + "UPSTREAM"); - Chunks[ChunkIndex] = Chunk; + Chunks[Params.RequestIndex] = std::move(Params.Payload); - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; - } - else - { - ZEN_WARN("got uncompressed upstream cache payload"); - } - }); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + Missing.push_back(Params.RequestIndex); + } + }; - for (size_t MissingUpstream : UpstreamResult.Missing) - { - Missing.push_back(MissingUpstream); - } + m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } for (size_t RequestIndex : Missing) diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 7ef0caf62..0a0706656 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -70,7 +70,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -144,12 +144,54 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + ZEN_UNUSED(Policy); + + CloudCacheSession Session(m_Client); + + for (size_t Index : KeyIndex) + { + const CacheKey& CacheKey = CacheKeys[Index]; + CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + + CbPackage Package; + CbObjectView Record; + + if (Result.Success) + { + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = CbObjectView(Result.Response.GetData()); + Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + if (AttachmentResult.Success) + { + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + } + }); + } + } + + OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { @@ -171,6 +213,29 @@ namespace detail { } } + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + CloudCacheSession Session(m_Client); + + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId); + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response}); + + if (Result.ErrorCode) + { + m_HealthOk = false; + break; + } + } + + return {}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override @@ -419,7 +484,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -446,13 +511,81 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + std::vector<size_t> IndexMap; + IndexMap.reserve(KeyIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheRecords"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("CacheKeys"sv); + for (size_t Index : KeyIndex) + { + const CacheKey& Key = CacheKeys[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + BatchRequest.EndObject(); + } + BatchRequest.EndArray(); + + BatchRequest.BeginObject("Policy"sv); + CacheRecordPolicy::Save(Policy, BatchRequest); + BatchRequest.EndObject(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + bool Success = false; + + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); + if (Result.Success) + { + Success = BatchResponse.TryLoad(Result.Response); + } + else if (Result.ErrorCode) + { + Success = m_HealthOk = false; + } + } + + if (!Success) + { + for (size_t Index : KeyIndex) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + + return {}; + } + + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = - Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { @@ -474,6 +607,91 @@ namespace detail { } } + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> IndexMap; + IndexMap.reserve(RequestIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCachePayloads"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("ChunkRequests"sv); + { + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + + BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest << "ChunkId"sv << Request.ChunkId; + BatchRequest << "RawOffset"sv << Request.RawOffset; + BatchRequest << "RawSize"sv << Request.RawSize; + BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy); + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + bool Success = false; + + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); + if (Result.Success) + { + Success = BatchResponse.TryLoad(Result.Response); + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + } + + if (!Success) + { + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + + return {}; + } + + for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; + + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + } + + return {}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override @@ -758,7 +976,7 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { @@ -780,94 +998,82 @@ public: return {}; } - virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - OnCacheGetComplete OnComplete) override final + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override final { - if (!m_Options.ReadUpstream) - { - return {.Missing = std::vector<size_t>(KeyIndex.begin(), KeyIndex.end())}; - } + std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end()); - GetUpstreamCacheBatchResult Result; - - for (size_t Idx : KeyIndex) + if (m_Options.ReadUpstream) { - const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash}; - - GetUpstreamCacheResult CacheResult; for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->IsHealthy() && !MissingKeys.empty()) { - CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); - m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + std::vector<size_t> Missing; - if (CacheResult.Success) - { - break; - } - } - } + auto EndpointResult = + Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); - if (CacheResult.Success) - { - OnComplete(Idx, CacheResult.Value); - } - else - { - Result.Missing.push_back(Idx); + MissingKeys = std::move(Missing); + } } } - return Result; - } - - virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> ChunkIndex, - OnCacheGetComplete OnComplete) override final - { - if (!m_Options.ReadUpstream) + for (size_t Index : MissingKeys) { - return {.Missing = std::vector<size_t>(ChunkIndex.begin(), ChunkIndex.end())}; + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } + } - GetUpstreamCacheBatchResult Result; + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end()); - for (size_t Idx : ChunkIndex) + if (m_Options.ReadUpstream) { - const CacheChunkRequest& Chunk = CacheChunkRequests[Idx]; - UpstreamPayloadKey PayloadKey{{Chunk.Key.Bucket, Chunk.Key.Hash}, Chunk.ChunkId}; - - GetUpstreamCacheResult CacheResult; for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->IsHealthy() && !MissingPayloads.empty()) { - CacheResult = Endpoint->GetCachePayload(PayloadKey); - m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + std::vector<size_t> Missing; - if (CacheResult.Success) - { - break; - } - } - } + auto EndpointResult = + Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); - if (CacheResult.Success) - { - OnComplete(Idx, CacheResult.Value); - } - else - { - Result.Missing.push_back(Idx); + MissingPayloads = std::move(Missing); + } } } - return Result; + for (size_t Index : MissingPayloads) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { if (m_Options.ReadUpstream) { @@ -875,7 +1081,7 @@ public: { if (Endpoint->IsHealthy()) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index a7bae302d..67bb73b4d 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -15,27 +15,17 @@ namespace zen { +class CbObjectView; +class CbPackage; class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; -struct UpstreamCacheKey -{ - std::string Bucket; - IoHash Hash; -}; - -struct UpstreamPayloadKey -{ - UpstreamCacheKey CacheKey; - IoHash PayloadId; -}; - struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; - UpstreamCacheKey CacheKey; + CacheKey CacheKey; std::vector<IoHash> PayloadIds; }; @@ -98,6 +88,25 @@ struct UpstreamEndpointStats using OnCacheGetComplete = std::function<void(size_t, IoBuffer)>; +struct CacheRecordGetCompleteParams +{ + const CacheKey& CacheKey; + size_t KeyIndex = ~size_t(0); + const CbObjectView& Record; + const CbPackage& Package; +}; + +using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; + +struct CachePayloadGetCompleteParams +{ + const CacheChunkRequest& Request; + size_t RequestIndex{~size_t(0)}; + IoBuffer Payload; +}; + +using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -114,9 +123,18 @@ public: virtual std::string_view DisplayName() const = 0; - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; + + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -137,17 +155,18 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - OnCacheGetComplete OnComplete) = 0; + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& RecordPolicy, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheGetComplete OnComplete) = 0; + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) = 0; struct EnqueueResult { diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 14333f45a..d11058180 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -499,4 +499,33 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa .Success = (Response.status_code == 200 || Response.status_code == 201)}; } +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/$batch"; + + BinaryWriter Body; + Request.CopyTo(Body); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbobject"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 12e46bd8d..5efe19094 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -28,6 +28,8 @@ class logger; namespace zen { class CbObjectWriter; +class CbObjectView; +class CbPackage; class ZenStructuredCacheClient; /** Zen mesh tracker @@ -116,6 +118,7 @@ public: ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); + ZenCacheResult InvokeRpc(const CbObjectView& Request); private: inline spdlog::logger& Log() { return m_Log; } |