aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/localrunner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-02-27 12:05:41 +0100
committerStefan Boberg <[email protected]>2026-02-27 12:05:41 +0100
commitebdc8c19da54756a0969e2b2c624f2eae3ea8aff (patch)
treee4e2d06ee7f3f2dc989af0642877ed0c6c028876 /src/zencompute/localrunner.cpp
parentFunctionServiceSession::Impl::SubmitActions now uses the batching LocalProces... (diff)
downloadzen-ebdc8c19da54756a0969e2b2c624f2eae3ea8aff.tar.xz
zen-ebdc8c19da54756a0969e2b2c624f2eae3ea8aff.zip
made local action submits more async by introducing "submitting" state
Diffstat (limited to 'src/zencompute/localrunner.cpp')
-rw-r--r--src/zencompute/localrunner.cpp57
1 files changed, 40 insertions, 17 deletions
diff --git a/src/zencompute/localrunner.cpp b/src/zencompute/localrunner.cpp
index c167b6055..ceb1c19d4 100644
--- a/src/zencompute/localrunner.cpp
+++ b/src/zencompute/localrunner.cpp
@@ -181,14 +181,16 @@ LocalProcessRunner::QueryCapacity()
return 0;
}
- size_t RunningCount = m_RunningMap.size();
+ const size_t InFlightCount = m_RunningMap.size() + m_SubmittingCount.load(std::memory_order_relaxed);
- if (RunningCount >= size_t(m_MaxRunningActions))
+ if (const size_t MaxRunningActions = m_MaxRunningActions; InFlightCount >= MaxRunningActions)
{
return 0;
}
-
- return m_MaxRunningActions - RunningCount;
+ else
+ {
+ return MaxRunningActions - InFlightCount;
+ }
}
std::vector<SubmitResult>
@@ -206,26 +208,47 @@ LocalProcessRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
return Results;
}
- // For nontrivial batches, submit actions in parallel via the worker pool.
- // PrepareActionSubmission (sandbox creation, worker manifesting, input
- // writing) is heavily I/O-bound and benefits from overlapping.
+ // For nontrivial batches, check capacity upfront and accept what fits.
+ // Accepted actions are transitioned to Submitting and dispatched to the
+ // worker pool as fire-and-forget, so SubmitActions returns immediately
+ // and the scheduler thread is free to handle completions and updates.
- std::vector<std::future<SubmitResult>> Futures;
- Futures.reserve(Actions.size());
+ size_t Available = QueryCapacity();
- for (const Ref<RunnerAction>& Action : Actions)
+ std::vector<SubmitResult> Results(Actions.size());
+
+ size_t AcceptCount = std::min(Available, Actions.size());
+
+ for (size_t i = 0; i < AcceptCount; ++i)
{
- std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+ const Ref<RunnerAction>& Action = Actions[i];
- Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
- }
+ Action->SetActionState(RunnerAction::State::Submitting);
+ m_SubmittingCount.fetch_add(1, std::memory_order_relaxed);
- std::vector<SubmitResult> Results;
- Results.reserve(Futures.size());
+ Results[i] = SubmitResult{.IsAccepted = true};
+
+ m_WorkerPool.ScheduleWork(
+ [this, Action]() {
+ auto CountGuard = MakeGuard([this] { m_SubmittingCount.fetch_sub(1, std::memory_order_relaxed); });
+
+ SubmitResult Result = SubmitAction(Action);
+
+ if (!Result.IsAccepted)
+ {
+ // This might require another state? We should
+ // distinguish between outright rejections (e.g. invalid action)
+ // and transient failures (e.g. failed to launch process) which might
+ // be retried by the scheduler, but for now just fail the action
+ Action->SetActionState(RunnerAction::State::Failed);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
- for (auto& Future : Futures)
+ for (size_t i = AcceptCount; i < Actions.size(); ++i)
{
- Results.push_back(Future.get());
+ Results[i] = SubmitResult{.IsAccepted = false};
}
return Results;