aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/functionrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners/functionrunner.cpp')
-rw-r--r--src/zencompute/runners/functionrunner.cpp120
1 files changed, 99 insertions, 21 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 67e12b84e..ab22c6363 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -6,9 +6,15 @@
# include <zencore/compactbinary.h>
# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/logging.h>
+# include <zencore/string.h>
+# include <zencore/timer.h>
# include <zencore/trace.h>
+# include <zencore/workthreadpool.h>
# include <fmt/format.h>
+# include <future>
# include <vector>
namespace zen::compute {
@@ -118,23 +124,34 @@ std::vector<SubmitResult>
BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
{
ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
- RwLock::SharedLockScope _(m_RunnersLock);
- const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ // Snapshot runners and query capacity under the lock, then release
+ // before submitting — HTTP submissions to remote runners can take
+ // hundreds of milliseconds and we must not hold m_RunnersLock during I/O.
- if (RunnerCount == 0)
- {
- return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
- }
+ std::vector<Ref<FunctionRunner>> Runners;
+ std::vector<size_t> Capacities;
+ std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions;
+ size_t TotalCapacity = 0;
- // Query capacity per runner and compute total
- std::vector<size_t> Capacities(RunnerCount);
- size_t TotalCapacity = 0;
+ m_RunnersLock.WithSharedLock([&] {
+ const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+ Runners.assign(m_Runners.begin(), m_Runners.end());
+ Capacities.resize(RunnerCount);
+ PerRunnerActions.resize(RunnerCount);
- for (int i = 0; i < RunnerCount; ++i)
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ Capacities[i] = Runners[i]->QueryCapacity();
+ TotalCapacity += Capacities[i];
+ }
+ });
+
+ const int RunnerCount = gsl::narrow<int>(Runners.size());
+
+ if (RunnerCount == 0)
{
- Capacities[i] = m_Runners[i]->QueryCapacity();
- TotalCapacity += Capacities[i];
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
}
if (TotalCapacity == 0)
@@ -143,9 +160,8 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
// Distribute actions across runners proportionally to their available capacity
- std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
- std::vector<size_t> ActionRunnerIndex(Actions.size());
- size_t ActionIdx = 0;
+ std::vector<size_t> ActionRunnerIndex(Actions.size());
+ size_t ActionIdx = 0;
for (int i = 0; i < RunnerCount; ++i)
{
@@ -176,14 +192,74 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
}
}
- // Submit batches per runner
+ // Submit batches per runner — in parallel when a worker pool is available
+
std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+ int ActiveRunnerCount = 0;
for (int i = 0; i < RunnerCount; ++i)
{
if (!PerRunnerActions[i].empty())
{
- PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+ ++ActiveRunnerCount;
+ }
+ }
+
+ static constexpr uint64_t SubmitWarnThresholdMs = 500;
+
+ auto SubmitToRunner = [&](int RunnerIndex) {
+ auto& Runner = Runners[RunnerIndex];
+ Runner->m_LastSubmitStats.Reset();
+
+ Stopwatch Timer;
+
+ PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]);
+
+ uint64_t ElapsedMs = Timer.GetElapsedTimeMs();
+ if (ElapsedMs >= SubmitWarnThresholdMs)
+ {
+ size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed);
+ uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed);
+
+ ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms",
+ PerRunnerActions[RunnerIndex].size(),
+ Attachments,
+ NiceBytes(AttachmentBytes),
+ Runner->GetDisplayName(),
+ ElapsedMs);
+ }
+ };
+
+ if (m_WorkerPool && ActiveRunnerCount > 1)
+ {
+ std::vector<std::future<void>> Futures(RunnerCount);
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ std::packaged_task<void()> Task([&SubmitToRunner, i]() { SubmitToRunner(i); });
+
+ Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog);
+ }
+ }
+
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (Futures[i].valid())
+ {
+ Futures[i].get();
+ }
+ }
+ }
+ else
+ {
+ for (int i = 0; i < RunnerCount; ++i)
+ {
+ if (!PerRunnerActions[i].empty())
+ {
+ SubmitToRunner(i);
+ }
}
}
@@ -309,10 +385,11 @@ RunnerAction::RetractAction()
bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed, Abandoned, or Retracted states
+ // Only allow reset from Failed, Abandoned, Rejected, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Rejected &&
+ CurrentState != State::Retracted)
{
return false;
}
@@ -333,11 +410,12 @@ RunnerAction::ResetActionStateToPending()
// Clear execution fields
ExecutionLocation.clear();
+ FailureReason.clear();
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count (skip for Retracted — nothing failed)
- if (CurrentState != State::Retracted)
+ // Increment retry count (skip for Retracted/Rejected — nothing failed)
+ if (CurrentState != State::Retracted && CurrentState != State::Rejected)
{
RetryCount.fetch_add(1, std::memory_order_relaxed);
}