aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-08-19 03:30:54 -0700
committerGitHub <[email protected]>2022-08-19 03:30:54 -0700
commit606274a83d71928f8621c1d23648a26e8f79fa7d (patch)
treed951d51e1a7919463aeec398612e97b47f99f788 /zenserver/upstream/upstreamcache.cpp
parentbump vcpkg version to 2022.08.15 (#146) (diff)
downloadzen-606274a83d71928f8621c1d23648a26e8f79fa7d.tar.xz
zen-606274a83d71928f8621c1d23648a26e8f79fa7d.zip
De/fix crash on non responding upstream (#145)
* Fix ZenStructuredCacheClient lifetime issues
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp60
1 files changed, 47 insertions, 13 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 7d1f72004..40d627862 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -717,10 +717,15 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
- m_Info.Url = Ep.Url;
+ if (m_Info.Url != Ep.Url)
+ {
+ ZEN_INFO("Setting Zen upstream URL to '{}'", Ep.Url);
+ m_Info.Url = Ep.Url;
+ }
if (Ep.Ok)
{
+ RwLock::ExclusiveLockScope _(m_ClientLock);
m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_Status.Set(UpstreamEndpointState::kOk);
}
@@ -749,7 +754,7 @@ namespace detail {
try
{
- ZenStructuredCacheSession Session(*m_Client);
+ ZenStructuredCacheSession Session(GetClientRef());
const ZenCacheResult Result = Session.GetCacheRecord(Namespace, CacheKey.Bucket, CacheKey.Hash, Type);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -820,7 +825,7 @@ namespace detail {
ZenCacheResult Result;
{
- ZenStructuredCacheSession Session(*m_Client);
+ ZenStructuredCacheSession Session(GetClientRef());
Result = Session.InvokeRpc(BatchRequest.Save());
}
@@ -864,7 +869,7 @@ namespace detail {
try
{
- ZenStructuredCacheSession Session(*m_Client);
+ ZenStructuredCacheSession Session(GetClientRef());
const ZenCacheResult Result = Session.GetCacheValue(Namespace, CacheKey.Bucket, CacheKey.Hash, ValueContentId);
m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason);
@@ -952,7 +957,7 @@ namespace detail {
ZenCacheResult Result;
{
- ZenStructuredCacheSession Session(*m_Client);
+ ZenStructuredCacheSession Session(GetClientRef());
Result = Session.InvokeRpc(BatchRequest.Save());
}
@@ -1028,7 +1033,7 @@ namespace detail {
try
{
- ZenStructuredCacheSession Session(*m_Client);
+ ZenStructuredCacheSession Session(GetClientRef());
ZenCacheResult Result;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
@@ -1180,12 +1185,24 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ Ref<ZenStructuredCacheClient> GetClientRef()
+ {
+ // m_Client can be modified at any time by a different thread.
+ // Make sure we safely bump the refcount inside a scope lock
+ RwLock::SharedLockScope _(m_ClientLock);
+ ZEN_ASSERT(m_Client);
+ Ref<ZenStructuredCacheClient> ClientRef(m_Client);
+ _.ReleaseNow();
+ return ClientRef;
+ }
+
const ZenEndpoint& GetEndpoint()
{
for (ZenEndpoint& Ep : m_Endpoints)
{
- ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)});
- ZenStructuredCacheSession Session(Client);
+ Ref<ZenStructuredCacheClient> Client(
+ new ZenStructuredCacheClient({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}));
+ ZenStructuredCacheSession Session(std::move(Client));
const int32_t SampleCount = 2;
Ep.Ok = false;
@@ -1220,6 +1237,7 @@ namespace detail {
std::vector<ZenEndpoint> m_Endpoints;
std::chrono::milliseconds m_ConnectTimeout;
std::chrono::milliseconds m_Timeout;
+ RwLock m_ClientLock;
RefPtr<ZenStructuredCacheClient> m_Client;
};
@@ -1582,7 +1600,10 @@ private:
if (!m_CacheStore.Get(CacheRecord.Namespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, CacheValue))
{
- ZEN_WARN("process upstream FAILED, '{}/{}', cache record doesn't exist", CacheRecord.Key.Bucket, CacheRecord.Key.Hash);
+ ZEN_WARN("process upstream FAILED, '{}/{}/{}', cache record doesn't exist",
+ CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash);
return;
}
@@ -1623,7 +1644,8 @@ private:
if (!Result.Success)
{
- ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ ZEN_WARN("upload cache record '{}/{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.Namespace,
CacheRecord.Key.Bucket,
CacheRecord.Key.Hash,
Endpoint->GetEndpointInfo().Url,
@@ -1645,7 +1667,11 @@ private:
}
catch (std::exception& Err)
{
- ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", CacheRecord.Key.Bucket, CacheRecord.Key.Hash, Err.what());
+ ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'",
+ CacheRecord.Namespace,
+ CacheRecord.Key.Bucket,
+ CacheRecord.Key.Hash,
+ Err.what());
}
}
@@ -1677,8 +1703,16 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- if (Endpoint->GetState() == UpstreamEndpointState::kError ||
- Endpoint->GetState() == UpstreamEndpointState::kUnauthorized)
+ UpstreamEndpointState State = Endpoint->GetState();
+ if (State == UpstreamEndpointState::kError)
+ {
+ Endpoints.push_back(Endpoint.get());
+ ZEN_WARN("HEALTH - endpoint '{} - {}' is in error state '{}'",
+ Endpoint->GetEndpointInfo().Name,
+ Endpoint->GetEndpointInfo().Url,
+ Endpoint->GetStatus().Reason);
+ }
+ if (State == UpstreamEndpointState::kUnauthorized)
{
Endpoints.push_back(Endpoint.get());
}