diff options
| author | Per Larsson <[email protected]> | 2021-11-11 13:28:52 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-11 13:28:52 +0100 |
| commit | 7940ae2ff24e1b708d1d6a0bccf266213ea7316b (patch) | |
| tree | 7d9863b3a89e8567b34d28205f25b93b5091dd2f /zenserver/upstream/upstreamcache.cpp | |
| parent | Removed batch result. (diff) | |
| download | zen-7940ae2ff24e1b708d1d6a0bccf266213ea7316b.tar.xz zen-7940ae2ff24e1b708d1d6a0bccf266213ea7316b.zip | |
Fixed stats.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 211 |
1 files changed, 119 insertions, 92 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d5414daf1..b486b751c 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -151,43 +151,54 @@ namespace detail { { ZEN_UNUSED(Policy); - CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; for (size_t Index : KeyIndex) { - const CacheKey& CacheKey = CacheKeys[Index]; - CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + const CacheKey& CacheKey = CacheKeys[Index]; + CbPackage Package; + CbObjectView Record; - CbPackage Package; - CbObjectView Record; - - if (Result.ErrorCode == 0) + if (!Result.Error) { - const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); - if (ValidationResult == CbValidateError::None) + CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + AppendResult(RefResult, Result); + + if (RefResult.ErrorCode == 0) { - Record = CbObjectView(Result.Response.GetData()); - Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); - if (AttachmentResult.Success) - { - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = CbObjectView(RefResult.Response.GetData()); + Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + AppendResult(BlobResult, Result); + + if (BlobResult.ErrorCode == 0) { - Package.AddAttachment(CbAttachment(Chunk)); + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } } - } - }); + else + { + m_HealthOk = false; + } + }); + } + } + else + { + m_HealthOk = false; } - } - else - { - m_HealthOk = false; } OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); } - return {}; + return Result; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override @@ -221,23 +232,27 @@ namespace detail { std::span<size_t> RequestIndex, OnCachePayloadGetComplete&& OnComplete) override final { - CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; for (size_t Index : RequestIndex) { const CacheChunkRequest& Request = CacheChunkRequests[Index]; - const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId); - - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response}); + IoBuffer Payload; - if (Result.ErrorCode) + if (!Result.Error) { - m_HealthOk = false; - break; + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + m_HealthOk = BlobResult.ErrorCode == 0; } + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); } - return {}; + return Result; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, @@ -392,6 +407,18 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -548,39 +575,39 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; - bool Success = false; + CbPackage BatchResponse; + ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); - if (Result.Success) - { - Success = BatchResponse.TryLoad(Result.Response); - } - else if (Result.ErrorCode) - { - Success = m_HealthOk = false; - } + Result = Session.InvokeRpc(BatchRequest.Save()); } - if (!Success) + if (Result.Success) { - for (size_t Index : KeyIndex) + if (BatchResponse.TryLoad(Result.Response)) { - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); - } + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete( + {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } - return {}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; } - for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + for (size_t Index : KeyIndex) { - const size_t Index = IndexMap[LocalIndex++]; - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } - return {}; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override @@ -650,49 +677,48 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; - bool Success = false; + CbPackage BatchResponse; + ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); - if (Result.Success) - { - Success = BatchResponse.TryLoad(Result.Response); - } - else if (Result.ErrorCode) - { - m_HealthOk = false; - } + Result = Session.InvokeRpc(BatchRequest.Save()); } - if (!Success) + if (Result.Success) { - for (size_t Index : RequestIndex) + if (BatchResponse.TryLoad(Result.Response)) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); - } - - return {}; - } + for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; - for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) - { - const size_t Index = IndexMap[LocalIndex++]; - IoBuffer Payload; + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } - if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) - { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) - { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); } - return {}; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, @@ -1016,18 +1042,18 @@ public: { std::vector<size_t> Missing; - auto EndpointResult = - Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingKeys = std::move(Missing); } } @@ -1053,7 +1079,7 @@ public: { std::vector<size_t> Missing; - auto EndpointResult = + auto Result = Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { if (Params.Payload) { @@ -1065,6 +1091,7 @@ public: } }); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingPayloads = std::move(Missing); } } |