diff options
| author | Stefan Boberg <[email protected]> | 2026-02-27 12:05:41 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-02-27 12:05:41 +0100 |
| commit | ebdc8c19da54756a0969e2b2c624f2eae3ea8aff (patch) | |
| tree | e4e2d06ee7f3f2dc989af0642877ed0c6c028876 /src/zencompute/localrunner.cpp | |
| parent | FunctionServiceSession::Impl::SubmitActions now uses the batching LocalProces... (diff) | |
| download | zen-ebdc8c19da54756a0969e2b2c624f2eae3ea8aff.tar.xz zen-ebdc8c19da54756a0969e2b2c624f2eae3ea8aff.zip | |
made local action submits more async by introducing "submitting" state
Diffstat (limited to 'src/zencompute/localrunner.cpp')
| -rw-r--r-- | src/zencompute/localrunner.cpp | 57 |
1 files changed, 40 insertions, 17 deletions
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp index c167b6055..ceb1c19d4 100644 --- a/src/zencompute/localrunner.cpp +++ b/src/zencompute/localrunner.cpp @@ -181,14 +181,16 @@ LocalProcessRunner::QueryCapacity() return 0; } - size_t RunningCount = m_RunningMap.size(); + const size_t InFlightCount = m_RunningMap.size() + m_SubmittingCount.load(std::memory_order_relaxed); - if (RunningCount >= size_t(m_MaxRunningActions)) + if (const size_t MaxRunningActions = m_MaxRunningActions; InFlightCount >= MaxRunningActions) { return 0; } - - return m_MaxRunningActions - RunningCount; + else + { + return MaxRunningActions - InFlightCount; + } } std::vector<SubmitResult> @@ -206,26 +208,47 @@ LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) return Results; } - // For nontrivial batches, submit actions in parallel via the worker pool. - // PrepareActionSubmission (sandbox creation, worker manifesting, input - // writing) is heavily I/O-bound and benefits from overlapping. + // For nontrivial batches, check capacity upfront and accept what fits. + // Accepted actions are transitioned to Submitting and dispatched to the + // worker pool as fire-and-forget, so SubmitActions returns immediately + // and the scheduler thread is free to handle completions and updates. - std::vector<std::future<SubmitResult>> Futures; - Futures.reserve(Actions.size()); + size_t Available = QueryCapacity(); - for (const Ref<RunnerAction>& Action : Actions) + std::vector<SubmitResult> Results(Actions.size()); + + size_t AcceptCount = std::min(Available, Actions.size()); + + for (size_t i = 0; i < AcceptCount; ++i) { - std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); }); + const Ref<RunnerAction>& Action = Actions[i]; - Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog)); - } + Action->SetActionState(RunnerAction::State::Submitting); + m_SubmittingCount.fetch_add(1, std::memory_order_relaxed); - std::vector<SubmitResult> Results; - Results.reserve(Futures.size()); + Results[i] = SubmitResult{.IsAccepted = true}; + + m_WorkerPool.ScheduleWork( + [this, Action]() { + auto CountGuard = MakeGuard([this] { m_SubmittingCount.fetch_sub(1, std::memory_order_relaxed); }); + + SubmitResult Result = SubmitAction(Action); + + if (!Result.IsAccepted) + { + // This might require another state? We should + // distinguish between outright rejections (e.g. invalid action) + // and transient failures (e.g. failed to launch process) which might + // be retried by the scheduler, but for now just fail the action + Action->SetActionState(RunnerAction::State::Failed); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } - for (auto& Future : Futures) + for (size_t i = AcceptCount; i < Actions.size(); ++i) { - Results.push_back(Future.get()); + Results[i] = SubmitResult{.IsAccepted = false}; } return Results; |