From ab9ff98c693490db3dd5bc2bb89c774699d86ad3 Mon Sep 17 00:00:00 2001
From: Stefan Boberg
Date: Thu, 2 Apr 2026 18:54:47 +0200
Subject: Improve compute scheduler throughput and reduce lock contention

- Parallelize per-runner SubmitActions via WorkerThreadPool when multiple runners are active
- Make remote submissions non-blocking: dispatch to pool and return immediately, marking actions as Submitting so the scheduler won't re-pick them; failures transition to Failed for auto-retry
- Downgrade m_ActionMapLock from exclusive to shared in SchedulePendingActions and move sort/resize outside the lock
- Release m_RunnersLock before submitting by snapshotting runners as Ref copies, avoiding holding the lock during HTTP I/O
- Add per-runner submit timing warning (>500ms) with host context
- Add GetDisplayName() virtual to FunctionRunner for diagnostics
---
 src/zencompute/runners/functionrunner.cpp | 100 +++++++++++++++++++++++++-----
 1 file changed, 83 insertions(+), 17 deletions(-)
(limited to 'src/zencompute/runners/functionrunner.cpp')

diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 67e12b84e..f8a54b3f8 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,9 +6,13 @@ # include # include +# include +# include # include +# include # include +# include # include namespace zen::compute { @@ -118,23 +122,34 @@ std::vector BaseRunnerGroup::SubmitActions(const std::vector>& Actions) { ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions"); - RwLock::SharedLockScope _(m_RunnersLock); - const int RunnerCount = gsl::narrow(m_Runners.size()); + // Snapshot runners and query capacity under the lock, then release + // before submitting — HTTP submissions to remote runners can take + // hundreds of milliseconds and we must not hold m_RunnersLock during I/O.
- if (RunnerCount == 0) - { - return std::vector(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"}); - } + std::vector> Runners; + std::vector Capacities; + std::vector>> PerRunnerActions; + size_t TotalCapacity = 0; - // Query capacity per runner and compute total - std::vector Capacities(RunnerCount); - size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + const int RunnerCount = gsl::narrow(m_Runners.size()); + Runners.assign(m_Runners.begin(), m_Runners.end()); + Capacities.resize(RunnerCount); + PerRunnerActions.resize(RunnerCount); - for (int i = 0; i < RunnerCount; ++i) + for (int i = 0; i < RunnerCount; ++i) + { + Capacities[i] = Runners[i]->QueryCapacity(); + TotalCapacity += Capacities[i]; + } + }); + + const int RunnerCount = gsl::narrow(Runners.size()); + + if (RunnerCount == 0) { - Capacities[i] = m_Runners[i]->QueryCapacity(); - TotalCapacity += Capacities[i]; + return std::vector(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"}); } if (TotalCapacity == 0) @@ -143,9 +158,8 @@ BaseRunnerGroup::SubmitActions(const std::vector>& Actions) } // Distribute actions across runners proportionally to their available capacity - std::vector>> PerRunnerActions(RunnerCount); - std::vector ActionRunnerIndex(Actions.size()); - size_t ActionIdx = 0; + std::vector ActionRunnerIndex(Actions.size()); + size_t ActionIdx = 0; for (int i = 0; i < RunnerCount; ++i) { @@ -176,14 +190,66 @@ BaseRunnerGroup::SubmitActions(const std::vector>& Actions) } } - // Submit batches per runner + // Submit batches per runner — in parallel when a worker pool is available + std::vector> PerRunnerResults(RunnerCount); + int ActiveRunnerCount = 0; for (int i = 0; i < RunnerCount; ++i) { if (!PerRunnerActions[i].empty()) { - PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]); + ++ActiveRunnerCount; + } + } + + static constexpr uint64_t SubmitWarnThresholdMs = 500; + + auto SubmitToRunner = 
[&](int RunnerIndex) { + Stopwatch Timer; + + PerRunnerResults[RunnerIndex] = Runners[RunnerIndex]->SubmitActions(PerRunnerActions[RunnerIndex]); + + uint64_t ElapsedMs = Timer.GetElapsedTimeMs(); + if (ElapsedMs >= SubmitWarnThresholdMs) + { + ZEN_WARN("submit of {} actions to '{}' took {}ms", + PerRunnerActions[RunnerIndex].size(), + Runners[RunnerIndex]->GetDisplayName(), + ElapsedMs); + } + }; + + if (m_WorkerPool && ActiveRunnerCount > 1) + { + std::vector> Futures(RunnerCount); + + for (int i = 0; i < RunnerCount; ++i) + { + if (!PerRunnerActions[i].empty()) + { + std::packaged_task Task([&SubmitToRunner, i]() { SubmitToRunner(i); }); + + Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog); + } + } + + for (int i = 0; i < RunnerCount; ++i) + { + if (Futures[i].valid()) + { + Futures[i].get(); + } + } + } + else + { + for (int i = 0; i < RunnerCount; ++i) + { + if (!PerRunnerActions[i].empty()) + { + SubmitToRunner(i); + } } } -- cgit v1.2.3