# zencompute Module

Lambda-style compute function service. Accepts execution requests from HTTP clients, schedules them across local and remote runners, and tracks results.

## Directory Structure

```
src/zencompute/
├── include/zencompute/       # Public headers
│   ├── computeservice.h      # ComputeServiceSession public API
│   ├── httpcomputeservice.h  # HTTP service wrapper
│   ├── orchestratorservice.h # Worker registry and orchestration
│   ├── httporchestrator.h    # HTTP orchestrator with WebSocket push
│   ├── recordingreader.h     # Recording/replay reader API
│   ├── cloudmetadata.h       # Cloud provider detection (AWS/Azure/GCP)
│   └── mockimds.h            # Test helper for cloud metadata
├── runners/                  # Execution backends
│   ├── functionrunner.h/.cpp # Abstract base + BaseRunnerGroup/RunnerGroup
│   ├── localrunner.h/.cpp    # LocalProcessRunner (sandbox, monitoring, CPU sampling)
│   ├── windowsrunner.h/.cpp  # Windows AppContainer sandboxing + CreateProcessW
│   ├── linuxrunner.h/.cpp    # Linux user/mount/network namespace isolation
│   ├── macrunner.h/.cpp      # macOS Seatbelt sandboxing
│   ├── winerunner.h/.cpp     # Wine runner for Windows executables on Linux
│   ├── remotehttprunner.h/.cpp # Remote HTTP submission to other zenserver instances
│   └── deferreddeleter.h/.cpp  # Background deletion of sandbox directories
├── recording/                # Recording/replay subsystem
│   ├── actionrecorder.h/.cpp # Write actions+attachments to disk
│   └── recordingreader.cpp   # Read recordings back
├── timeline/
│   └── workertimeline.h/.cpp # Per-worker action lifecycle event tracking
├── testing/
│   └── mockimds.cpp          # Mock IMDS for cloud metadata tests
├── computeservice.cpp        # ComputeServiceSession::Impl (~1700 lines)
├── httpcomputeservice.cpp    # HTTP route registration and handlers (~900 lines)
├── httporchestrator.cpp      # Orchestrator HTTP API + WebSocket push
├── orchestratorservice.cpp   # Worker registry, health probing
└── cloudmetadata.cpp         # IMDS probing, termination monitoring
```

## Key Classes

### `ComputeServiceSession` (computeservice.h)
Public API entry point. Uses PIMPL (`struct Impl` in computeservice.cpp). Owns:
- Two `RunnerGroup`s: `m_LocalRunnerGroup`, `m_RemoteRunnerGroup`
- Scheduler thread that drains `m_UpdatedActions` and drives state transitions
- Action maps: `m_PendingActions`, `m_RunningMap`, `m_ResultsMap`
- Queue map: `m_Queues` (QueueEntry objects)
- Action history ring: `m_ActionHistory` (bounded deque, default 1000)
- WebSocket client (`m_OrchestratorWsClient`) subscribed to the orchestrator's `/orch/ws` push for instant worker discovery

**Session states:** Created → Ready → Draining → Paused → Abandoned → Sunset. Both Abandoned and Sunset can be jumped to from any earlier state. Abandoned is used for spot instance termination grace periods — on entry, all pending and running actions are immediately marked as `RunnerAction::State::Abandoned` and running processes are best-effort cancelled. Auto-retry is suppressed while the session is Abandoned. `IsHealthy()` returns false for Abandoned and Sunset.

**Convenience helpers:** `Ready()`, `Abandon()`, `SetOrchestrator(Endpoint, BasePath)` are inline wrappers for common state transitions and orchestrator configuration.

### `RunnerAction` (runners/functionrunner.h)
Shared ref-counted struct representing one action through its lifecycle.

**Key fields:**
- `ActionLsn` — global unique sequence number
- `QueueId` — 0 for implicit/unqueued actions
- `Worker` — descriptor + content hash
- `ActionObj` — CbObject with the action spec
- `CpuUsagePercent` / `CpuSeconds` — atomics updated by monitor thread
- `RetryCount` — atomic int tracking how many times the action has been rescheduled
- `Timestamps[State::_Count]` — timestamp of each state transition

**State machine (forward-only under normal flow, atomic):**
```
New → Pending → Submitting → Running → Completed
                                     → Failed
                                     → Abandoned
                                     → Cancelled
                                     → Retracted
```
`SetActionState()` rejects non-forward transitions (Retracted has the highest ordinal, so runner-side transitions cannot override it). `ResetActionStateToPending()` uses CAS to atomically move an action from Failed, Abandoned, or Retracted back to Pending for rescheduling. It clears timestamps from Submitting onward, resets execution fields, increments `RetryCount` (except when the action was Retracted), and calls `PostUpdate()` to re-enter the scheduler pipeline.

**Retracted state:** An explicit, instigator-initiated request to pull an action back and reschedule it on a different runner (e.g. capacity opened up elsewhere). Unlike Failed/Abandoned auto-retry, rescheduling from Retracted does not increment `RetryCount` since nothing went wrong. Retraction is idempotent and can target Pending, Submitting, or Running actions.
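The transition rules above can be sketched with a single `std::atomic` enum. This assumes the ordinals follow the diagram, with Retracted last. `ActionState`, `ActionStateSketch`, `TrySetState`, and `TryResetToPending` are hypothetical names. The real `RunnerAction` also records timestamps, resets execution fields, and calls `PostUpdate()`, none of which is modelled here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <initializer_list>

// Hypothetical mirror of RunnerAction::State. Ordinals follow the diagram,
// with Retracted deliberately last so no runner-side transition can override it.
enum class ActionState : uint8_t {
    New, Pending, Submitting, Running,
    Completed, Failed, Abandoned, Cancelled, Retracted,
    _Count
};

struct ActionStateSketch {
    std::atomic<ActionState> State{ActionState::New};
    std::atomic<int> RetryCount{0};

    // Forward-only: succeeds only if Target has a higher ordinal than the current
    // state. compare_exchange_weak reloads Current on failure, so racing callers
    // re-check the ordinal before retrying.
    bool TrySetState(ActionState Target) {
        ActionState Current = State.load();
        while (Current < Target) {
            if (State.compare_exchange_weak(Current, Target)) return true;
        }
        return false;
    }

    // Moves Failed/Abandoned/Retracted back to Pending; exactly one concurrent
    // caller wins the CAS for a given terminal state.
    bool TryResetToPending() {
        for (ActionState From : {ActionState::Failed, ActionState::Abandoned, ActionState::Retracted}) {
            ActionState Expected = From;
            if (State.compare_exchange_strong(Expected, ActionState::Pending)) {
                // Retraction is not a failure, so it does not consume a retry.
                if (From != ActionState::Retracted) RetryCount.fetch_add(1);
                return true;
            }
        }
        return false;
    }
};
```

Because the comparison is purely ordinal, a late `Completed` report from a runner loses against an earlier `Retracted`. This is the property the text relies on.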

### `LocalProcessRunner` (runners/localrunner.h)
Base for all local execution. Platform runners subclass this and override:
- `SubmitAction()` — fork/exec the worker process
- `SweepRunningActions()` — poll for process exit (waitpid / WaitForSingleObject)
- `CancelRunningActions()` — signal all processes during shutdown
- `SampleProcessCpu(RunningAction&)` — read platform CPU usage (no-op default)

**Infrastructure owned by LocalProcessRunner:**
- Monitor thread — calls `SweepRunningActions()` then `SampleRunningProcessCpu()` in a loop
- `m_RunningMap`: `RwLock`-guarded map of `Lsn → RunningAction`
- `DeferredDirectoryDeleter` — sandbox directories are queued for async deletion
- `PrepareActionSubmission()` — shared preamble (capacity check, sandbox creation, worker manifesting, input decompression)
- `ProcessCompletedActions()` — shared post-processing (gather outputs, set state, enqueue deletion)

**CPU sampling:** `SampleRunningProcessCpu()` iterates `m_RunningMap` under a shared lock and calls `SampleProcessCpu()` for each entry, throttled to at most one sample every 5 seconds per action. Platform implementations:
- Linux: `/proc/{pid}/stat` `utime` + `stime` (jiffies) ÷ `_SC_CLK_TCK`
- Windows: `GetProcessTimes()` (100 ns intervals) ÷ 10,000,000
- macOS: `proc_pidinfo(PROC_PIDTASKINFO)` `pti_total_user` + `pti_total_system` (nanoseconds) ÷ 1,000,000,000
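The following sketch illustrates the Linux path. It extracts `utime` and `stime` (fields 14 and 15 of `/proc/{pid}/stat`, per proc(5)) and converts them to seconds. `CpuSecondsFromProcStat` is a hypothetical helper. It takes the stat line and the clock tick rate as parameters, so it can be exercised without a live process.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper: returns utime + stime in seconds, or -1.0 if the line is malformed.
// ClockTicksPerSecond would normally come from sysconf(_SC_CLK_TCK).
double CpuSecondsFromProcStat(const std::string& StatLine, long ClockTicksPerSecond) {
    assert(ClockTicksPerSecond > 0);
    // Field 2 (comm) is wrapped in parentheses and may itself contain spaces or ')',
    // so parsing resumes after the LAST ')'.
    const std::string::size_type CommEnd = StatLine.rfind(')');
    if (CommEnd == std::string::npos) return -1.0;

    std::istringstream Fields(StatLine.substr(CommEnd + 1));
    std::string Skip;
    // Fields 3..13 (state through cmajflt) precede utime (14) and stime (15).
    for (int Field = 3; Field <= 13; ++Field) {
        if (!(Fields >> Skip)) return -1.0;
    }
    unsigned long long UTime = 0, STime = 0;
    if (!(Fields >> UTime >> STime)) return -1.0;
    return static_cast<double>(UTime + STime) / static_cast<double>(ClockTicksPerSecond);
}
```

On Linux the line would come from reading `/proc/{pid}/stat` in full, and the tick rate from `sysconf(_SC_CLK_TCK)`.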

### `FunctionRunner` / `RunnerGroup` (runners/functionrunner.h)
Abstract base for runners. `RunnerGroup<T>` holds a vector of runners and load-balances across them using an atomic round-robin index. For batch submission, `BaseRunnerGroup::SubmitActions()` groups actions and distributes them across runners in proportion to each runner's capacity, forwarding them in chunks.
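Below is a sketch of the two balancing strategies. It assumes each runner reports an integer free capacity. `SplitBatchByCapacity` and `RoundRobinCursor` are hypothetical. The real `BaseRunnerGroup::SubmitActions()` may round or handle overflow differently; here, actions beyond total capacity are simply left unassigned.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Splits BatchSize actions across runners in proportion to each runner's free
// capacity, never exceeding it. Anything that does not fit stays unassigned.
std::vector<size_t> SplitBatchByCapacity(const std::vector<size_t>& Capacity, size_t BatchSize) {
    std::vector<size_t> Share(Capacity.size(), 0);
    size_t Total = 0;
    for (size_t C : Capacity) Total += C;
    if (Total == 0) return Share;
    const size_t ToAssign = BatchSize < Total ? BatchSize : Total;

    // Proportional pass (floor division); each share is <= that runner's capacity.
    size_t Assigned = 0;
    for (size_t i = 0; i < Capacity.size(); ++i) {
        Share[i] = ToAssign * Capacity[i] / Total;
        Assigned += Share[i];
    }
    // Hand out the rounding remainder one action at a time to runners with spare room.
    for (size_t i = 0; Assigned < ToAssign; i = (i + 1) % Capacity.size()) {
        if (Share[i] < Capacity[i]) { ++Share[i]; ++Assigned; }
    }
    return Share;
}

// Round-robin selection with a shared atomic cursor, safe to call from many threads.
class RoundRobinCursor {
public:
    size_t Next(size_t RunnerCount) {
        assert(RunnerCount > 0);
        return m_Index.fetch_add(1, std::memory_order_relaxed) % RunnerCount;
    }
private:
    std::atomic<size_t> m_Index{0};
};
```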

### `RemoteHttpRunner` (runners/remotehttprunner.h)
Submits actions to remote zenserver instances over HTTP. Key features:
- **WebSocket completion notifications**: connects a WS client to `/compute/ws` on the remote. When a message arrives (action completed), the monitor thread wakes immediately instead of polling. Falls back to adaptive polling (200ms→50ms) when WS is unavailable.
- **Batch submission**: groups actions by queue and submits in configurable chunks (`m_MaxBatchSize`, default 50), falling back to individual submission on failure.
- **Queue cancellation**: `CancelRemoteQueue()` sends cancel requests to the remote.
- **Graceful shutdown**: `Shutdown()` closes the WS client, cancels all remote queues, stops the monitor thread, and marks remaining actions as Failed.
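The batch-with-fallback behaviour can be sketched as follows, with the HTTP calls abstracted into callbacks. `PendingSubmit`, `SubmitGroupedInChunks`, and both callback types are hypothetical stand-ins. Only three elements come from the description above: grouping by queue, the chunk limit (`m_MaxBatchSize`, default 50), and the per-action fallback.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <vector>

struct PendingSubmit { uint64_t Lsn; int QueueId; };  // hypothetical stand-in for RunnerAction

using BatchFn = std::function<bool(int QueueId, const std::vector<PendingSubmit>&)>;
using SingleFn = std::function<bool(const PendingSubmit&)>;

// Groups actions by queue, submits each group in chunks of at most MaxBatchSize,
// and retries a rejected chunk one action at a time. Returns actions that still failed.
std::vector<PendingSubmit> SubmitGroupedInChunks(const std::vector<PendingSubmit>& Actions,
                                                 size_t MaxBatchSize,
                                                 const BatchFn& SubmitBatch,
                                                 const SingleFn& SubmitOne) {
    assert(MaxBatchSize > 0);
    std::map<int, std::vector<PendingSubmit>> ByQueue;
    for (const auto& A : Actions) ByQueue[A.QueueId].push_back(A);

    std::vector<PendingSubmit> Failed;
    for (const auto& [QueueId, Group] : ByQueue) {
        for (size_t Begin = 0; Begin < Group.size(); Begin += MaxBatchSize) {
            const size_t End = std::min(Begin + MaxBatchSize, Group.size());
            std::vector<PendingSubmit> Chunk(Group.begin() + static_cast<std::ptrdiff_t>(Begin),
                                             Group.begin() + static_cast<std::ptrdiff_t>(End));
            if (SubmitBatch(QueueId, Chunk)) continue;
            for (const auto& A : Chunk) {  // fallback: individual submission
                if (!SubmitOne(A)) Failed.push_back(A);
            }
        }
    }
    return Failed;
}
```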

### `HttpComputeService` (include/zencompute/httpcomputeservice.h)
Wraps `ComputeServiceSession` as an HTTP service. All routes are registered in the constructor. Handles CbPackage attachment ingestion from `CidStore` before forwarding to the service. Supports both single-action and batch (actions array) payloads via a shared `HandleSubmitAction` helper.

## Orchestrator Discovery

`ComputeServiceSession` discovers remote workers via the orchestrator endpoint (`SetOrchestratorEndpoint()`). Two complementary mechanisms:

1. **Polling** (`UpdateCoordinatorState`): `GET /orch/agents` on the scheduler thread, throttled to at most once every 5s (every 500ms while no workers are known yet). Discovers new workers and removes stale or unreachable ones.

2. **WebSocket push** (`OrchestratorWsHandler`): connects to `/orch/ws` on the orchestrator at setup time. When the orchestrator broadcasts a state change, the handler sets `m_OrchestratorQueryForced` and signals the scheduler event, bypassing the polling throttle. Falls back silently to polling if the WS connection fails.
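The polling throttle and the WebSocket bypass can be modelled as one decision function. `OrchestratorPollThrottle` and `ShouldQuery` are hypothetical names. `QueryForced` stands in for `m_OrchestratorQueryForced`. The real code also signals the scheduler event, which this sketch omits.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>

using Clock = std::chrono::steady_clock;

// Hypothetical model of the UpdateCoordinatorState throttle: 5 s normally,
// 500 ms while no workers are known, bypassed when a WS push forces a query.
struct OrchestratorPollThrottle {
    std::atomic<bool> QueryForced{false};  // stands in for m_OrchestratorQueryForced
    Clock::time_point LastQuery{};

    bool ShouldQuery(Clock::time_point Now, size_t KnownWorkers) {
        // exchange(false) consumes the flag atomically; several pushes coalesce into one query.
        if (QueryForced.exchange(false)) { LastQuery = Now; return true; }
        const auto Interval = KnownWorkers == 0 ? std::chrono::milliseconds(500)
                                                : std::chrono::milliseconds(5000);
        if (Now - LastQuery < Interval) return false;
        LastQuery = Now;
        return true;
    }
};
```

If a push arrives while a query is already in progress, the flag is set again. The next call then performs one follow-up query, so the notification is not lost.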

`NotifyOrchestratorChanged()` is the public API to trigger an immediate re-query — useful in tests and for external notification sources.

Use `HttpToWsUrl(Endpoint, Path)` from `zenhttp/httpwsclient.h` to convert HTTP(S) endpoints to WebSocket URLs. This helper is shared across all WS client setup sites in the codebase.
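For reference, the scheme mapping such a helper performs (`http` to `ws`, `https` to `wss`) looks roughly like the sketch below. `HttpToWsUrlSketch` is a hypothetical re-implementation for illustration only. Use the real `HttpToWsUrl` in code, since its exact signature and edge-case handling may differ.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical illustration only; not the zenhttp implementation.
std::string HttpToWsUrlSketch(std::string_view Endpoint, std::string_view Path) {
    std::string Url;
    if (Endpoint.rfind("https://", 0) == 0) {
        Url = "wss://" + std::string(Endpoint.substr(8));
    } else if (Endpoint.rfind("http://", 0) == 0) {
        Url = "ws://" + std::string(Endpoint.substr(7));
    } else {
        Url = std::string(Endpoint);  // already ws(s):// or scheme-less: leave as-is
    }
    // Join endpoint and path with exactly one '/'.
    while (!Url.empty() && Url.back() == '/') Url.pop_back();
    if (!Path.empty() && Path.front() != '/') Url += '/';
    Url += Path;
    return Url;
}
```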

## Action Lifecycle (End to End)

1. **HTTP POST** → `HttpComputeService` ingests attachments, calls `EnqueueAction()`
2. **Enqueue** → creates `RunnerAction` (New → Pending), calls `PostUpdate()`
3. **PostUpdate** → appends to `m_UpdatedActions`, signals scheduler thread event
4. **Scheduler thread** → drains `m_UpdatedActions`, drives pending actions to runners
5. **Runner `SubmitAction()`** → Pending → Submitting (on runner's worker pool thread)
6. **Process launch** → Submitting → Running, added to `m_RunningMap`
7. **Monitor thread `SweepRunningActions()`** → detects exit, gathers outputs
8. **`ProcessCompletedActions()`** → Running → Completed/Failed/Abandoned, `PostUpdate()`
9. **Scheduler thread `HandleActionUpdates()`** — for Failed/Abandoned actions, checks retry limit; if retries remain, calls `ResetActionStateToPending()` which loops back to step 3. Otherwise moves to `m_ResultsMap`, records history, notifies queue.
10. **Client `GET /jobs/{lsn}`** → returns result from `m_ResultsMap`, schedules retirement

### Action Rescheduling

Actions that fail or are abandoned can be automatically retried or manually rescheduled via the API.

**Automatic retry (scheduler path):** In `HandleActionUpdates()`, when a Failed or Abandoned state is detected, the scheduler checks `RetryCount < GetMaxRetriesForQueue(QueueId)`. If retries remain, the action is removed from active maps and `ResetActionStateToPending()` is called, which re-enters it into the scheduler pipeline. The action keeps its original LSN so clients can continue polling with the same identifier.

**Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure.

**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Setting `max_retries=0` disables automatic retry entirely; omitting the field (or setting it to a negative value) uses the default of 3. Both automatic and manual paths respect this limit.

**Retraction (API path):** `RetractAction(Lsn)` pulls a Pending/Submitting/Running action back for rescheduling on a different runner. The action transitions to Retracted, then `ResetActionStateToPending()` is called *without* incrementing `RetryCount`. Retraction is idempotent.

**Cancelled actions are never retried** — cancellation is an intentional user action, not a transient failure.
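The rules above combine into a single decision for the automatic path. The sketch below is hypothetical. `FinalState`, `EffectiveMaxRetries`, and `ShouldAutoRetry` are illustrative names, and the queue's `max_retries` field is modelled as `std::optional<int>` rather than read from a `CbObject`.

```cpp
#include <cassert>
#include <optional>

enum class FinalState { Completed, Failed, Abandoned, Cancelled };  // hypothetical subset

constexpr int kDefaultMaxRetries = 3;

// max_retries from the queue Config: absent or negative -> default (3), 0 -> never retry.
int EffectiveMaxRetries(std::optional<int> ConfiguredMaxRetries) {
    if (!ConfiguredMaxRetries || *ConfiguredMaxRetries < 0) return kDefaultMaxRetries;
    return *ConfiguredMaxRetries;
}

// Scheduler-side decision for the automatic retry path.
bool ShouldAutoRetry(FinalState State, int RetryCount,
                     std::optional<int> ConfiguredMaxRetries, bool SessionAbandoned) {
    if (SessionAbandoned) return false;  // auto-retry is suppressed while the session is Abandoned
    if (State != FinalState::Failed && State != FinalState::Abandoned) {
        return false;                    // Completed needs no retry; Cancelled is never retried
    }
    return RetryCount < EffectiveMaxRetries(ConfiguredMaxRetries);
}
```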

## Queue System

Queues group actions from a single client session. A `QueueEntry` (internal) tracks:
- `State`: `std::atomic<QueueState>` lifecycle state (Active → Draining → Cancelled)
- `ActiveCount` — pending + running actions (atomic)
- `CompletedCount / FailedCount / AbandonedCount / CancelledCount` (atomics)
- `ActiveLsns` — for cancellation lookup (under `m_Lock`)
- `FinishedLsns` — moved here when actions complete
- `IdleSince` — used for 15-minute automatic expiry
- `Config` — CbObject set at creation; supports `max_retries` (int, default 3) to override the default retry limit. `0` = no retries, negative or absent = use default

**Queue state machine (`QueueState` enum):**
```
Active  →  Draining  →  Cancelled
   \                       ↑
    ─────────────────────/
```
- **Active** — accepts new work, schedules pending work, finishes running work (initial state)
- **Draining** — rejects new work, finishes existing work (one-way via CAS from Active; cannot override Cancelled)
- **Cancelled** — rejects new work, actively cancels in-flight work (reachable from Active or Draining)
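The queue transition rules map naturally onto atomics. Draining needs a compare-and-swap from Active, while Cancelled can be stored unconditionally. `QueueStateSketch` is a hypothetical wrapper that mirrors the `QueueState` enum described above. The real `QueueEntry` additionally cancels in-flight actions when it enters Cancelled.

```cpp
#include <atomic>
#include <cassert>

enum class QueueState { Active, Draining, Cancelled };

struct QueueStateSketch {
    std::atomic<QueueState> State{QueueState::Active};

    // One-way CAS from Active only, so Draining can never override Cancelled.
    bool Drain() {
        QueueState Expected = QueueState::Active;
        return State.compare_exchange_strong(Expected, QueueState::Draining);
    }

    // Reachable from Active or Draining; returns false if the queue was already cancelled.
    bool Cancel() {
        return State.exchange(QueueState::Cancelled) != QueueState::Cancelled;
    }

    // Only Active queues accept new submissions.
    bool AcceptsNewWork() const { return State.load() == QueueState::Active; }
};
```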

Key operations:
- `CreateQueue(Tag)` → returns `QueueId`
- `EnqueueActionToQueue(QueueId, ...)` → action's `QueueId` field is set at creation
- `CancelQueue(QueueId)` → marks all active LSNs for cancellation
- `DrainQueue(QueueId)` → stops accepting new submissions; existing work finishes naturally (irreversible)
- `GetQueueCompleted(QueueId)` → CbWriter output of finished results
- Queue references in HTTP routes accept either a decimal ID or an Oid token (24 hex characters), resolved by `ResolveQueueRef()`
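The following is a hypothetical sketch of the classification step behind `ResolveQueueRef()`. A 24-character hex string is treated as an Oid token, and an all-digit string as a decimal ID. `OidToken`, `QueueRef`, and `ParseQueueRef` are illustrative. The precedence (Oid checked first) and the 19-digit overflow guard are assumptions. The real function also looks the queue up and writes the error response.

```cpp
#include <cassert>
#include <cctype>
#include <cstdint>
#include <optional>
#include <string_view>
#include <variant>

struct OidToken { std::string_view Hex; };  // hypothetical; the real code has its own Oid type
using QueueRef = std::variant<uint64_t, OidToken>;

// Classifies a route capture as a 24-hex-digit Oid token or a decimal queue ID.
std::optional<QueueRef> ParseQueueRef(std::string_view Capture) {
    auto AllOf = [&](auto Pred) {
        if (Capture.empty()) return false;
        for (char C : Capture)
            if (!Pred(static_cast<unsigned char>(C))) return false;
        return true;
    };
    if (Capture.size() == 24 && AllOf([](unsigned char C) { return std::isxdigit(C) != 0; })) {
        return QueueRef{OidToken{Capture}};
    }
    // At most 19 decimal digits always fits in uint64_t.
    if (Capture.size() <= 19 && AllOf([](unsigned char C) { return std::isdigit(C) != 0; })) {
        uint64_t Id = 0;
        for (char C : Capture) Id = Id * 10 + static_cast<uint64_t>(C - '0');
        return QueueRef{Id};
    }
    return std::nullopt;
}
```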

## HTTP API

All routes are registered in the `HttpComputeService` constructor. The prefix is configured externally (typically `/compute`).

### Global endpoints
| Method | Path | Description |
|--------|------|-------------|
| POST | `abandon` | Transition session to Abandoned state (409 if invalid) |
| GET | `jobs/history` | Action history (last N, with timestamps per state) |
| GET | `jobs/running` | In-flight actions with CPU metrics |
| GET | `jobs/completed` | Actions with results available |
| GET/POST/DELETE | `jobs/{lsn}` | GET: result; POST: reschedule failed action; DELETE: retire |
| POST | `jobs/{lsn}/retract` | Retract a pending/running action for rescheduling (idempotent) |
| POST | `jobs/{worker}` | Submit action for specific worker |
| POST | `jobs` | Submit action (or batch via `actions` array) |
| GET | `workers` | List worker IDs |
| GET | `workers/all` | All workers with full descriptors |
| GET/POST | `workers/{worker}` | Get/register worker |

### Queue-scoped endpoints
The queue ref is capture 1 (`Req.GetCapture(1)`) in all `queues/{queueref}/...` routes.

| Method | Path | Description |
|--------|------|-------------|
| GET | `queues` | List queue IDs |
| POST | `queues` | Create queue |
| GET/DELETE | `queues/{queueref}` | Status / delete |
| POST | `queues/{queueref}/drain` | Drain queue (irreversible; rejects new submissions) |
| GET | `queues/{queueref}/completed` | Queue's completed results |
| GET | `queues/{queueref}/history` | Queue's action history |
| GET | `queues/{queueref}/running` | Queue's running actions |
| POST | `queues/{queueref}/jobs` | Submit to queue (or batch via `actions` array) |
| GET/POST | `queues/{queueref}/jobs/{lsn}` | GET: result; POST: reschedule |
| POST | `queues/{queueref}/jobs/{lsn}/retract` | Retract action for rescheduling |
| GET/POST | `queues/{queueref}/workers/...` | Worker endpoints (same as global) |

Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `HandleWorkersAllGet`, `HandleWorkerRequest`) shared by top-level and queue-scoped routes.

## Concurrency Model

**Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock.

**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.

**Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates.
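Here is a minimal sketch of the post-and-drain pattern. It uses a mutex and condition variable in place of the codebase's own event primitive. `ActionStub` and `UpdateQueueSketch` are hypothetical. The real queue holds ref-counted `RunnerAction` pointers. Deduplication here keeps the first occurrence of each action in a batch.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <unordered_set>
#include <vector>

struct ActionStub { int Lsn; };  // hypothetical stand-in for RunnerAction

// Runners call PostUpdate(); the scheduler thread drains the whole batch
// and collapses repeated notifications for the same action into one.
class UpdateQueueSketch {
public:
    void PostUpdate(ActionStub* Action) {
        {
            std::lock_guard<std::mutex> Lock(m_Mutex);
            m_Updated.push_back(Action);
        }
        m_Event.notify_one();  // wake the scheduler thread
    }

    // Scheduler thread: blocks until at least one update is pending, then takes them all.
    std::vector<ActionStub*> WaitAndDrain() {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_Event.wait(Lock, [this] { return !m_Updated.empty(); });
        std::vector<ActionStub*> Batch;
        Batch.swap(m_Updated);
        Lock.unlock();  // process the batch without blocking runners

        std::unordered_set<ActionStub*> Seen;
        std::vector<ActionStub*> Unique;
        for (ActionStub* A : Batch) {
            if (Seen.insert(A).second) Unique.push_back(A);  // keep first occurrence
        }
        return Unique;
    }

private:
    std::mutex m_Mutex;
    std::condition_variable m_Event;
    std::vector<ActionStub*> m_Updated;
};
```

Deduplication is safe as long as the scheduler re-reads each action's current atomic state, rather than relying on the notification itself to carry the state.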

**Thread ownership:**
- Scheduler thread — drives state transitions, owns `m_PendingActions`
- Monitor thread (per runner): polls for process completion and owns the runner-level `m_RunningMap` (distinct from the session's map of the same name) via shared lock
- Worker pool threads — async submission, brief `SubmitAction()` calls
- HTTP threads — read-only access to results, queue status

## Sandbox Layout

Each action gets a unique numbered directory under `m_SandboxPath`:
```
scratch/{counter}/
  worker/    ← worker binaries (or bind-mounted on Linux)
  inputs/    ← decompressed action inputs
  outputs/   ← written by worker process
```

On Linux with sandboxing enabled, the process runs in a pivot-rooted namespace with `/usr`, `/lib`, `/etc`, `/worker` bind-mounted read-only and a tmpfs `/dev`.
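Creating the per-action layout takes only a few `std::filesystem` calls. This sketch makes two assumptions: the `scratch/` directory is the root passed in, and the counter is a process-wide atomic. `CreateActionSandbox` is a hypothetical name. It does not perform the Linux bind mounts or pivot-root described above.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <filesystem>
#include <initializer_list>
#include <string>

namespace fs = std::filesystem;

// Hypothetical helper: creates {ScratchRoot}/{counter}/{worker,inputs,outputs}
// and returns the per-action directory.
fs::path CreateActionSandbox(const fs::path& ScratchRoot, std::atomic<uint64_t>& Counter) {
    const fs::path ActionDir = ScratchRoot / std::to_string(Counter.fetch_add(1));
    for (const char* Sub : {"worker", "inputs", "outputs"}) {
        fs::create_directories(ActionDir / Sub);  // throws fs::filesystem_error on failure
    }
    return ActionDir;
}
```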

## Adding a New HTTP Endpoint

1. Register the route in the `HttpComputeService` constructor in `httpcomputeservice.cpp`
2. If the handler is shared between top-level and a `queues/{queueref}/...` variant, extract it as a private helper method declared in `httpcomputeservice.h`
3. Queue-scoped routes validate the queue ref with `ResolveQueueRef(HttpReq, Req.GetCapture(1))` which writes an error response and returns 0 on failure
4. Use `CbObjectWriter` for response bodies; emit via `HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save())`
5. Conditional fields (e.g., optional CPU metrics): emit inside `if (value > 0.0f)` / `if (value >= 0.0f)` guards to omit absent values rather than emitting sentinel values

## Adding a New Runner Platform

1. Subclass `LocalProcessRunner` and add `.h`/`.cpp` files in `runners/`
2. Override `SubmitAction()`, `SweepRunningActions()`, `CancelRunningActions()`, and optionally `CancelAction(int)` and `SampleProcessCpu(RunningAction&)`
3. `SampleProcessCpu()` must update both `Running.Action->CpuSeconds` (unconditionally from the absolute OS value) and `Running.Action->CpuUsagePercent` (delta-based, only after second sample)
4. `ProcessHandle` convention: store pid as `reinterpret_cast<void*>(static_cast<intptr_t>(pid))` for consistency with the base class
5. Register in `ComputeServiceSession::AddLocalRunner()` in `computeservice.cpp`
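Steps 3 and 4 can be sketched together. `ActionCpuStats`, `RunningActionSketch`, `PidToHandle`, `HandleToPid`, and `ApplyCpuSample` are hypothetical. The `float` fields and the negative "not yet known" sentinel are assumptions, chosen to fit the `>= 0.0f` guard style mentioned for HTTP responses; they are not confirmed details of `RunnerAction`. A platform `SampleProcessCpu()` would obtain the absolute CPU seconds from the OS and then apply an update like this one.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct ActionCpuStats {                        // hypothetical subset of RunnerAction
    std::atomic<float> CpuSeconds{0.0f};
    std::atomic<float> CpuUsagePercent{-1.0f}; // < 0 means "not yet known"
};

struct RunningActionSketch {                   // hypothetical subset of RunningAction
    ActionCpuStats* Action = nullptr;
    void* ProcessHandle = nullptr;
    double PrevCpuSeconds = -1.0;              // < 0 until the first sample
    double PrevSampleTime = 0.0;
};

// ProcessHandle convention: the pid is stored in the pointer value itself.
inline void* PidToHandle(int Pid) { return reinterpret_cast<void*>(static_cast<std::intptr_t>(Pid)); }
inline int HandleToPid(void* Handle) { return static_cast<int>(reinterpret_cast<std::intptr_t>(Handle)); }

// CpuSeconds is updated on every sample from the absolute OS value;
// CpuUsagePercent only once a previous sample exists to diff against.
void ApplyCpuSample(RunningActionSketch& Running, double AbsoluteCpuSeconds, double NowSeconds) {
    Running.Action->CpuSeconds.store(static_cast<float>(AbsoluteCpuSeconds));
    if (Running.PrevCpuSeconds >= 0.0 && NowSeconds > Running.PrevSampleTime) {
        const double Percent = 100.0 * (AbsoluteCpuSeconds - Running.PrevCpuSeconds) /
                               (NowSeconds - Running.PrevSampleTime);
        Running.Action->CpuUsagePercent.store(static_cast<float>(Percent));
    }
    Running.PrevCpuSeconds = AbsoluteCpuSeconds;
    Running.PrevSampleTime = NowSeconds;
}
```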