aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamapply.cpp
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2022-04-14 13:29:45 -0700
committerGitHub <[email protected]>2022-04-14 13:29:45 -0700
commitce6d3a69c18c74359f9a59002b0e69334e096223 (patch)
tree6d5c160061ba62ba37f572029ac45839506aea8c /zenserver/upstream/upstreamapply.cpp
parentCompute updates (#74) (diff)
downloadzen-ce6d3a69c18c74359f9a59002b0e69334e096223.tar.xz
zen-ce6d3a69c18c74359f9a59002b0e69334e096223.zip
Horde execute storage config (#75)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
-rw-r--r--zenserver/upstream/upstreamapply.cpp17
1 files changed, 11 insertions, 6 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 45515067d..9758e7565 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -83,7 +83,8 @@ public:
, m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_Stats(Options.StatsEnabled)
- , m_AsyncWorkPool(Options.ThreadCount)
+ , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
+ , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount)
{
}
@@ -147,7 +148,8 @@ public:
}
ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks();
- m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
+ m_UpstreamAsyncWorkPool.ScheduleWork(
+ [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
return {.ApplyId = ActionId, .Success = true};
}
@@ -171,8 +173,10 @@ public:
virtual void GetStatus(CbObjectWriter& Status) override
{
- Status << "worker_threads" << m_Options.ThreadCount;
- Status << "queue_count" << m_AsyncWorkPool.PendingWork();
+ Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
+ Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork();
+ Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
+ Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork();
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
@@ -280,7 +284,7 @@ private:
{
if (Endpoint->IsHealthy())
{
- GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool);
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool);
m_Stats.Add(*Endpoint, Result);
if (!Result.Success)
@@ -416,7 +420,8 @@ private:
std::mutex m_ApplyTasksMutex;
std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
Event m_ShutdownEvent;
- WorkerThreadPool m_AsyncWorkPool;
+ WorkerThreadPool m_UpstreamAsyncWorkPool;
+ WorkerThreadPool m_DownstreamAsyncWorkPool;
std::thread m_UpstreamUpdatesThread;
std::thread m_EndpointMonitorThread;
RunState m_RunState;