aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/functionrunner.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/runners/functionrunner.h
parentbugfix release - v5.7.23 (#851) (diff)
downloadzen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/runners/functionrunner.h')
-rw-r--r--src/zencompute/runners/functionrunner.h53
1 files changed, 42 insertions, 11 deletions
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index f67414dbb..56c3f3af0 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -29,8 +29,8 @@ public:
FunctionRunner(std::filesystem::path BasePath);
virtual ~FunctionRunner() = 0;
- virtual void Shutdown() = 0;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
+ virtual void Shutdown() = 0;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) = 0;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
[[nodiscard]] virtual size_t GetSubmittedActionCount() = 0;
@@ -63,7 +63,7 @@ public:
SubmitResult SubmitAction(Ref<RunnerAction> Action);
std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
size_t GetSubmittedActionCount();
- void RegisterWorker(CbPackage Worker);
+ [[nodiscard]] bool RegisterWorker(CbPackage Worker);
void Shutdown();
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
@@ -114,6 +114,30 @@ struct RunnerGroup : public BaseRunnerGroup
/**
* This represents an action going through different stages of scheduling and execution.
+ *
+ * State machine
+ * =============
+ *
+ * Normal forward flow (enforced by SetActionState rejecting backward transitions):
+ *
+ * New -> Pending -> Submitting -> Running -> Completed
+ * -> Failed
+ * -> Abandoned
+ * -> Cancelled
+ *
+ * Rescheduling (via ResetActionStateToPending):
+ *
+ * Failed ---> Pending (increments RetryCount, subject to retry limit)
+ * Abandoned ---> Pending (increments RetryCount, subject to retry limit)
+ * Retracted ---> Pending (does NOT increment RetryCount)
+ *
+ * Retraction (via RetractAction, idempotent):
+ *
+ * Pending/Submitting/Running -> Retracted -> Pending (rescheduled)
+ *
+ * Retracted is placed after Cancelled in enum order so that once set,
+ * no runner-side transition (Completed/Failed) can override it via
+ * SetActionState's forward-only rule.
*/
struct RunnerAction : public RefCounted
{
@@ -137,16 +161,20 @@ struct RunnerAction : public RefCounted
enum class State
{
- New,
- Pending,
- Submitting,
- Running,
- Completed,
- Failed,
- Abandoned,
- Cancelled,
+ New, // Initial state at construction, before entering the scheduler
+ Pending, // Queued and waiting for a runner slot
+ Submitting, // Being handed off to a runner (async submission in progress)
+ Running, // Executing on a runner process
+ Completed, // Finished successfully with results available
+ Failed, // Execution failed (transient error, eligible for retry)
+ Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon)
+ Cancelled, // Intentional user cancellation (never retried)
+ Retracted, // Pulled back for rescheduling on a different runner (no retry cost)
_Count
};
+ static_assert(State::Retracted > State::Completed && State::Retracted > State::Failed && State::Retracted > State::Abandoned &&
+ State::Retracted > State::Cancelled,
+ "Retracted must be the highest terminal ordinal so runner-side transitions cannot override it");
static const char* ToString(State _)
{
@@ -168,6 +196,8 @@ struct RunnerAction : public RefCounted
return "Abandoned";
case State::Cancelled:
return "Cancelled";
+ case State::Retracted:
+ return "Retracted";
default:
return "Unknown";
}
@@ -191,6 +221,7 @@ struct RunnerAction : public RefCounted
void SetActionState(State NewState);
bool IsSuccess() const { return ActionState() == State::Completed; }
+ bool RetractAction();
bool ResetActionStateToPending();
bool IsCompleted() const
{