From ebdc8c19da54756a0969e2b2c624f2eae3ea8aff Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Fri, 27 Feb 2026 12:05:41 +0100 Subject: made local action submits more async by introducing "submitting" state --- src/zencompute/localrunner.cpp | 57 +++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 17 deletions(-) (limited to 'src/zencompute/localrunner.cpp') diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp index c167b6055..ceb1c19d4 100644 --- a/src/zencompute/localrunner.cpp +++ b/src/zencompute/localrunner.cpp @@ -181,14 +181,16 @@ LocalProcessRunner::QueryCapacity() return 0; } - size_t RunningCount = m_RunningMap.size(); + const size_t InFlightCount = m_RunningMap.size() + m_SubmittingCount.load(std::memory_order_relaxed); - if (RunningCount >= size_t(m_MaxRunningActions)) + if (const size_t MaxRunningActions = m_MaxRunningActions; InFlightCount >= MaxRunningActions) { return 0; } - - return m_MaxRunningActions - RunningCount; + else + { + return MaxRunningActions - InFlightCount; + } } std::vector @@ -206,26 +208,47 @@ LocalProcessRunner::SubmitActions(const std::vector>& Actions) return Results; } - // For nontrivial batches, submit actions in parallel via the worker pool. - // PrepareActionSubmission (sandbox creation, worker manifesting, input - // writing) is heavily I/O-bound and benefits from overlapping. + // For nontrivial batches, check capacity upfront and accept what fits. + // Accepted actions are transitioned to Submitting and dispatched to the + // worker pool as fire-and-forget, so SubmitActions returns immediately + // and the scheduler thread is free to handle completions and updates. - std::vector> Futures; - Futures.reserve(Actions.size()); + size_t Available = QueryCapacity(); - for (const Ref& Action : Actions) + std::vector Results(Actions.size()); + + size_t AcceptCount = std::min(Available, Actions.size()); + + for (size_t i = 0; i < AcceptCount; ++i) { - std::packaged_task Task([this, Action]() { return SubmitAction(Action); }); + const Ref& Action = Actions[i]; - Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog)); - } + Action->SetActionState(RunnerAction::State::Submitting); + m_SubmittingCount.fetch_add(1, std::memory_order_relaxed); - std::vector Results; - Results.reserve(Futures.size()); + Results[i] = SubmitResult{.IsAccepted = true}; + + m_WorkerPool.ScheduleWork( + [this, Action]() { + auto CountGuard = MakeGuard([this] { m_SubmittingCount.fetch_sub(1, std::memory_order_relaxed); }); + + SubmitResult Result = SubmitAction(Action); + + if (!Result.IsAccepted) + { + // This might require another state? We should + // distinguish between outright rejections (e.g. invalid action) + // and transient failures (e.g. failed to launch process) which might + // be retried by the scheduler, but for now just fail the action + Action->SetActionState(RunnerAction::State::Failed); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } - for (auto& Future : Futures) + for (size_t i = AcceptCount; i < Actions.size(); ++i) { - Results.push_back(Future.get()); + Results[i] = SubmitResult{.IsAccepted = false}; } return Results; -- cgit v1.2.3