aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/functionrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners/functionrunner.cpp')
-rw-r--r--src/zencompute/runners/functionrunner.cpp44
1 files changed, 38 insertions, 6 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 768cdf1e1..4f116e7d8 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -215,15 +215,22 @@ BaseRunnerGroup::GetSubmittedActionCount()
return TotalCount;
}
-void
+bool
BaseRunnerGroup::RegisterWorker(CbPackage Worker)
{
RwLock::SharedLockScope _(m_RunnersLock);
+ bool AllSucceeded = true;
+
for (auto& Runner : m_Runners)
{
- Runner->RegisterWorker(Worker);
+ if (!Runner->RegisterWorker(Worker))
+ {
+ AllSucceeded = false;
+ }
}
+
+ return AllSucceeded;
}
void
@@ -276,12 +283,34 @@ RunnerAction::~RunnerAction()
}
bool
+RunnerAction::RetractAction()
+{
+ State CurrentState = m_ActionState.load();
+
+ do
+ {
+ // Only allow retraction from pre-terminal states (idempotent if already retracted)
+ if (CurrentState > State::Running)
+ {
+ return CurrentState == State::Retracted;
+ }
+
+ if (m_ActionState.compare_exchange_strong(CurrentState, State::Retracted))
+ {
+ this->Timestamps[static_cast<int>(State::Retracted)] = DateTime::Now().GetTicks();
+ m_OwnerSession->PostUpdate(this);
+ return true;
+ }
+ } while (true);
+}
+
+bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed or Abandoned states
+ // Only allow reset from Failed, Abandoned, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
{
return false;
}
@@ -305,8 +334,11 @@ RunnerAction::ResetActionStateToPending()
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count
- RetryCount.fetch_add(1, std::memory_order_relaxed);
+ // Increment retry count (skip for Retracted — nothing failed)
+ if (CurrentState != State::Retracted)
+ {
+ RetryCount.fetch_add(1, std::memory_order_relaxed);
+ }
// Re-enter the scheduler pipeline
m_OwnerSession->PostUpdate(this);