diff options
| author | Martin Ridgers <[email protected]> | 2021-11-15 09:10:39 +0100 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-11-15 09:10:39 +0100 |
| commit | b258c117aba04c6a672fb87d07d126449d961a73 (patch) | |
| tree | 174ccc6a674a173f417debd31a11d32348f042c6 /zenserver/upstream/upstreamcache.cpp | |
| parent | Fixed up FileSystemTranersal visitor to use std::fs::path (diff) | |
| parent | Updated cache policy according to UE. (diff) | |
| download | zen-b258c117aba04c6a672fb87d07d126449d961a73.tar.xz zen-b258c117aba04c6a672fb87d07d126449d961a73.zip | |
Merged main
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 343 |
1 files changed, 333 insertions, 10 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 00555f2ce..ade71c5d2 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -70,7 +70,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -144,12 +144,69 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + ZEN_UNUSED(Policy); + + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : KeyIndex) + { + const CacheKey& CacheKey = CacheKeys[Index]; + CbPackage Package; + CbObject Record; + + if (!Result.Error) + { + CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + AppendResult(RefResult, Result); + + if (RefResult.ErrorCode == 0) + { + const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = LoadCompactBinaryObject(RefResult.Response); + Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + AppendResult(BlobResult, Result); + + if (BlobResult.ErrorCode == 0) + { + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + } + else + { + m_HealthOk = false; + } + }); + } + } + else + { + m_HealthOk = false; + } + } + + OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + } + + return Result; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { @@ -171,6 +228,33 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IoBuffer Payload; + + if (!Result.Error) + { + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + m_HealthOk = BlobResult.ErrorCode == 0; + } + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + } + + return Result; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override @@ -323,6 +407,18 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -419,7 +515,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -446,13 +542,80 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + std::vector<size_t> IndexMap; + IndexMap.reserve(KeyIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheRecords"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("CacheKeys"sv); + for (size_t Index : KeyIndex) + { + const CacheKey& Key = CacheKeys[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + BatchRequest.EndObject(); + } + BatchRequest.EndArray(); + + BatchRequest.BeginObject("Policy"sv); + CacheRecordPolicy::Save(Policy, BatchRequest); + BatchRequest.EndObject(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete( + {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : KeyIndex) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = - Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { @@ -474,6 +637,90 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> IndexMap; + IndexMap.reserve(RequestIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCachePayloads"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("ChunkRequests"sv); + { + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + + BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest << "ChunkId"sv << Request.ChunkId; + BatchRequest << "RawOffset"sv << Request.RawOffset; + BatchRequest << "RawSize"sv << Request.RawSize; + BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy); + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; + + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override @@ -758,7 +1005,7 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { @@ -780,7 +1027,83 @@ public: return {}; } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingKeys.empty()) + { + std::vector<size_t> Missing; + + auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingKeys = std::move(Missing); + } + } + } + + for (size_t Index : MissingKeys) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + } + + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingPayloads.empty()) + { + std::vector<size_t> Missing; + + auto Result = + Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingPayloads = std::move(Missing); + } + } + } + + for (size_t Index : MissingPayloads) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { if (m_Options.ReadUpstream) { @@ -788,7 +1111,7 @@ public: { if (Endpoint->IsHealthy()) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) |