From 0d3ce5ffecfd894b3a2b0421c562f2bb06fdc1c8 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 11:06:32 +0200 Subject: Try upload missing payloads and finalize Jupiter cache ref. --- zenserver/upstream/upstreamcache.cpp | 114 ++++++++++++++++++++++++++++++----- 1 file changed, 99 insertions(+), 15 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0dd16cd06..0329569d8 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -45,6 +46,7 @@ namespace detail { { std::lock_guard Lock(m_Lock); m_Queue.emplace_back(std::move(Item)); + m_Size++; } m_NewItemSignal.notify_one(); @@ -64,6 +66,7 @@ namespace detail { { Item = std::move(m_Queue.front()); m_Queue.pop_front(); + m_Size--; return true; } @@ -80,7 +83,7 @@ namespace detail { } } - std::size_t Num() const + std::size_t Size() const { std::unique_lock Lock(m_Lock); return m_Queue.size(); @@ -91,12 +94,15 @@ namespace detail { std::condition_variable m_NewItemSignal; std::deque m_Queue; std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; }; class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); @@ -277,16 +283,82 @@ namespace detail { Success = false; for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); + if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); Result.Success) { TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; Success = true; - break; + + if (!Result.Needs.empty()) + { + for (const IoHash& NeededHash : Result.Needs) + { + Success = false; + + if (auto It = + std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); + It != std::end(CacheRecord.PayloadIds)) + { + const size_t Idx = It - std::begin(CacheRecord.PayloadIds); + + if (CloudCacheResult BlobResult = + Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + BlobResult.Success) + { + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; + Success = true; + } + else + { + ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + else + { + ZEN_WARN("needed payload '{}/{}/{}' MISSING", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + NeededHash); + } + } + + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + + if (FinalizeRefResult FinalizeResult = + Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + FinalizeResult.Success) + { + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + Success = true; + + for (const IoHash& MissingHash : FinalizeResult.Needs) + { + ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MissingHash); + } + } + else + { + ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + Success = false; + } + } + + if (Success) + { + break; + } } } @@ -302,6 +374,9 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr m_Client; @@ -515,7 +590,12 @@ struct UpstreamStats } UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) { Stats.HitCount++; Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); @@ -549,6 +629,10 @@ struct UpstreamStats Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); } + else + { + Stats.ErrorCount++; + } if (m_SampleCount++ % MaxSampleCount) { @@ -575,13 +659,13 @@ struct UpstreamStats const uint64_t TotalCount = HitCount + MissCount; const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; - Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), - HitRate, - DownBytes, - DownSpeed, - UpBytes, - UpSpeed); + Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } -- cgit v1.2.3 From eff1eb09a293cf92b6c25d7041f6931acf2c169f Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 11:29:03 +0200 Subject: Added upstream cache perf metrics. --- zenserver/upstream/upstreamcache.cpp | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 0329569d8..03054b542 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -784,6 +784,36 @@ public: return {}; } + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "reading" << m_Options.ReadUpstream; + Status << "writing" << m_Options.WriteUpstream; + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Status << "hit_ratio" << HitRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + private: void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) { -- cgit v1.2.3 From 82a032af24dfefa508c384536e6b5b7dbe65ccb8 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 13:29:02 +0200 Subject: Improved error handling for upstream endpoints. --- zenserver/upstream/upstreamcache.cpp | 86 ++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 29 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 03054b542..58a5b1ff3 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -186,16 +186,23 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -206,16 +213,23 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -429,16 +443,23 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -450,16 +471,23 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -575,7 +603,7 @@ namespace detail { struct UpstreamStats { - static constexpr uint64_t MaxSampleCount = 100ull; + static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} -- cgit v1.2.3 From be6e8143f2ea0c4acc87608f651058627b2c229c Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 14:45:59 +0200 Subject: Added cache HIT/MISS stat counters. --- zenserver/upstream/upstreamcache.cpp | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 58a5b1ff3..b1966e299 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -612,11 +612,6 @@ struct UpstreamStats const GetUpstreamCacheResult& Result, const std::vector>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) @@ -634,7 +629,7 @@ struct UpstreamStats Stats.MissCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -645,11 +640,6 @@ struct UpstreamStats const PutUpstreamCacheResult& Result, const std::vector>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { @@ -662,7 +652,7 @@ struct UpstreamStats Stats.ErrorCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } -- cgit v1.2.3