diff options
| author | Dan Engelbrecht <[email protected]> | 2022-08-19 03:30:54 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-08-19 03:30:54 -0700 |
| commit | 606274a83d71928f8621c1d23648a26e8f79fa7d (patch) | |
| tree | d951d51e1a7919463aeec398612e97b47f99f788 /zenserver/upstream/upstreamcache.cpp | |
| parent | bump vcpkg version to 2022.08.15 (#146) (diff) | |
| download | zen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip | |
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 60 |
1 files changed, 47 insertions, 13 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 7d1f72004..40d627862 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -717,10 +717,15 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); - m_Info.Url = Ep.Url; + if (m_Info.Url != Ep.Url) + { + ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url); + m_Info.Url = Ep.Url; + } if (Ep.Ok) { + RwLock::ExclusiveLockScope _(m_ClientLock); m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_Status.Set(UpstreamEndpointState::kOk); } @@ -749,7 +754,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -820,7 +825,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -864,7 +869,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -952,7 +957,7 @@ namespace detail { ZenCacheResult Result; { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); Result = Session.InvokeRpc(BatchRequest.Save()); } @@ -1028,7 +1033,7 @@ namespace detail { try { - ZenStructuredCacheSession Session(*m_Client); + ZenStructuredCacheSession Session(GetClientRef()); ZenCacheResult Result; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; @@ -1180,12 +1185,24 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + Ref<ZenStructuredCacheClient> GetClientRef() + { + // m_Client can be modified at any time by a different thread. + // Make sure we safely bump the refcount inside a scope lock + RwLock::SharedLockScope _(m_ClientLock); + ZEN_ASSERT(m_Client); + Ref<ZenStructuredCacheClient> ClientRef(m_Client); + _.ReleaseNow(); + return ClientRef; + } + const ZenEndpoint& GetEndpoint() { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); - ZenStructuredCacheSession Session(Client); + Ref<ZenStructuredCacheClient> Client( + new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)})); + ZenStructuredCacheSession Session(std::move(Client)); const int32_t SampleCount = 2; Ep.Ok = false; @@ -1220,6 +1237,7 @@ namespace detail { std::vector<ZenEndpoint> m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; + RwLock m_ClientLock; RefPtr<ZenStructuredCacheClient> m_Client; }; @@ -1582,7 +1600,10 @@ private: if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue)) { - ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash); + ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash); return; } @@ -1623,7 +1644,8 @@ private: if (!Result.Success) { - ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + ZEN_WARN("upload cache record '{}/{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Endpoint->GetEndpointInfo().Url, @@ -1645,7 +1667,11 @@ private: } catch (std::exception& Err) { - ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what()); + ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", + CacheRecord.Namespace, + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + Err.what()); } } @@ -1677,8 +1703,16 @@ private: for (auto& Endpoint : m_Endpoints) { - if (Endpoint->GetState() == UpstreamEndpointState::kError || - Endpoint->GetState() == UpstreamEndpointState::kUnauthorized) + UpstreamEndpointState State = Endpoint->GetState(); + if (State == UpstreamEndpointState::kError) + { + Endpoints.push_back(Endpoint.get()); + ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'", + Endpoint->GetEndpointInfo().Name, + Endpoint->GetEndpointInfo().Url, + Endpoint->GetStatus().Reason); + } + if (State == UpstreamEndpointState::kUnauthorized) { Endpoints.push_back(Endpoint.get()); } |