diff options
| author | Dan Engelbrecht <[email protected]> | 2022-08-19 03:30:54 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-08-19 03:30:54 -0700 |
| commit | 606274a83d71928f8621c1d23648a26e8f79fa7d (patch) | |
| tree | d951d51e1a7919463aeec398612e97b47f99f788 /zenserver/upstream | |
| parent | bump vcpkg version to 2022.08.15 (#146) (diff) | |
| download | zen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip | |
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 60 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 24 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 4 |
3 files changed, 61 insertions, 27 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 7d1f72004..40d627862 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -717,10 +717,15 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); - m_Info.Url = Ep.Url; + if (m_Info.Url != Ep.Url) + { + ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url); + m_Info.Url = Ep.Url; + } if (Ep.Ok) { + RwLock::ExclusiveLockScope _(m_ClientLock); m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_Status.Set(UpstreamEndpointState::kOk); } @@ -749,7 +754,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -820,7 +825,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -864,7 +869,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -952,7 +957,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -1028,7 +1033,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); ZenCacheResult Result; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; @@ -1180,12 +1185,24 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + Ref<ZenStructuredCacheClient> GetClientRef() + { + // m_Client can be modified at any time by a different thread. + // Make sure we safely bump the refcount inside a scope lock + RwLock::SharedLockScope _(m_ClientLock); + ZEN_ASSERT(m_Client); + Ref<ZenStructuredCacheClient> ClientRef(m_Client); + _.ReleaseNow(); + return ClientRef; + } + const ZenEndpoint& GetEndpoint() { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); - ZenStructuredCacheSession Session(Client); + Ref<ZenStructuredCacheClient> Client( + new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)})); + ZenStructuredCacheSession Session(std::move(Client)); const int32_t SampleCount = 2; Ep.Ok = false; @@ -1220,6 +1237,7 @@ namespace detail { std::vector<ZenEndpoint> m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; + RwLock m_ClientLock; RefPtr<ZenStructuredCacheClient> m_Client; }; @@ -1582,7 +1600,10 @@ private: if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { - ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); + ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash); return; } @@ -1623,7 +1644,8 @@ private: if (!Result.Success) { - ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + ZEN_WARN("upload cache record '{}/{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, @@ -1645,7 +1667,11 @@ private: } catch (std::exception& Err) { - ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + Err.what()); } } @@ -1677,8 +1703,16 @@ private: for (auto& Endpoint : m_Endpoints) { - if (Endpoint->GetState() == UpstreamEndpointState::kError || - Endpoint->GetState() == UpstreamEndpointState::kUnauthorized) + UpstreamEndpointState State = Endpoint->GetState(); + if (State == UpstreamEndpointState::kError) + { + Endpoints.push_back(Endpoint.get()); + ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'", + Endpoint->GetEndpointInfo().Name, + Endpoint->GetEndpointInfo().Url, + Endpoint->GetStatus().Reason); + } + if (State == UpstreamEndpointState::kUnauthorized) { Endpoints.push_back(Endpoint.get()); } diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index cd6a531ca..b837f767c 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -377,23 +377,23 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; -ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(OuterClient.Log()) -, m_Client(OuterClient) +ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient) +: m_Log(OuterClient->Log()) +, m_Client(std::move(OuterClient)) { - m_SessionState = m_Client.AllocSessionState(); + m_SessionState = m_Client->AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { - m_Client.FreeSessionState(m_SessionState); + m_Client->FreeSessionState(m_SessionState); } ZenCacheResult ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/health/check"; + Uri << m_Client->ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); @@ -411,7 +411,7 @@ ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -443,7 +443,7 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -481,7 +481,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, ZenContentType Type) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -517,7 +517,7 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, IoBuffer Payload) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/"; + Uri << m_Client->ServiceUrl() << "/z$/"; if (Namespace != ZenCacheStore::DefaultNamespace) { Uri << Namespace << "/"; @@ -546,7 +546,7 @@ ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; BinaryWriter Body; Request.CopyTo(Body); @@ -579,7 +579,7 @@ ZenCacheResult ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + Uri << m_Client->ServiceUrl() << "/z$/$rpc"; SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index e8590f940..955cfa107 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -124,7 +124,7 @@ struct ZenStructuredCacheClientOptions class ZenStructuredCacheSession { public: - ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); + ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient); ~ZenStructuredCacheSession(); ZenCacheResult CheckHealth(); @@ -147,7 +147,7 @@ private: inline spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - ZenStructuredCacheClient& m_Client; + Ref<ZenStructuredCacheClient> m_Client; detail::ZenCacheSessionState* m_SessionState; }; |