From 934881f5572778e17b973d29dfb0ad760f29025c Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Mon, 1 Nov 2021 14:36:28 +0100 Subject: Upload cache record before blobs and call finalize when processing upstream to Jupiter. --- zenserver/upstream/upstreamcache.cpp | 195 ++++++++++++++++++----------------- 1 file changed, 102 insertions(+), 93 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 168449d05..c0cd858b6 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -175,8 +175,10 @@ namespace detail { IoBuffer RecordValue, std::span Payloads) override { + using namespace fmt::literals; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const size_t MaxAttempts = 3; try { @@ -204,117 +206,112 @@ namespace detail { } else { - bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) - { - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + const auto PutBlobs = [&](std::span PayloadIds, std::string& OutReason) -> bool { + for (const IoHash& PayloadId : PayloadIds) { - if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - Result.Success) + const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + + if (It == std::end(CacheRecord.PayloadIds)) { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; - break; + OutReason = "payload '{}' MISSING from local cache"_format(PayloadId); + return false; } - } - if (!Success) - { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + + CloudCacheResult BlobResult; + for (size_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + { + BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!BlobResult.Success) + { + OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); + return false; + } + + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; } + + return true; + }; + + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + if (!RefResult.Success) { - if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - Result.Success) - { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RefResult.Reason), + .Success = false}; + } - if (!Result.Needs.empty()) - { - for (const IoHash& NeededHash : Result.Needs) - { - Success = false; - - if (auto It = - std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); - It != std::end(CacheRecord.PayloadIds)) - { - const size_t Idx = It - std::begin(CacheRecord.PayloadIds); - - if (CloudCacheResult BlobResult = - Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - BlobResult.Success) - { - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - Success = true; - } - else - { - ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } - else - { - ZEN_WARN("needed payload '{}/{}/{}' MISSING", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - const IoHash RefHash = IoHash::HashBuffer(RecordValue); + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - if (FinalizeRefResult FinalizeResult = - Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); - FinalizeResult.Success) - { - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - Success = true; - - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MissingHash); - } - } - else - { - ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - Success = false; - } - } + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } + + if (!FinalizeResult.Needs.empty()) + { + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } + + FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } - if (Success) + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) { - break; + Sb << MissingHash.ToHexString() << ","; } + + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Sb.ToString()), + .Success = false}; } } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) @@ -890,6 +887,15 @@ private: { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -907,7 +913,10 @@ private: } catch (std::exception& e) { - ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + e.what()); } } -- cgit v1.2.3 From 4b8d4c0e375c729e38bdaadfebf0eaf14f08f5f9 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 2 Nov 2021 10:50:18 +0100 Subject: Added upstream batch API. --- zenserver/upstream/upstreamcache.cpp | 41 ++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c0cd858b6..437b29cd7 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -780,6 +780,47 @@ public: return {}; } + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + OnCacheGetComplete OnComplete) override + { + if (!m_Options.ReadUpstream) + { + return {.Missing = std::vector(KeyIndex.begin(), KeyIndex.end())}; + } + + GetUpstreamCacheBatchResult Result; + + for (size_t Idx : KeyIndex) + { + const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash}; + + GetUpstreamCacheResult CacheResult; + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); + if (CacheResult.Success) + { + break; + } + } + } + + if (CacheResult.Success) + { + OnComplete(Idx, CacheResult.Value); + } + else + { + Result.Missing.push_back(Idx); + } + } + + return Result; + } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { if (m_Options.ReadUpstream) -- cgit v1.2.3 From c32504578c67f5edf0faf4a458750f67ae53ab3a Mon Sep 17 00:00:00 2001 From: Martin Ridgers Date: Wed, 3 Nov 2021 13:16:35 +0100 Subject: Type consistency around signed/unsigned comparison --- zenserver/upstream/upstreamcache.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c0cd858b6..a260cb40d 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -178,7 +178,7 @@ namespace detail { using namespace fmt::literals; ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const size_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -223,7 +223,7 @@ namespace detail { const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); CloudCacheResult BlobResult; - for (size_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } @@ -479,7 +479,7 @@ namespace detail { std::span Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { -- cgit v1.2.3 From e0d54396fa3ba0f5466a4ea1f2810721c18fa55f Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 9 Nov 2021 13:20:00 +0100 Subject: Sort cache keys when resolving payload ID's. --- zenserver/upstream/upstreamcache.cpp | 48 +++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 437b29cd7..7ef0caf62 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -782,7 +782,7 @@ public: virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, std::span KeyIndex, - OnCacheGetComplete OnComplete) override + OnCacheGetComplete OnComplete) override final { if (!m_Options.ReadUpstream) { @@ -801,6 +801,52 @@ public: if (Endpoint->IsHealthy()) { CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); + m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + + if (CacheResult.Success) + { + break; + } + } + } + + if (CacheResult.Success) + { + OnComplete(Idx, CacheResult.Value); + } + else + { + Result.Missing.push_back(Idx); + } + } + + return Result; + } + + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, + std::span ChunkIndex, + OnCacheGetComplete OnComplete) override final + { + if (!m_Options.ReadUpstream) + { + return {.Missing = std::vector(ChunkIndex.begin(), ChunkIndex.end())}; + } + + GetUpstreamCacheBatchResult Result; + + for (size_t Idx : ChunkIndex) + { + const CacheChunkRequest& Chunk = CacheChunkRequests[Idx]; + UpstreamPayloadKey PayloadKey{{Chunk.Key.Bucket, Chunk.Key.Hash}, Chunk.ChunkId}; + + GetUpstreamCacheResult CacheResult; + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + CacheResult = Endpoint->GetCachePayload(PayloadKey); + m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + if (CacheResult.Success) { break; -- cgit v1.2.3 From 2c0e2ab5de21b13dcd25758ca3b96af889db7137 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Thu, 11 Nov 2021 11:19:17 +0100 Subject: Added batch API to upstream endpoints. --- zenserver/upstream/upstreamcache.cpp | 346 ++++++++++++++++++++++++++++------- 1 file changed, 276 insertions(+), 70 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 7ef0caf62..0a0706656 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -70,7 +70,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -144,12 +144,54 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + ZEN_UNUSED(Policy); + + CloudCacheSession Session(m_Client); + + for (size_t Index : KeyIndex) + { + const CacheKey& CacheKey = CacheKeys[Index]; + CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + + CbPackage Package; + CbObjectView Record; + + if (Result.Success) + { + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = CbObjectView(Result.Response.GetData()); + Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + if (AttachmentResult.Success) + { + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + } + }); + } + } + + OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { @@ -171,6 +213,29 @@ namespace detail { } } + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, + std::span RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + CloudCacheSession Session(m_Client); + + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId); + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response}); + + if (Result.ErrorCode) + { + m_HealthOk = false; + break; + } + } + + return {}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override @@ -419,7 +484,7 @@ namespace detail { virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -446,13 +511,81 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + std::vector IndexMap; + IndexMap.reserve(KeyIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheRecords"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("CacheKeys"sv); + for (size_t Index : KeyIndex) + { + const CacheKey& Key = CacheKeys[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + BatchRequest.EndObject(); + } + BatchRequest.EndArray(); + + BatchRequest.BeginObject("Policy"sv); + CacheRecordPolicy::Save(Policy, BatchRequest); + BatchRequest.EndObject(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + bool Success = false; + + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); + if (Result.Success) + { + Success = BatchResponse.TryLoad(Result.Response); + } + else if (Result.ErrorCode) + { + Success = m_HealthOk = false; + } + } + + if (!Success) + { + for (size_t Index : KeyIndex) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + + return {}; + } + + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { try { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = - Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { @@ -474,6 +607,91 @@ namespace detail { } } + virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, + std::span RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector IndexMap; + IndexMap.reserve(RequestIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCachePayloads"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("ChunkRequests"sv); + { + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + + BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest << "ChunkId"sv << Request.ChunkId; + BatchRequest << "RawOffset"sv << Request.RawOffset; + BatchRequest << "RawSize"sv << Request.RawSize; + BatchRequest << "Policy"sv << static_cast(Request.Policy); + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + bool Success = false; + + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); + if (Result.Success) + { + Success = BatchResponse.TryLoad(Result.Response); + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + } + + if (!Success) + { + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + + return {}; + } + + for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; + + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + } + + return {}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span Payloads) override @@ -758,7 +976,7 @@ public: virtual void RegisterEndpoint(std::unique_ptr Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { @@ -780,94 +998,82 @@ public: return {}; } - virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, - std::span KeyIndex, - OnCacheGetComplete OnComplete) override final + virtual void GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override final { - if (!m_Options.ReadUpstream) - { - return {.Missing = std::vector(KeyIndex.begin(), KeyIndex.end())}; - } + std::vector MissingKeys(KeyIndex.begin(), KeyIndex.end()); - GetUpstreamCacheBatchResult Result; - - for (size_t Idx : KeyIndex) + if (m_Options.ReadUpstream) { - const UpstreamCacheKey CacheKey = {CacheKeys[Idx].Bucket, CacheKeys[Idx].Hash}; - - GetUpstreamCacheResult CacheResult; for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->IsHealthy() && !MissingKeys.empty()) { - CacheResult = Endpoint->GetCacheRecord(CacheKey, ZenContentType::kCbPackage); - m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + std::vector Missing; - if (CacheResult.Success) - { - break; - } - } - } + auto EndpointResult = + Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); - if (CacheResult.Success) - { - OnComplete(Idx, CacheResult.Value); - } - else - { - Result.Missing.push_back(Idx); + MissingKeys = std::move(Missing); + } } } - return Result; - } - - virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, - std::span ChunkIndex, - OnCacheGetComplete OnComplete) override final - { - if (!m_Options.ReadUpstream) + for (size_t Index : MissingKeys) { - return {.Missing = std::vector(ChunkIndex.begin(), ChunkIndex.end())}; + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } + } - GetUpstreamCacheBatchResult Result; + virtual void GetCachePayloads(std::span CacheChunkRequests, + std::span RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector MissingPayloads(RequestIndex.begin(), RequestIndex.end()); - for (size_t Idx : ChunkIndex) + if (m_Options.ReadUpstream) { - const CacheChunkRequest& Chunk = CacheChunkRequests[Idx]; - UpstreamPayloadKey PayloadKey{{Chunk.Key.Bucket, Chunk.Key.Hash}, Chunk.ChunkId}; - - GetUpstreamCacheResult CacheResult; for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->IsHealthy() && !MissingPayloads.empty()) { - CacheResult = Endpoint->GetCachePayload(PayloadKey); - m_Stats.Add(m_Log, *Endpoint, CacheResult, m_Endpoints); + std::vector Missing; - if (CacheResult.Success) - { - break; - } - } - } + auto EndpointResult = + Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward(Params)); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); - if (CacheResult.Success) - { - OnComplete(Idx, CacheResult.Value); - } - else - { - Result.Missing.push_back(Idx); + MissingPayloads = std::move(Missing); + } } } - return Result; + for (size_t Index : MissingPayloads) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { if (m_Options.ReadUpstream) { @@ -875,7 +1081,7 @@ public: { if (Endpoint->IsHealthy()) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) -- cgit v1.2.3 From 31ba344167677f175ec79ce7e579552a9811245d Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Thu, 11 Nov 2021 11:21:31 +0100 Subject: Format and remove unused type. --- zenserver/upstream/upstreamcache.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0a0706656..a221c7d8e 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -584,8 +584,7 @@ namespace detail { try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = - Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { -- cgit v1.2.3 From efd0c3c0896c97c6cd37e6a6ac2edf331e5b4a3f Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Thu, 11 Nov 2021 11:37:31 +0100 Subject: Removed batch result. --- zenserver/upstream/upstreamcache.cpp | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a221c7d8e..d5414daf1 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -144,10 +144,10 @@ namespace detail { } } - virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, - std::span KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override { ZEN_UNUSED(Policy); @@ -161,7 +161,7 @@ namespace detail { CbPackage Package; CbObjectView Record; - if (Result.Success) + if (Result.ErrorCode == 0) { const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (ValidationResult == CbValidateError::None) @@ -179,6 +179,10 @@ namespace detail { }); } } + else + { + m_HealthOk = false; + } OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); } @@ -213,9 +217,9 @@ namespace detail { } } - virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, - std::span RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, + std::span RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final { CloudCacheSession Session(m_Client); @@ -511,10 +515,10 @@ namespace detail { } } - virtual GetUpstreamCacheBatchResult GetCacheRecords(std::span CacheKeys, - std::span KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span CacheKeys, + std::span KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override { std::vector IndexMap; IndexMap.reserve(KeyIndex.size()); @@ -606,9 +610,9 @@ namespace detail { } } - virtual GetUpstreamCacheBatchResult GetCachePayloads(std::span CacheChunkRequests, - std::span RequestIndex, - OnCachePayloadGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCachePayloads(std::span CacheChunkRequests, + std::span RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final { std::vector IndexMap; IndexMap.reserve(RequestIndex.size()); -- cgit v1.2.3 From 7940ae2ff24e1b708d1d6a0bccf266213ea7316b Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Thu, 11 Nov 2021 13:28:52 +0100 Subject: Fixed stats. --- zenserver/upstream/upstreamcache.cpp | 211 ++++++++++++++++++++--------------- 1 file changed, 119 insertions(+), 92 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d5414daf1..b486b751c 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -151,43 +151,54 @@ namespace detail { { ZEN_UNUSED(Policy); - CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; for (size_t Index : KeyIndex) { - const CacheKey& CacheKey = CacheKeys[Index]; - CloudCacheResult Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + const CacheKey& CacheKey = CacheKeys[Index]; + CbPackage Package; + CbObjectView Record; - CbPackage Package; - CbObjectView Record; - - if (Result.ErrorCode == 0) + if (!Result.Error) { - const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); - if (ValidationResult == CbValidateError::None) + CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + AppendResult(RefResult, Result); + + if (RefResult.ErrorCode == 0) { - Record = CbObjectView(Result.Response.GetData()); - Record.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); - if (AttachmentResult.Success) - { - if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) + const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = CbObjectView(RefResult.Response.GetData()); + Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + AppendResult(BlobResult, Result); + + if (BlobResult.ErrorCode == 0) { - Package.AddAttachment(CbAttachment(Chunk)); + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } } - } - }); + else + { + m_HealthOk = false; + } + }); + } + } + else + { + m_HealthOk = false; } - } - else - { - m_HealthOk = false; } OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); } - return {}; + return Result; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override @@ -221,23 +232,27 @@ namespace detail { std::span RequestIndex, OnCachePayloadGetComplete&& OnComplete) override final { - CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; for (size_t Index : RequestIndex) { const CacheChunkRequest& Request = CacheChunkRequests[Index]; - const CloudCacheResult Result = Session.GetCompressedBlob(Request.ChunkId); - - OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Result.Response}); + IoBuffer Payload; - if (Result.ErrorCode) + if (!Result.Error) { - m_HealthOk = false; - break; + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + m_HealthOk = BlobResult.ErrorCode == 0; } + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); } - return {}; + return Result; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, @@ -392,6 +407,18 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -548,39 +575,39 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; - bool Success = false; + CbPackage BatchResponse; + ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); - if (Result.Success) - { - Success = BatchResponse.TryLoad(Result.Response); - } - else if (Result.ErrorCode) - { - Success = m_HealthOk = false; - } + Result = Session.InvokeRpc(BatchRequest.Save()); } - if (!Success) + if (Result.Success) { - for (size_t Index : KeyIndex) + if (BatchResponse.TryLoad(Result.Response)) { - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); - } + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete( + {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } - return {}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; } - for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + for (size_t Index : KeyIndex) { - const size_t Index = IndexMap[LocalIndex++]; - OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } - return {}; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override @@ -650,49 +677,48 @@ namespace detail { } BatchRequest.EndObject(); - CbPackage BatchResponse; - bool Success = false; + CbPackage BatchResponse; + ZenCacheResult Result; { ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result = Session.InvokeRpc(BatchRequest.Save()); - if (Result.Success) - { - Success = BatchResponse.TryLoad(Result.Response); - } - else if (Result.ErrorCode) - { - m_HealthOk = false; - } + Result = Session.InvokeRpc(BatchRequest.Save()); } - if (!Success) + if (Result.Success) { - for (size_t Index : RequestIndex) + if (BatchResponse.TryLoad(Result.Response)) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); - } - - return {}; - } + for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; - for (int32_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) - { - const size_t Index = IndexMap[LocalIndex++]; - IoBuffer Payload; + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } - if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) - { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) - { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); } - return {}; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, @@ -1016,18 +1042,18 @@ public: { std::vector Missing; - auto EndpointResult = - Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward(Params)); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingKeys = std::move(Missing); } } @@ -1053,7 +1079,7 @@ public: { std::vector Missing; - auto EndpointResult = + auto Result = Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { if (Params.Payload) { @@ -1065,6 +1091,7 @@ public: } }); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingPayloads = std::move(Missing); } } -- cgit v1.2.3 From 033fd189b876197e263e669c81703bdb550b3ec0 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Sat, 13 Nov 2021 15:33:22 +0100 Subject: Fixed bug in upstream jupiter endpoint. --- zenserver/upstream/upstreamcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 174c3d17a..085932685 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -158,7 +158,7 @@ namespace detail { { const CacheKey& CacheKey = CacheKeys[Index]; CbPackage Package; - CbObjectView Record; + CbObject Record; if (!Result.Error) { @@ -170,7 +170,7 @@ namespace detail { const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { - Record = CbObjectView(RefResult.Response.GetData()); + Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); AppendResult(BlobResult, Result); -- cgit v1.2.3 From 1fda861670f67b3220fc661e7975f160f99e6aed Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 16 Nov 2021 21:07:17 +0100 Subject: Added upstream connect/transfer timeout options. --- zenserver/upstream/upstreamcache.cpp | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 085932685..2741e3e51 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -442,9 +442,13 @@ namespace detail { }; public: - ZenUpstreamEndpoint(std::span Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") + ZenUpstreamEndpoint(const ZenClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_DisplayName("ZEN") + , m_ConnectTimeout(Options.ConnectTimeout) + , m_Timeout(Options.Timeout) { - for (const auto& Url : Urls) + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } @@ -461,7 +465,7 @@ namespace detail { { m_ServiceUrl = Ep.Url; m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -486,7 +490,8 @@ namespace detail { { m_ServiceUrl = Ep.Url; m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Client = + new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -825,7 +830,7 @@ namespace detail { { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); ZenStructuredCacheSession Session(Client); const int32_t SampleCount = 2; @@ -858,6 +863,8 @@ namespace detail { std::string m_ServiceUrl; std::vector m_Endpoints; std::string m_DisplayName; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RefPtr m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; @@ -1347,9 +1354,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr -MakeZenUpstreamEndpoint(std::span Urls) +MakeZenUpstreamEndpoint(const ZenClientOptions& Options) { - return std::make_unique(Urls); + return std::make_unique(Options); } } // namespace zen -- cgit v1.2.3 From f45fb6c13407e98434db255b36e3cff402387588 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 17 Nov 2021 12:21:28 +0100 Subject: Added connect/transfer timeout options for Jupiter client. --- zenserver/upstream/upstreamcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 2741e3e51..0290723f0 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -442,7 +442,7 @@ namespace detail { }; public: - ZenUpstreamEndpoint(const ZenClientOptions& Options) + ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) , m_DisplayName("ZEN") , m_ConnectTimeout(Options.ConnectTimeout) @@ -1354,7 +1354,7 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr -MakeZenUpstreamEndpoint(const ZenClientOptions& Options) +MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) { return std::make_unique(Options); } -- cgit v1.2.3 From 200913d1b0d9716e73bf465b7b42802cc7b508cc Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 17 Nov 2021 12:35:34 +0100 Subject: Check both success and error code when initializing Jupiter upstream. --- zenserver/upstream/upstreamcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0290723f0..0fa497faf 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -58,7 +58,7 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); - m_HealthOk = Result.ErrorCode == 0; + m_HealthOk = Result.Success && Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } -- cgit v1.2.3 From 731e523ab939c8aebf33ae5f7e127b01599bf575 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 17 Nov 2021 15:29:58 +0100 Subject: Log upstream HTTP errors as errors. --- zenserver/upstream/upstreamcache.cpp | 68 +++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 13 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0fa497faf..726ef331d 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -814,7 +814,10 @@ namespace detail { TotalElapsedSeconds += Result.ElapsedSeconds; } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; + return {.Reason = std::move(Result.Reason), + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -1027,6 +1030,14 @@ public: { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->DisplayName(), + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -1060,6 +1071,14 @@ public: } }); + if (Result.Error) + { + ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->DisplayName(), + Result.Error.Reason, + Result.Error.ErrorCode); + } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingKeys = std::move(Missing); } @@ -1098,6 +1117,14 @@ public: } }); + if (Result.Error) + { + ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->DisplayName(), + Result.Error.Reason, + Result.Error.ErrorCode); + } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); MissingPayloads = std::move(Missing); } @@ -1125,6 +1152,14 @@ public: { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->DisplayName(), + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -1243,10 +1278,10 @@ private: } catch (std::exception& e) { - ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - e.what()); + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + e.what()); } } @@ -1269,20 +1304,27 @@ private: } } - for (auto& Endpoint : m_Endpoints) + try { - if (!Endpoint->IsHealthy()) + for (auto& Endpoint : m_Endpoints) { - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + if (!Endpoint->IsHealthy()) { - ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); - } - else - { - ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } } } } + catch (std::exception& e) + { + ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", e.what()); + } } } -- cgit v1.2.3 From 1b9d83b3fa3a6cc6e2fa0f1af72050de2ccf4ea9 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 17 Nov 2021 20:31:41 +0100 Subject: Added health check and return missing error message. --- zenserver/upstream/upstreamcache.cpp | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 726ef331d..065471c07 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -286,7 +286,12 @@ namespace detail { } } - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } else { @@ -312,6 +317,8 @@ namespace detail { BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } + m_HealthOk = BlobResult.ErrorCode == 0; + if (!BlobResult.Success) { OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); @@ -332,6 +339,8 @@ namespace detail { Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } + m_HealthOk = RefResult.ErrorCode == 0; + if (!RefResult.Success) { return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, @@ -351,6 +360,7 @@ namespace detail { const IoHash RefHash = IoHash::HashBuffer(RecordValue); FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { @@ -368,6 +378,7 @@ namespace detail { } FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; if (!FinalizeResult.Success) { @@ -400,6 +411,7 @@ namespace detail { } catch (std::exception& Err) { + m_HealthOk = false; return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -819,10 +831,10 @@ namespace detail { .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { m_HealthOk = false; - return {.Reason = std::string(e.what()), .Success = false}; + return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -1276,12 +1288,12 @@ private: { ProcessCacheRecord(std::move(CacheRecord)); } - catch (std::exception& e) + catch (std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, - e.what()); + Err.what()); } } @@ -1321,9 +1333,9 @@ private: } } } - catch (std::exception& e) + catch (std::exception& Err) { - ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", e.what()); + ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); } } } -- cgit v1.2.3 From a3b24501aa2758b0621806086bb6e3ffc83c2b97 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Wed, 17 Nov 2021 21:06:17 +0100 Subject: Changed upstream DisplayName to UpstreamEndpointInfo with name and url. --- zenserver/upstream/upstreamcache.cpp | 59 +++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 28 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 065471c07..e2dc09872 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -40,13 +40,15 @@ namespace detail { : m_Log(zen::logging::Get("upstream")) , m_UseLegacyDdc(Options.UseLegacyDdc) { - using namespace fmt::literals; - m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); - m_Client = new CloudCacheClient(Options); + m_Info.Name = "Horde"sv; + m_Info.Url = Options.ServiceUrl; + m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } @@ -68,8 +70,6 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try @@ -434,6 +434,7 @@ namespace detail { spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; + UpstreamEndpointInfo m_Info; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr m_Client; @@ -456,7 +457,7 @@ namespace detail { public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) - , m_DisplayName("ZEN") + , m_Info({.Name = std::string("Zen")}) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { @@ -468,6 +469,8 @@ namespace detail { ~ZenUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { using namespace fmt::literals; @@ -475,9 +478,8 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); + m_Info.Url = Ep.Url; + m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -500,10 +502,9 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); + m_Info.Url = Ep.Url; m_Client = - new ZenStructuredCacheClient({.Url = m_ServiceUrl, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); + new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -530,8 +531,6 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try @@ -866,7 +865,7 @@ namespace detail { for (const auto& Ep : m_Endpoints) { - ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); + ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); } return m_Endpoints.front(); @@ -875,9 +874,8 @@ namespace detail { spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - std::string m_ServiceUrl; + UpstreamEndpointInfo m_Info; std::vector m_Endpoints; - std::string m_DisplayName; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; RefPtr m_Client; @@ -966,7 +964,7 @@ struct UpstreamStats const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), + Ep->GetEndpointInfo().Name, HitRate, DownBytes, DownSpeed, @@ -1000,13 +998,15 @@ public: for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + if (Health.Ok) { - ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url); } else { - ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } @@ -1046,7 +1046,7 @@ public: if (Result.Error) { ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->DisplayName(), + Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } @@ -1086,7 +1086,7 @@ public: if (Result.Error) { ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->DisplayName(), + Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } @@ -1132,7 +1132,7 @@ public: if (Result.Error) { ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->DisplayName(), + Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } @@ -1168,7 +1168,7 @@ public: if (Result.Error) { ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->DisplayName(), + Endpoint->GetEndpointInfo().Url, Result.Error.Reason, Result.Error.ErrorCode); } @@ -1208,8 +1208,10 @@ public: Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { + const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo(); Status.BeginObject(); - Status << "name" << Ep->DisplayName(); + Status << "name" << Info.Name; + Status << "url" << Info.Url; Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamEndpointStats& Stats = Ep->Stats(); @@ -1270,7 +1272,7 @@ private: ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), + Endpoint->GetEndpointInfo().Url, Result.Reason); } } @@ -1322,13 +1324,14 @@ private: { if (!Endpoint->IsHealthy()) { + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) { - ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason); } else { - ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } } -- cgit v1.2.3