diff options
| author | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
| commit | e53df312f3c4dcef19add9cd26afc324557b1f5a (patch) | |
| tree | a3d7b59f29e484d48edffb2a26bbb0dd2d95533d /zenserver/upstream/upstreamcache.cpp | |
| parent | gc: implemented timestamped snapshot persistence (diff) | |
| parent | Change error code for failed upsteam apply (diff) | |
| download | zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.tar.xz zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.zip | |
merge from main
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 678 |
1 files changed, 537 insertions, 141 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 168449d05..e2dc09872 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -40,13 +40,15 @@ namespace detail { : m_Log(zen::logging::Get("upstream")) , m_UseLegacyDdc(Options.UseLegacyDdc) { - using namespace fmt::literals; - m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); - m_Client = new CloudCacheClient(Options); + m_Info.Name = "Horde"sv; + m_Info.Url = Options.ServiceUrl; + m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } @@ -58,7 +60,7 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); - m_HealthOk = Result.ErrorCode == 0; + m_HealthOk = Result.Success && Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } @@ -68,9 +70,7 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -144,12 +144,69 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + ZEN_UNUSED(Policy); + + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : KeyIndex) + { + const CacheKey& CacheKey = CacheKeys[Index]; + CbPackage Package; + CbObject Record; + + if (!Result.Error) + { + CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + AppendResult(RefResult, Result); + + if (RefResult.ErrorCode == 0) + { + const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = LoadCompactBinaryObject(RefResult.Response); + Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + AppendResult(BlobResult, Result); + + if (BlobResult.ErrorCode == 0) + { + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + } + else + { + m_HealthOk = false; + } + }); + } + } + else + { + m_HealthOk = false; + } + } + + OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + } + + return Result; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { @@ -171,12 +228,41 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IoBuffer Payload; + + if (!Result.Error) + { + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + m_HealthOk = BlobResult.ErrorCode == 0; + } + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + } + + return Result; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { + using namespace fmt::literals; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -200,125 +286,132 @@ namespace detail { } } - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } else { - bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) - { - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { + for (const IoHash& PayloadId : PayloadIds) { - if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - Result.Success) + const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + + if (It == std::end(CacheRecord.PayloadIds)) { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; - break; + OutReason = "payload '{}' MISSING from local cache"_format(PayloadId); + return false; } - } - if (!Success) - { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + { + BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + m_HealthOk = BlobResult.ErrorCode == 0; + + if (!BlobResult.Success) + { + OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); + return false; + } + + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; } + + return true; + }; + + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + m_HealthOk = RefResult.ErrorCode == 0; + + if (!RefResult.Success) { - if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - Result.Success) - { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RefResult.Reason), + .Success = false}; + } - if (!Result.Needs.empty()) - { - for (const IoHash& NeededHash : Result.Needs) - { - Success = false; + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - if (auto It = - std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); - It != std::end(CacheRecord.PayloadIds)) - { - const size_t Idx = It - std::begin(CacheRecord.PayloadIds); - - if (CloudCacheResult BlobResult = - Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - BlobResult.Success) - { - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - Success = true; - } - else - { - ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } - else - { - ZEN_WARN("needed payload '{}/{}/{}' MISSING", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - const IoHash RefHash = IoHash::HashBuffer(RecordValue); + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; - if (FinalizeRefResult FinalizeResult = - Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); - FinalizeResult.Success) - { - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - Success = true; + if (!FinalizeResult.Success) + { + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MissingHash); - } - } - else - { - ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - Success = false; - } - } + if (!FinalizeResult.Needs.empty()) + { + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - if (Success) + FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } + + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) { - break; + Sb << MissingHash.ToHexString() << ","; } + + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Sb.ToString()), + .Success = false}; } } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) { + m_HealthOk = false; return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -326,9 +419,22 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; + UpstreamEndpointInfo m_Info; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -349,9 +455,13 @@ namespace detail { }; public: - ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") + ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_Info({.Name = std::string("Zen")}) + , m_ConnectTimeout(Options.ConnectTimeout) + , m_Timeout(Options.Timeout) { - for (const auto& Url : Urls) + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } @@ -359,6 +469,8 @@ namespace detail { ~ZenUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { using namespace fmt::literals; @@ -366,9 +478,8 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Info.Url = Ep.Url; + m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -391,9 +502,9 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Info.Url = Ep.Url; + m_Client = + new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -420,9 +531,7 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -449,13 +558,80 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + std::vector<size_t> IndexMap; + IndexMap.reserve(KeyIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheRecords"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("CacheKeys"sv); + for (size_t Index : KeyIndex) + { + const CacheKey& Key = CacheKeys[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + BatchRequest.EndObject(); + } + BatchRequest.EndArray(); + + BatchRequest.BeginObject("Policy"sv); + CacheRecordPolicy::Save(Policy, BatchRequest); + BatchRequest.EndObject(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete( + {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : KeyIndex) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = - Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { @@ -477,12 +653,96 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> IndexMap; + IndexMap.reserve(RequestIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCachePayloads"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("ChunkRequests"sv); + { + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + + BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest << "ChunkId"sv << Request.ChunkId; + BatchRequest << "RawOffset"sv << Request.RawOffset; + BatchRequest << "RawSize"sv << Request.RawSize; + BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy); + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; + + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -565,12 +825,15 @@ namespace detail { TotalElapsedSeconds += Result.ElapsedSeconds; } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; + return {.Reason = std::move(Result.Reason), + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { m_HealthOk = false; - return {.Reason = std::string(e.what()), .Success = false}; + return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -581,7 +844,7 @@ namespace detail { { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); ZenStructuredCacheSession Session(Client); const int32_t SampleCount = 2; @@ -602,7 +865,7 @@ namespace detail { for (const auto& Ep : m_Endpoints) { - ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); + ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); } return m_Endpoints.front(); @@ -611,9 +874,10 @@ namespace detail { spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - std::string m_ServiceUrl; + UpstreamEndpointInfo m_Info; std::vector<ZenEndpoint> m_Endpoints; - std::string m_DisplayName; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RefPtr<ZenStructuredCacheClient> m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; @@ -700,7 +964,7 @@ struct UpstreamStats const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), + Ep->GetEndpointInfo().Name, HitRate, DownBytes, DownSpeed, @@ -734,13 +998,15 @@ public: for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + if (Health.Ok) { - ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url); } else { - ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } @@ -761,7 +1027,7 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { @@ -776,6 +1042,14 @@ public: { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -783,7 +1057,99 @@ public: return {}; } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingKeys.empty()) + { + std::vector<size_t> Missing; + + auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); + + if (Result.Error) + { + ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingKeys = std::move(Missing); + } + } + } + + for (size_t Index : MissingKeys) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + } + + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingPayloads.empty()) + { + std::vector<size_t> Missing; + + auto Result = + Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); + + if (Result.Error) + { + ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingPayloads = std::move(Missing); + } + } + } + + for (size_t Index : MissingPayloads) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { if (m_Options.ReadUpstream) { @@ -791,13 +1157,21 @@ public: { if (Endpoint->IsHealthy()) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -834,8 +1208,10 @@ public: Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { + const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo(); Status.BeginObject(); - Status << "name" << Ep->DisplayName(); + Status << "name" << Info.Name; + Status << "url" << Info.Url; Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamEndpointStats& Stats = Ep->Stats(); @@ -890,6 +1266,15 @@ private: { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->GetEndpointInfo().Url, + Result.Reason); + } } } } @@ -905,9 +1290,12 @@ private: { ProcessCacheRecord(std::move(CacheRecord)); } - catch (std::exception& e) + catch (std::exception& Err) { - ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Err.what()); } } @@ -930,20 +1318,28 @@ private: } } - for (auto& Endpoint : m_Endpoints) + try { - if (!Endpoint->IsHealthy()) + for (auto& Endpoint : m_Endpoints) { - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) - { - ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); - } - else + if (!Endpoint->IsHealthy()) { - ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); + } } } } + catch (std::exception& Err) + { + ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); + } } } @@ -1015,9 +1411,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr<UpstreamEndpoint> -MakeZenUpstreamEndpoint(std::span<std::string const> Urls) +MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) { - return std::make_unique<detail::ZenUpstreamEndpoint>(Urls); + return std::make_unique<detail::ZenUpstreamEndpoint>(Options); } } // namespace zen |