aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-27 13:23:59 +0200
committerGitHub <[email protected]>2023-09-27 13:23:59 +0200
commit369c595b9bc62e1620f40aea21f6457512a7922f (patch)
treed86e63b56483c8ded6bb8d6e63bf37f239e13894 /src/zenserver/upstream/upstreamcache.cpp
parentflush block store blocks when finished (#425) (diff)
downloadzen-369c595b9bc62e1620f40aea21f6457512a7922f.tar.xz
zen-369c595b9bc62e1620f40aea21f6457512a7922f.zip
chache upstream stats improved (#426)
* cache upstream stats improved * fix exit for monitor thread
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.cpp')
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp26
1 files changed, 16 insertions, 10 deletions
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 15f68f70c..ac647130e 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1494,13 +1494,7 @@ public:
{
ZEN_TRACE_CPU("Upstream::Initialize");
- for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
- {
- m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1);
- }
-
- m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this);
- m_RunState.IsRunning = true;
+ m_RunState.IsRunning = true;
}
virtual bool IsActive() override
@@ -1527,6 +1521,15 @@ public:
// Register endpoint even if it fails, the health monitor thread will probe failing endpoint(s)
std::unique_lock<std::shared_mutex> _(m_EndpointsMutex);
+ if (m_Endpoints.empty())
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1);
+ }
+
+ m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this);
+ }
m_Endpoints.emplace_back(std::move(Endpoint));
}
@@ -1856,9 +1859,10 @@ public:
{
ZEN_TRACE_CPU("Upstream::GetStatus");
+ Status << "active" << IsActive();
Status << "reading" << m_Options.ReadUpstream;
Status << "writing" << m_Options.WriteUpstream;
- Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "worker_threads" << m_UpstreamThreads.size();
Status << "queue_count" << m_UpstreamQueue.Size();
Status.BeginArray("endpoints");
@@ -2065,8 +2069,10 @@ private:
{
Thread.join();
}
-
- m_EndpointMonitorThread.join();
+ if (m_EndpointMonitorThread.joinable())
+ {
+ m_EndpointMonitorThread.join();
+ }
m_UpstreamThreads.clear();
m_Endpoints.clear();
}