diff options
| author | Joe Kirchoff <[email protected]> | 2022-04-14 13:29:45 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-04-14 13:29:45 -0700 |
| commit | ce6d3a69c18c74359f9a59002b0e69334e096223 (patch) | |
| tree | 6d5c160061ba62ba37f572029ac45839506aea8c /zenserver/upstream/upstreamapply.cpp | |
| parent | Compute updates (#74) (diff) | |
| download | zen-ce6d3a69c18c74359f9a59002b0e69334e096223.tar.xz zen-ce6d3a69c18c74359f9a59002b0e69334e096223.zip | |
Horde execute storage config (#75)
Diffstat (limited to 'zenserver/upstream/upstreamapply.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 45515067d..9758e7565 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -83,7 +83,8 @@ public: , m_CasStore(CasStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) - , m_AsyncWorkPool(Options.ThreadCount) + , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) + , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount) { } @@ -147,7 +148,8 @@ public: } ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks(); - m_AsyncWorkPool.ScheduleWork([this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); + m_UpstreamAsyncWorkPool.ScheduleWork( + [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); }); return {.ApplyId = ActionId, .Success = true}; } @@ -171,8 +173,10 @@ public: virtual void GetStatus(CbObjectWriter& Status) override { - Status << "worker_threads" << m_Options.ThreadCount; - Status << "queue_count" << m_AsyncWorkPool.PendingWork(); + Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; + Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); + Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; + Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) @@ -280,7 +284,7 @@ private: { if (Endpoint->IsHealthy()) { - GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_AsyncWorkPool); + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool); m_Stats.Add(*Endpoint, Result); if (!Result.Success) @@ -416,7 +420,8 @@ private: std::mutex m_ApplyTasksMutex; std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; Event m_ShutdownEvent; - WorkerThreadPool m_AsyncWorkPool; + WorkerThreadPool m_UpstreamAsyncWorkPool; + WorkerThreadPool m_DownstreamAsyncWorkPool; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; RunState m_RunState; |