# zencompute Module Lambda-style compute function service. Accepts execution requests from HTTP clients, schedules them across local and remote runners, and tracks results. ## Directory Structure ``` src/zencompute/ ├── include/zencompute/ # Public headers │ ├── computeservice.h # ComputeServiceSession public API │ ├── httpcomputeservice.h # HTTP service wrapper │ ├── orchestratorservice.h # Worker registry and orchestration │ ├── httporchestrator.h # HTTP orchestrator with WebSocket push │ ├── recordingreader.h # Recording/replay reader API │ ├── cloudmetadata.h # Cloud provider detection (AWS/Azure/GCP) │ └── mockimds.h # Test helper for cloud metadata ├── runners/ # Execution backends │ ├── functionrunner.h/.cpp # Abstract base + BaseRunnerGroup/RunnerGroup │ ├── localrunner.h/.cpp # LocalProcessRunner (sandbox, monitoring, CPU sampling) │ ├── windowsrunner.h/.cpp # Windows AppContainer sandboxing + CreateProcessW │ ├── linuxrunner.h/.cpp # Linux user/mount/network namespace isolation │ ├── macrunner.h/.cpp # macOS Seatbelt sandboxing │ ├── winerunner.h/.cpp # Wine runner for Windows executables on Linux │ ├── remotehttprunner.h/.cpp # Remote HTTP submission to other zenserver instances │ └── deferreddeleter.h/.cpp # Background deletion of sandbox directories ├── recording/ # Recording/replay subsystem │ ├── actionrecorder.h/.cpp # Write actions+attachments to disk │ └── recordingreader.cpp # Read recordings back ├── timeline/ │ └── workertimeline.h/.cpp # Per-worker action lifecycle event tracking ├── testing/ │ └── mockimds.cpp # Mock IMDS for cloud metadata tests ├── computeservice.cpp # ComputeServiceSession::Impl (~1700 lines) ├── httpcomputeservice.cpp # HTTP route registration and handlers (~900 lines) ├── httporchestrator.cpp # Orchestrator HTTP API + WebSocket push ├── orchestratorservice.cpp # Worker registry, health probing └── cloudmetadata.cpp # IMDS probing, termination monitoring ``` ## Key Classes ### `ComputeServiceSession` (computeservice.h) 
Public API entry point. Uses PIMPL (`struct Impl` in computeservice.cpp). Owns: - Two `RunnerGroup`s: `m_LocalRunnerGroup`, `m_RemoteRunnerGroup` - Scheduler thread that drains `m_UpdatedActions` and drives state transitions - Action maps: `m_PendingActions`, `m_RunningMap`, `m_ResultsMap` - Queue map: `m_Queues` (QueueEntry objects) - Action history ring: `m_ActionHistory` (bounded deque, default 1000) - WebSocket client (`m_OrchestratorWsClient`) subscribed to the orchestrator's `/orch/ws` push for instant worker discovery **Session states:** Created → Ready → Draining → Paused → Abandoned → Sunset. Both Abandoned and Sunset can be jumped to from any earlier state. Abandoned is used for spot instance termination grace periods — on entry, all pending and running actions are immediately marked as `RunnerAction::State::Abandoned` and running processes are best-effort cancelled. Auto-retry is suppressed while the session is Abandoned. `IsHealthy()` returns false for Abandoned and Sunset. **Convenience helpers:** `Ready()`, `Abandon()`, `SetOrchestrator(Endpoint, BasePath)` are inline wrappers for common state transitions and orchestrator configuration. ### `RunnerAction` (runners/functionrunner.h) Shared ref-counted struct representing one action through its lifecycle. **Key fields:** - `ActionLsn` — global unique sequence number - `QueueId` — 0 for implicit/unqueued actions - `Worker` — descriptor + content hash - `ActionObj` — CbObject with the action spec - `CpuUsagePercent` / `CpuSeconds` — atomics updated by monitor thread - `RetryCount` — atomic int tracking how many times the action has been rescheduled - `Timestamps[State::_Count]` — timestamp of each state transition **State machine (forward-only under normal flow, atomic):** ``` New → Pending → Submitting → Running → Completed → Failed → Abandoned → Cancelled → Retracted ``` `SetActionState()` rejects non-forward transitions (Retracted has the highest ordinal so runner-side transitions cannot override it). 
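The forward-only rule and the CAS-based reset used for retries can be sketched with an ordinal-ordered enum (names and shapes below are illustrative, not the real `RunnerAction` layout):

```cpp
#include <atomic>
#include <cstdint>
#include <initializer_list>

// Illustrative state enum: ordinals increase in lifecycle order, so a
// simple comparison rejects backward transitions. Retracted is highest,
// so runner-side transitions can never override it.
enum class ActionState : uint8_t {
    New, Pending, Submitting, Running,
    Completed, Failed, Abandoned, Cancelled, Retracted,
};

struct ActionSketch {
    std::atomic<ActionState> State{ActionState::New};
    std::atomic<int> RetryCount{0};

    // Forward-only transition: a CAS loop that refuses to move to a
    // lower-or-equal ordinal. Returns false if the transition is rejected.
    bool SetActionState(ActionState Next) {
        ActionState Cur = State.load();
        while (Cur < Next) {
            if (State.compare_exchange_weak(Cur, Next)) {
                return true;
            }
        }
        return false;
    }

    // Retry reset: only Failed/Abandoned may go back to Pending, and the
    // retry counter is bumped (retraction, by contrast, does not bump it).
    bool ResetActionStateToPending() {
        for (ActionState From : {ActionState::Failed, ActionState::Abandoned}) {
            ActionState Expected = From;
            if (State.compare_exchange_strong(Expected, ActionState::Pending)) {
                RetryCount.fetch_add(1);
                return true;
            }
        }
        return false;
    }
};
```

The CAS loop means a racing runner-side update can never move an action backward past a transition the scheduler has already committed.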
`ResetActionStateToPending()` uses CAS to atomically transition from Failed/Abandoned back to Pending for rescheduling — it clears timestamps from Submitting onward, resets execution fields, increments `RetryCount`, and calls `PostUpdate()` to re-enter the scheduler pipeline. **Retracted state:** An explicit, instigator-initiated request to pull an action back and reschedule it on a different runner (e.g. capacity opened up elsewhere). Unlike Failed/Abandoned auto-retry, rescheduling from Retracted does not increment `RetryCount` since nothing went wrong. Retraction is idempotent and can target Pending, Submitting, or Running actions. ### `LocalProcessRunner` (runners/localrunner.h) Base for all local execution. Platform runners subclass this and override: - `SubmitAction()` — fork/exec the worker process - `SweepRunningActions()` — poll for process exit (waitpid / WaitForSingleObject) - `CancelRunningActions()` — signal all processes during shutdown - `SampleProcessCpu(RunningAction&)` — read platform CPU usage (no-op default) **Infrastructure owned by LocalProcessRunner:** - Monitor thread — calls `SweepRunningActions()` then `SampleRunningProcessCpu()` in a loop - `m_RunningMap` — `RwLock`-guarded map of `Lsn → RunningAction` - `DeferredDirectoryDeleter` — sandbox directories are queued for async deletion - `PrepareActionSubmission()` — shared preamble (capacity check, sandbox creation, worker manifesting, input decompression) - `ProcessCompletedActions()` — shared post-processing (gather outputs, set state, enqueue deletion) **CPU sampling:** `SampleRunningProcessCpu()` iterates `m_RunningMap` under shared lock, calls `SampleProcessCpu()` per entry, throttled to every 5 seconds per action. 
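A minimal sketch of that per-action throttle plus the delta-based percent computation (field names are hypothetical; the absolute CPU-seconds reading comes from the per-platform sources listed next):

```cpp
#include <cstdint>

// Hypothetical per-action sampling state, not the real RunningAction fields.
struct CpuSampleState {
    double CpuSeconds = 0.0;       // absolute value from the OS
    float CpuUsagePercent = -1.0f; // -1 = no percent published yet
    double LastSampleWall = 0.0;   // wall-clock time of last sample (seconds)
    bool HasPrevious = false;
};

constexpr double kSampleIntervalSec = 5.0; // throttle: one sample per 5s

void SampleCpu(CpuSampleState& S, double CpuSecondsNow, double WallNow) {
    if (S.HasPrevious && WallNow - S.LastSampleWall < kSampleIntervalSec) {
        return; // within the throttle window: skip this entry
    }
    if (S.HasPrevious) {
        // Percent is a delta over wall time, so it can only be published
        // once two samples exist.
        double WallDelta = WallNow - S.LastSampleWall;
        double CpuDelta = CpuSecondsNow - S.CpuSeconds;
        if (WallDelta > 0.0) {
            S.CpuUsagePercent = static_cast<float>(100.0 * CpuDelta / WallDelta);
        }
    }
    S.CpuSeconds = CpuSecondsNow; // always store the absolute OS value
    S.LastSampleWall = WallNow;
    S.HasPrevious = true;
}
```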
Platform implementations: - Linux: `/proc/{pid}/stat` utime+stime jiffies ÷ `_SC_CLK_TCK` - Windows: `GetProcessTimes()` in 100ns intervals ÷ 10,000,000 - macOS: `proc_pidinfo(PROC_PIDTASKINFO)` pti_total_user+system nanoseconds ÷ 1,000,000,000 ### `FunctionRunner` / `RunnerGroup` (runners/functionrunner.h) Abstract base for runners. `RunnerGroup` holds a vector of runners and load-balances across them using a round-robin atomic index. `BaseRunnerGroup::SubmitActions()` distributes a batch proportionally based on per-runner capacity. `SubmitActions()` supports batch submission — actions are grouped and forwarded in chunks. ### `RemoteHttpRunner` (runners/remotehttprunner.h) Submits actions to remote zenserver instances over HTTP. Key features: - **WebSocket completion notifications**: connects a WS client to `/compute/ws` on the remote. When a message arrives (action completed), the monitor thread wakes immediately instead of polling. Falls back to adaptive polling (200ms→50ms) when WS is unavailable. - **Batch submission**: groups actions by queue and submits in configurable chunks (`m_MaxBatchSize`, default 50), falling back to individual submission on failure. - **Queue cancellation**: `CancelRemoteQueue()` sends cancel requests to the remote. - **Graceful shutdown**: `Shutdown()` closes the WS client, cancels all remote queues, stops the monitor thread, and marks remaining actions as Failed. ### `HttpComputeService` (include/zencompute/httpcomputeservice.h) Wraps `ComputeServiceSession` as an HTTP service. All routes are registered in the constructor. Handles CbPackage attachment ingestion from `CidStore` before forwarding to the service. Supports both single-action and batch (actions array) payloads via a shared `HandleSubmitAction` helper. ## Orchestrator Discovery `ComputeServiceSession` discovers remote workers via the orchestrator endpoint (`SetOrchestratorEndpoint()`). Two complementary mechanisms: 1. 
**Polling** (`UpdateCoordinatorState`): `GET /orch/agents` on the scheduler thread, throttled to every 5s (500ms when no workers are known yet). Discovers new workers and removes stale/unreachable ones. 2. **WebSocket push** (`OrchestratorWsHandler`): connects to `/orch/ws` on the orchestrator at setup time. When the orchestrator broadcasts a state change, the handler sets `m_OrchestratorQueryForced` and signals the scheduler event, bypassing the polling throttle. Falls back silently to polling if the WS connection fails. `NotifyOrchestratorChanged()` is the public API to trigger an immediate re-query — useful in tests and for external notification sources. Use `HttpToWsUrl(Endpoint, Path)` from `zenhttp/httpwsclient.h` to convert HTTP(S) endpoints to WebSocket URLs. This helper is shared across all WS client setup sites in the codebase. ## Action Lifecycle (End to End) 1. **HTTP POST** → `HttpComputeService` ingests attachments, calls `EnqueueAction()` 2. **Enqueue** → creates `RunnerAction` (New → Pending), calls `PostUpdate()` 3. **PostUpdate** → appends to `m_UpdatedActions`, signals scheduler thread event 4. **Scheduler thread** → drains `m_UpdatedActions`, drives pending actions to runners 5. **Runner `SubmitAction()`** → Pending → Submitting (on runner's worker pool thread) 6. **Process launch** → Submitting → Running, added to `m_RunningMap` 7. **Monitor thread `SweepRunningActions()`** → detects exit, gathers outputs 8. **`ProcessCompletedActions()`** → Running → Completed/Failed/Abandoned, `PostUpdate()` 9. **Scheduler thread `HandleActionUpdates()`** — for Failed/Abandoned actions, checks retry limit; if retries remain, calls `ResetActionStateToPending()` which loops back to step 3. Otherwise moves to `m_ResultsMap`, records history, notifies queue. 10. 
**Client `GET /jobs/{lsn}`** → returns result from `m_ResultsMap`, schedules retirement ### Action Rescheduling Actions that fail or are abandoned can be automatically retried or manually rescheduled via the API. **Automatic retry (scheduler path):** In `HandleActionUpdates()`, when a Failed or Abandoned state is detected, the scheduler checks `RetryCount < GetMaxRetriesForQueue(QueueId)`. If retries remain, the action is removed from active maps and `ResetActionStateToPending()` is called, which re-enters it into the scheduler pipeline. The action keeps its original LSN so clients can continue polling with the same identifier. **Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure. **Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Setting `max_retries=0` disables automatic retry entirely; omitting the field (or setting it to a negative value) uses the default of 3. Both automatic and manual paths respect this limit. **Retraction (API path):** `RetractAction(Lsn)` pulls a Pending/Submitting/Running action back for rescheduling on a different runner. The action transitions to Retracted, then `ResetActionStateToPending()` is called *without* incrementing `RetryCount`. Retraction is idempotent. **Cancelled actions are never retried** — cancellation is an intentional user action, not a transient failure. ## Queue System Queues group actions from a single client session. 
A `QueueEntry` (internal) tracks: - `State` — `std::atomic` lifecycle state (Active → Draining → Cancelled) - `ActiveCount` — pending + running actions (atomic) - `CompletedCount / FailedCount / AbandonedCount / CancelledCount` (atomics) - `ActiveLsns` — for cancellation lookup (under `m_Lock`) - `FinishedLsns` — moved here when actions complete - `IdleSince` — used for 15-minute automatic expiry - `Config` — CbObject set at creation; supports `max_retries` (int, default 3) to override the default retry limit. `0` = no retries, negative or absent = use default **Queue state machine (`QueueState` enum):** ``` Active → Draining → Cancelled \ ↑ ─────────────────────/ ``` - **Active** — accepts new work, schedules pending work, finishes running work (initial state) - **Draining** — rejects new work, finishes existing work (one-way via CAS from Active; cannot override Cancelled) - **Cancelled** — rejects new work, actively cancels in-flight work (reachable from Active or Draining) Key operations: - `CreateQueue(Tag)` → returns `QueueId` - `EnqueueActionToQueue(QueueId, ...)` → action's `QueueId` field is set at creation - `CancelQueue(QueueId)` → marks all active LSNs for cancellation - `DrainQueue(QueueId)` → stops accepting new submissions; existing work finishes naturally (irreversible) - `GetQueueCompleted(QueueId)` → CbWriter output of finished results - Queue references in HTTP routes accept either a decimal ID or an Oid token (24-hex), resolved by `ResolveQueueRef()` ## HTTP API All routes registered in `HttpComputeService` constructor. Prefix is configured externally (typically `/compute`). 
### Global endpoints | Method | Path | Description | |--------|------|-------------| | POST | `abandon` | Transition session to Abandoned state (409 if invalid) | | GET | `jobs/history` | Action history (last N, with timestamps per state) | | GET | `jobs/running` | In-flight actions with CPU metrics | | GET | `jobs/completed` | Actions with results available | | GET/POST/DELETE | `jobs/{lsn}` | GET: result; POST: reschedule failed action; DELETE: retire | | POST | `jobs/{lsn}/retract` | Retract a pending/running action for rescheduling (idempotent) | | POST | `jobs/{worker}` | Submit action for specific worker | | POST | `jobs` | Submit action (or batch via `actions` array) | | GET | `workers` | List worker IDs | | GET | `workers/all` | All workers with full descriptors | | GET/POST | `workers/{worker}` | Get/register worker | ### Queue-scoped endpoints Queue ref is capture(1) in all `queues/{queueref}/...` routes. | Method | Path | Description | |--------|------|-------------| | GET | `queues` | List queue IDs | | POST | `queues` | Create queue | | GET/DELETE | `queues/{queueref}` | Status / delete | | POST | `queues/{queueref}/drain` | Drain queue (irreversible; rejects new submissions) | | GET | `queues/{queueref}/completed` | Queue's completed results | | GET | `queues/{queueref}/history` | Queue's action history | | GET | `queues/{queueref}/running` | Queue's running actions | | POST | `queues/{queueref}/jobs` | Submit to queue (or batch via `actions` array) | | GET/POST | `queues/{queueref}/jobs/{lsn}` | GET: result; POST: reschedule | | POST | `queues/{queueref}/jobs/{lsn}/retract` | Retract action for rescheduling | | GET/POST | `queues/{queueref}/workers/...` | Worker endpoints (same as global) | Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `HandleWorkersAllGet`, `HandleWorkerRequest`) shared by top-level and queue-scoped routes. 
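The dual queue-ref format accepted by the `queues/{queueref}/...` routes can be sketched as a classifier. This is only a sketch with hypothetical names: the real `ResolveQueueRef()` also resolves the ref to a live queue and writes the error response, and the precedence given to 24-hex Oid tokens over all-digit strings here is an assumption.

```cpp
#include <cctype>
#include <string>

enum class QueueRefKind { Invalid, DecimalId, OidToken };

// Classify a queue ref as either a decimal queue ID or a 24-hex Oid token.
QueueRefKind ClassifyQueueRef(const std::string& Ref) {
    if (Ref.empty()) {
        return QueueRefKind::Invalid;
    }
    bool AllDigits = true, AllHex = true;
    for (unsigned char C : Ref) {
        AllDigits = AllDigits && std::isdigit(C);
        AllHex = AllHex && std::isxdigit(C);
    }
    if (Ref.size() == 24 && AllHex) {
        return QueueRefKind::OidToken; // assumed: Oid form wins at 24 hex chars
    }
    if (AllDigits) {
        return QueueRefKind::DecimalId;
    }
    return QueueRefKind::Invalid;
}
```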
## Concurrency Model **Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock. **Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`. **Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates. **Thread ownership:** - Scheduler thread — drives state transitions, owns `m_PendingActions` - Monitor thread (per runner) — polls process completion, owns `m_RunningMap` via shared lock - Worker pool threads — async submission, brief `SubmitAction()` calls - HTTP threads — read-only access to results, queue status ## Sandbox Layout Each action gets a unique numbered directory under `m_SandboxPath`: ``` scratch/{counter}/ worker/ ← worker binaries (or bind-mounted on Linux) inputs/ ← decompressed action inputs outputs/ ← written by worker process ``` On Linux with sandboxing enabled, the process runs in a pivot-rooted namespace with `/usr`, `/lib`, `/etc`, `/worker` bind-mounted read-only and a tmpfs `/dev`. ## Adding a New HTTP Endpoint 1. Register the route in the `HttpComputeService` constructor in `httpcomputeservice.cpp` 2. If the handler is shared between top-level and a `queues/{queueref}/...` variant, extract it as a private helper method declared in `httpcomputeservice.h` 3. Queue-scoped routes validate the queue ref with `ResolveQueueRef(HttpReq, Req.GetCapture(1))` which writes an error response and returns 0 on failure 4. 
Use `CbObjectWriter` for response bodies; emit via `HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save())`
5. Conditional fields (e.g., optional CPU metrics): emit inside `if (value > 0.0f)` / `if (value >= 0.0f)` guards to omit absent values rather than emitting sentinel values

## Adding a New Runner Platform

1. Subclass `LocalProcessRunner`, add `h`/`cpp` files in `runners/`
2. Override `SubmitAction()`, `SweepRunningActions()`, `CancelRunningActions()`, and optionally `CancelAction(int)` and `SampleProcessCpu(RunningAction&)`
3. `SampleProcessCpu()` must update both `Running.Action->CpuSeconds` (unconditionally from the absolute OS value) and `Running.Action->CpuUsagePercent` (delta-based, only after the second sample)
4. `ProcessHandle` convention: store pid as `reinterpret_cast<ProcessHandle>(static_cast<uintptr_t>(pid))` for consistency with the base class
5. Register in `ComputeServiceSession::AddLocalRunner()` in `computeservice.cpp`
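The `ProcessHandle` pid convention in step 4 amounts to a round-trip through `uintptr_t`. A sketch, assuming the handle is a pointer-sized opaque type (the aliases below are assumptions, not the real definitions):

```cpp
#include <cstdint>

using ProcessHandle = void*; // assumption: opaque pointer-sized handle
using PidType = int32_t;     // stand-in for pid_t

// Widen the pid to pointer size, then store it in the handle slot.
ProcessHandle PidToHandle(PidType Pid) {
    return reinterpret_cast<ProcessHandle>(static_cast<uintptr_t>(Pid));
}

// Recover the pid by reversing both casts.
PidType HandleToPid(ProcessHandle Handle) {
    return static_cast<PidType>(reinterpret_cast<uintptr_t>(Handle));
}
```

Going through `uintptr_t` in both directions keeps the conversion lossless for non-negative pids and avoids sign-extension surprises on the pointer cast.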