# zencompute Module

Lambda-style compute function service. Accepts execution requests from HTTP clients, schedules them across local and remote runners, and tracks results.

## Directory Structure

```
src/zencompute/
├── include/zencompute/       # Public headers
│   ├── computeservice.h      # ComputeServiceSession public API
│   ├── httpcomputeservice.h  # HTTP service wrapper
│   ├── orchestratorservice.h # Worker registry and orchestration
│   ├── httporchestrator.h    # HTTP orchestrator with WebSocket push
│   ├── recordingreader.h     # Recording/replay reader API
│   ├── cloudmetadata.h       # Cloud provider detection (AWS/Azure/GCP)
│   └── mockimds.h            # Test helper for cloud metadata
├── runners/                  # Execution backends
│   ├── functionrunner.h/.cpp # Abstract base + BaseRunnerGroup/RunnerGroup
│   ├── localrunner.h/.cpp    # LocalProcessRunner (sandbox, monitoring, CPU sampling)
│   ├── windowsrunner.h/.cpp  # Windows AppContainer sandboxing + CreateProcessW
│   ├── linuxrunner.h/.cpp    # Linux user/mount/network namespace isolation
│   ├── macrunner.h/.cpp      # macOS Seatbelt sandboxing
│   ├── winerunner.h/.cpp     # Wine runner for Windows executables on Linux
│   ├── remotehttprunner.h/.cpp # Remote HTTP submission to other zenserver instances
│   └── deferreddeleter.h/.cpp  # Background deletion of sandbox directories
├── recording/                # Recording/replay subsystem
│   ├── actionrecorder.h/.cpp # Write actions+attachments to disk
│   └── recordingreader.cpp   # Read recordings back
├── timeline/
│   └── workertimeline.h/.cpp # Per-worker action lifecycle event tracking
├── testing/
│   └── mockimds.cpp          # Mock IMDS for cloud metadata tests
├── computeservice.cpp        # ComputeServiceSession::Impl (~1700 lines)
├── httpcomputeservice.cpp    # HTTP route registration and handlers (~900 lines)
├── httporchestrator.cpp      # Orchestrator HTTP API + WebSocket push
├── orchestratorservice.cpp   # Worker registry, health probing
└── cloudmetadata.cpp         # IMDS probing, termination monitoring
```

## Key Classes

### `ComputeServiceSession` (computeservice.h)
Public API entry point. Uses PIMPL (`struct Impl` in computeservice.cpp). Owns:
- Two `RunnerGroup`s: `m_LocalRunnerGroup`, `m_RemoteRunnerGroup`
- Scheduler thread that drains `m_UpdatedActions` and drives state transitions
- Action maps: `m_PendingActions`, `m_RunningMap`, `m_ResultsMap`
- Queue map: `m_Queues` (QueueEntry objects)
- Action history ring: `m_ActionHistory` (bounded deque, default 1000)
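
The bounded history ring can be pictured with a minimal sketch like the one below. `HistoryEntry` and `ActionHistory` are hypothetical names chosen for illustration; only the bounded-deque behavior and the default of 1000 come from the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a history record; the real entry type is not shown here.
struct HistoryEntry
{
    int Lsn = 0;
};

// Bounded history: once MaxEntries is reached, each new record evicts the oldest one.
class ActionHistory
{
public:
    explicit ActionHistory(std::size_t MaxEntries = 1000) : m_MaxEntries(MaxEntries)
    {
        assert(MaxEntries > 0);
    }

    void Record(const HistoryEntry& Entry)
    {
        m_Entries.push_back(Entry);
        while (m_Entries.size() > m_MaxEntries)
        {
            m_Entries.pop_front();
        }
    }

    std::size_t Size() const { return m_Entries.size(); }

    int OldestLsn() const
    {
        assert(!m_Entries.empty());
        return m_Entries.front().Lsn;
    }

private:
    std::size_t              m_MaxEntries;
    std::deque<HistoryEntry> m_Entries;
};
```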

**Session states:** Created → Ready → Draining → Paused → Abandoned → Sunset. Both Abandoned and Sunset can be jumped to from any earlier state. Abandoned is used for spot instance termination grace periods — on entry, all pending and running actions are immediately marked as `RunnerAction::State::Abandoned` and running processes are best-effort cancelled. Auto-retry is suppressed while the session is Abandoned. `IsHealthy()` returns false for Abandoned and Sunset.

### `RunnerAction` (runners/functionrunner.h)
Shared ref-counted struct representing one action through its lifecycle.

**Key fields:**
- `ActionLsn` — global unique sequence number
- `QueueId` — 0 for implicit/unqueued actions
- `Worker` — descriptor + content hash
- `ActionObj` — CbObject with the action spec
- `CpuUsagePercent` / `CpuSeconds` — atomics updated by monitor thread
- `RetryCount` — atomic int tracking how many times the action has been rescheduled
- `Timestamps[State::_Count]` — timestamp of each state transition

**State machine (forward-only under normal flow, atomic):**
```
New → Pending → Submitting → Running → Completed
                                     → Failed
                                     → Abandoned
                                     → Cancelled
```
`SetActionState()` rejects non-forward transitions. The one exception is `ResetActionStateToPending()`, which uses CAS to atomically transition from Failed/Abandoned back to Pending for rescheduling. It clears timestamps from Submitting onward, resets execution fields, increments `RetryCount`, and calls `PostUpdate()` to re-enter the scheduler pipeline.
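
A minimal sketch of the forward-only rule and the CAS-based reset, assuming the states can be ranked by position in the diagram above. `ActionSketch` and `Rank()` are illustrative names, not the real types, and the sketch omits the timestamp clearing and `PostUpdate()` call. Treating all four terminal states as the same rank is an assumption.

```cpp
#include <atomic>
#include <cassert>
#include <initializer_list>

enum class State : int { New, Pending, Submitting, Running, Completed, Failed, Abandoned, Cancelled };

// Assumption: the four terminal states share one rank, so a terminal state
// cannot be replaced by another terminal state.
constexpr int Rank(State S)
{
    return S >= State::Completed ? 4 : static_cast<int>(S);
}

struct ActionSketch
{
    std::atomic<State> ActionState{State::New};
    std::atomic<int>   RetryCount{0};

    // Forward-only: succeeds only when NewState ranks strictly later than the current state.
    bool SetActionState(State NewState)
    {
        State Current = ActionState.load();
        while (Rank(NewState) > Rank(Current))
        {
            // On failure, Current is refreshed and the ordering check runs again.
            if (ActionState.compare_exchange_weak(Current, NewState))
            {
                return true;
            }
        }
        return false;
    }

    // The one backward transition: Failed or Abandoned back to Pending via CAS.
    bool ResetActionStateToPending()
    {
        for (State From : {State::Failed, State::Abandoned})
        {
            State Expected = From;
            if (ActionState.compare_exchange_strong(Expected, State::Pending))
            {
                ++RetryCount;
                return true;
            }
        }
        return false;
    }
};
```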

### `LocalProcessRunner` (runners/localrunner.h)
Base for all local execution. Platform runners subclass this and override:
- `SubmitAction()` — fork/exec the worker process
- `SweepRunningActions()` — poll for process exit (waitpid / WaitForSingleObject)
- `CancelRunningActions()` — signal all processes during shutdown
- `SampleProcessCpu(RunningAction&)` — read platform CPU usage (no-op default)

**Infrastructure owned by LocalProcessRunner:**
- Monitor thread — calls `SweepRunningActions()` then `SampleRunningProcessCpu()` in a loop
- `m_RunningMap` — `RwLock`-guarded map of `Lsn → RunningAction`
- `DeferredDirectoryDeleter` — sandbox directories are queued for async deletion
- `PrepareActionSubmission()` — shared preamble (capacity check, sandbox creation, worker manifesting, input decompression)
- `ProcessCompletedActions()` — shared post-processing (gather outputs, set state, enqueue deletion)

**CPU sampling:** `SampleRunningProcessCpu()` iterates `m_RunningMap` under shared lock, calls `SampleProcessCpu()` per entry, throttled to every 5 seconds per action. Platform implementations:
- Linux: `/proc/{pid}/stat` utime+stime jiffies ÷ `_SC_CLK_TCK`
- Windows: `GetProcessTimes()` in 100ns intervals ÷ 10,000,000
- macOS: `proc_pidinfo(PROC_PIDTASKINFO)` pti_total_user+system nanoseconds ÷ 1,000,000,000
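
The arithmetic behind the sampling can be sketched as follows. `CpuSample`, `TicksToSeconds()`, and `UpdateSample()` are hypothetical helpers; the negative "no value yet" sentinel is an assumption. The sketch shows the Linux-style tick conversion and a delta-based percentage that only becomes available from the second sample onward.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-action sampling state; field names are illustrative.
struct CpuSample
{
    double CpuSeconds      = 0.0;   // absolute CPU time consumed so far
    double CpuUsagePercent = -1.0;  // assumption: negative means "not sampled twice yet"
    double LastCpuSeconds  = 0.0;
    double LastWallSeconds = 0.0;
    bool   HasPrevious     = false;
};

// Linux-style conversion: utime+stime ticks divided by ticks per second
// (the value sysconf(_SC_CLK_TCK) reports).
inline double TicksToSeconds(std::uint64_t Ticks, long TicksPerSecond)
{
    assert(TicksPerSecond > 0);
    return static_cast<double>(Ticks) / static_cast<double>(TicksPerSecond);
}

inline void UpdateSample(CpuSample& Sample, double CpuSecondsNow, double WallSecondsNow)
{
    // The absolute value is always taken straight from the OS reading.
    Sample.CpuSeconds = CpuSecondsNow;

    // The percentage needs a previous sample to form a delta.
    if (Sample.HasPrevious && WallSecondsNow > Sample.LastWallSeconds)
    {
        const double CpuDelta  = CpuSecondsNow - Sample.LastCpuSeconds;
        const double WallDelta = WallSecondsNow - Sample.LastWallSeconds;
        Sample.CpuUsagePercent = 100.0 * CpuDelta / WallDelta;
    }

    Sample.LastCpuSeconds  = CpuSecondsNow;
    Sample.LastWallSeconds = WallSecondsNow;
    Sample.HasPrevious     = true;
}
```

With this formula a process that keeps several cores busy would report more than 100%; whether the real implementation normalizes by core count is not stated here.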

### `FunctionRunner` / `RunnerGroup` (runners/functionrunner.h)
Abstract base for runners. `RunnerGroup<T>` holds a vector of runners and load-balances across them using a round-robin atomic index. `BaseRunnerGroup::SubmitActions()` distributes a batch proportionally based on per-runner capacity.
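
The two distribution strategies can be illustrated with a short sketch. `RoundRobin` and `SplitByCapacity()` are hypothetical names, and the remainder-handling policy is an assumption; the real `SubmitActions()` may round or cap differently.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Round-robin selection using an atomic counter, safe to call from several threads.
class RoundRobin
{
public:
    explicit RoundRobin(std::size_t RunnerCount) : m_Count(RunnerCount)
    {
        assert(RunnerCount > 0);
    }

    std::size_t Next() { return m_Next.fetch_add(1, std::memory_order_relaxed) % m_Count; }

private:
    std::size_t              m_Count;
    std::atomic<std::size_t> m_Next{0};
};

// Splits a batch across runners in proportion to their capacity.
// Assumption: the rounding remainder is handed out one action at a time,
// in order, to runners that have non-zero capacity.
inline std::vector<std::size_t> SplitByCapacity(std::size_t BatchSize, const std::vector<std::size_t>& Capacities)
{
    std::vector<std::size_t> Shares(Capacities.size(), 0);

    std::size_t Total = 0;
    for (std::size_t Capacity : Capacities)
    {
        Total += Capacity;
    }
    if (Total == 0)
    {
        return Shares;  // nothing can accept work
    }

    std::size_t Assigned = 0;
    for (std::size_t i = 0; i < Capacities.size(); ++i)
    {
        Shares[i] = BatchSize * Capacities[i] / Total;
        Assigned += Shares[i];
    }

    for (std::size_t i = 0; Assigned < BatchSize; i = (i + 1) % Capacities.size())
    {
        if (Capacities[i] > 0)
        {
            ++Shares[i];
            ++Assigned;
        }
    }
    return Shares;
}
```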

### `HttpComputeService` (include/zencompute/httpcomputeservice.h)
Wraps `ComputeServiceSession` as an HTTP service. All routes are registered in the constructor. Handles CbPackage attachment ingestion from `CidStore` before forwarding to the service.

## Action Lifecycle (End to End)

1. **HTTP POST** → `HttpComputeService` ingests attachments, calls `EnqueueAction()`
2. **Enqueue** → creates `RunnerAction` (New → Pending), calls `PostUpdate()`
3. **PostUpdate** → appends to `m_UpdatedActions`, signals scheduler thread event
4. **Scheduler thread** → drains `m_UpdatedActions`, drives pending actions to runners
5. **Runner `SubmitAction()`** → Pending → Submitting (on runner's worker pool thread)
6. **Process launch** → Submitting → Running, added to `m_RunningMap`
7. **Monitor thread `SweepRunningActions()`** → detects exit, gathers outputs
8. **`ProcessCompletedActions()`** → Running → Completed/Failed/Abandoned, `PostUpdate()`
9. **Scheduler thread `HandleActionUpdates()`** — for Failed/Abandoned actions, checks retry limit; if retries remain, calls `ResetActionStateToPending()` which loops back to step 3. Otherwise moves to `m_ResultsMap`, records history, notifies queue.
10. **Client `GET /jobs/{lsn}`** → returns result from `m_ResultsMap`, schedules retirement

### Action Rescheduling

Actions that fail or are abandoned can be automatically retried or manually rescheduled via the API.

**Automatic retry (scheduler path):** In `HandleActionUpdates()`, when a Failed or Abandoned state is detected, the scheduler checks `RetryCount < GetMaxRetriesForQueue(QueueId)`. If retries remain, the action is removed from active maps and `ResetActionStateToPending()` is called, which re-enters it into the scheduler pipeline. The action keeps its original LSN so clients can continue polling with the same identifier.

**Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure.

**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Both automatic and manual paths respect this limit.

**Cancelled actions are never retried** — cancellation is an intentional user action, not a transient failure.
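
The retry rules above reduce to a small predicate. The sketch below is illustrative only: `FinalState` and `ShouldRetry()` are hypothetical names, and the `SessionAbandoned` flag models the suppression of auto-retry while the session is Abandoned.

```cpp
#include <cassert>

enum class FinalState { Completed, Failed, Abandoned, Cancelled };

constexpr int kDefaultMaxRetries = 3;

// MaxRetries stands in for the per-queue limit (the `max_retries` config field),
// falling back to the default when the queue does not override it.
inline bool ShouldRetry(FinalState State,
                        int        RetryCount,
                        int        MaxRetries       = kDefaultMaxRetries,
                        bool       SessionAbandoned = false)
{
    assert(RetryCount >= 0);

    // Only transient outcomes qualify; Cancelled is an intentional user action.
    const bool Retryable = (State == FinalState::Failed) || (State == FinalState::Abandoned);
    if (!Retryable || SessionAbandoned)
    {
        return false;
    }
    return RetryCount < MaxRetries;
}
```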

## Queue System

Queues group actions from a single client session. A `QueueEntry` (internal) tracks:
- `State` — `std::atomic<QueueState>` lifecycle state (Active → Draining → Cancelled)
- `ActiveCount` — pending + running actions (atomic)
- `CompletedCount / FailedCount / AbandonedCount / CancelledCount` (atomics)
- `ActiveLsns` — for cancellation lookup (under `m_Lock`)
- `FinishedLsns` — moved here when actions complete
- `IdleSince` — used for 15-minute automatic expiry
- `Config` — CbObject set at creation; supports `max_retries` (int) to override the default retry limit
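
The idle-expiry check implied by `IdleSince` might look like the sketch below. `IsQueueExpired()` is a hypothetical helper. The choice of `steady_clock`, the inclusive comparison, and the requirement that `ActiveCount` be zero are all assumptions; only the 15-minute window is taken from the list above.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

constexpr std::chrono::minutes kQueueIdleExpiry{15};

// A queue is treated as expired once it has had no active work for the full window.
inline bool IsQueueExpired(int ActiveCount, Clock::time_point IdleSince, Clock::time_point Now)
{
    return ActiveCount == 0 && (Now - IdleSince) >= kQueueIdleExpiry;
}
```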

**Queue state machine (`QueueState` enum):**
```
Active  →  Draining  →  Cancelled
   \                       ↑
    ─────────────────────/
```
- **Active** — accepts new work, schedules pending work, finishes running work (initial state)
- **Draining** — rejects new work, finishes existing work (one-way via CAS from Active; cannot override Cancelled)
- **Cancelled** — rejects new work, actively cancels in-flight work (reachable from Active or Draining)
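
A minimal sketch of these transitions, assuming a plain `std::atomic<QueueState>`. `QueueStateSketch` is an illustrative type; the real `QueueEntry` carries much more state.

```cpp
#include <atomic>
#include <cassert>

enum class QueueState { Active, Draining, Cancelled };

struct QueueStateSketch
{
    std::atomic<QueueState> State{QueueState::Active};

    // Active -> Draining only. The CAS fails if the queue is already
    // Draining or Cancelled, so Cancelled is never overwritten.
    bool Drain()
    {
        QueueState Expected = QueueState::Active;
        return State.compare_exchange_strong(Expected, QueueState::Draining);
    }

    // Reachable from Active or Draining, so an unconditional store is enough here.
    void Cancel() { State.store(QueueState::Cancelled); }

    // Only an Active queue accepts new submissions.
    bool AcceptsNewWork() const { return State.load() == QueueState::Active; }
};
```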

Key operations:
- `CreateQueue(Tag)` → returns `QueueId`
- `EnqueueActionToQueue(QueueId, ...)` → action's `QueueId` field is set at creation
- `CancelQueue(QueueId)` → marks all active LSNs for cancellation
- `DrainQueue(QueueId)` → stops accepting new submissions; existing work finishes naturally (irreversible)
- `GetQueueCompleted(QueueId)` → CbWriter output of finished results
- Queue references in HTTP routes accept either a decimal ID or an Oid token (24-hex), resolved by `ResolveQueueRef()`
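
How a queue reference could be classified before lookup is sketched below. This is not the real `ResolveQueueRef()`, which also writes the HTTP error response. `ClassifyQueueRef()` and `QueueRefKind` are hypothetical, and treating a 24-character all-digit string as an Oid token is an assumption about precedence.

```cpp
#include <cassert>
#include <cctype>
#include <string_view>

enum class QueueRefKind { Invalid, DecimalId, OidToken };

inline QueueRefKind ClassifyQueueRef(std::string_view Ref)
{
    if (Ref.empty())
    {
        return QueueRefKind::Invalid;
    }

    bool AllDigits = true;
    bool AllHex    = true;
    for (char C : Ref)
    {
        const unsigned char U = static_cast<unsigned char>(C);
        AllDigits = AllDigits && (std::isdigit(U) != 0);
        AllHex    = AllHex && (std::isxdigit(U) != 0);
    }

    // Assumption: exactly 24 hex characters takes precedence over the decimal form.
    if (Ref.size() == 24 && AllHex)
    {
        return QueueRefKind::OidToken;
    }
    if (AllDigits)
    {
        return QueueRefKind::DecimalId;
    }
    return QueueRefKind::Invalid;
}
```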

## HTTP API

All routes registered in `HttpComputeService` constructor. Prefix is configured externally (typically `/compute`).

### Global endpoints
| Method | Path | Description |
|--------|------|-------------|
| POST | `abandon` | Transition session to Abandoned state (409 if invalid) |
| GET | `jobs/history` | Action history (last N, with timestamps per state) |
| GET | `jobs/running` | In-flight actions with CPU metrics |
| GET | `jobs/completed` | Actions with results available |
| GET/POST/DELETE | `jobs/{lsn}` | GET: result; POST: reschedule failed action; DELETE: retire |
| POST | `jobs/{worker}` | Submit action for specific worker |
| POST | `jobs` | Submit action (worker resolved from descriptor) |
| GET | `workers` | List worker IDs |
| GET | `workers/all` | All workers with full descriptors |
| GET/POST | `workers/{worker}` | Get/register worker |

### Queue-scoped endpoints
Queue ref is capture(1) in all `queues/{queueref}/...` routes.

| Method | Path | Description |
|--------|------|-------------|
| GET | `queues` | List queue IDs |
| POST | `queues` | Create queue |
| GET/DELETE | `queues/{queueref}` | Status / delete |
| POST | `queues/{queueref}/drain` | Drain queue (irreversible; rejects new submissions) |
| GET | `queues/{queueref}/completed` | Queue's completed results |
| GET | `queues/{queueref}/history` | Queue's action history |
| GET | `queues/{queueref}/running` | Queue's running actions |
| POST | `queues/{queueref}/jobs` | Submit to queue |
| GET/POST | `queues/{queueref}/jobs/{lsn}` | GET: result; POST: reschedule |
| GET/POST | `queues/{queueref}/workers/...` | Worker endpoints (same as global) |

Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `HandleWorkersAllGet`, `HandleWorkerRequest`) shared by top-level and queue-scoped routes.

## Concurrency Model

**Locking discipline:** When multiple locks must be held simultaneously, always acquire in this order to prevent deadlocks:
1. `m_ResultsLock`
2. `m_RunningLock` (comment in localrunner.h: "must be taken *after* m_ResultsLock")
3. `m_PendingLock`
4. `m_QueueLock`
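
The ordering rule can be illustrated with a small sketch that uses `std::mutex` as a stand-in for the module's own lock type. `LockOrderSketch` and its counters are hypothetical; the point is only that a function needing two of the locks takes the lower-numbered one first.

```cpp
#include <cassert>
#include <mutex>

struct LockOrderSketch
{
    std::mutex m_ResultsLock;  // 1: always first
    std::mutex m_RunningLock;  // 2: only after m_ResultsLock
    std::mutex m_PendingLock;  // 3
    std::mutex m_QueueLock;    // 4: always last

    int RunningCount = 1;
    int ResultCount  = 0;

    // Moves one entry from "running" to "results", taking the locks in the documented order.
    bool MoveRunningToResults()
    {
        std::lock_guard<std::mutex> ResultsGuard(m_ResultsLock);
        std::lock_guard<std::mutex> RunningGuard(m_RunningLock);
        if (RunningCount == 0)
        {
            return false;
        }
        --RunningCount;
        ++ResultCount;
        return true;
    }
};
```

Because the guards are destroyed in reverse order of construction, the locks are also released in reverse order of acquisition.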

**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.

**Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates.
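
A rough sketch of the batching and deduplication idea, keyed by LSN for simplicity. `UpdateQueue` is a hypothetical class; the real `PostUpdate()` takes a `RunnerAction*` and also signals the scheduler thread, which is omitted here.

```cpp
#include <cassert>
#include <mutex>
#include <unordered_set>
#include <vector>

class UpdateQueue
{
public:
    // Called by runners: record that an action changed, without touching service state.
    void PostUpdate(int Lsn)
    {
        std::lock_guard<std::mutex> Guard(m_Lock);
        m_Updated.push_back(Lsn);
    }

    // Called by the scheduler: take the whole batch, then drop repeated LSNs
    // while preserving first-seen order.
    std::vector<int> Drain()
    {
        std::vector<int> Batch;
        {
            std::lock_guard<std::mutex> Guard(m_Lock);
            Batch.swap(m_Updated);
        }

        std::unordered_set<int> Seen;
        std::vector<int>        Unique;
        for (int Lsn : Batch)
        {
            if (Seen.insert(Lsn).second)
            {
                Unique.push_back(Lsn);
            }
        }
        return Unique;
    }

private:
    std::mutex       m_Lock;
    std::vector<int> m_Updated;
};
```

Swapping the vector out under the lock keeps the critical section short, so runners posting updates are not blocked while the scheduler processes the batch.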

**Thread ownership:**
- Scheduler thread — drives state transitions, owns `m_PendingActions`
- Monitor thread (per runner) — polls process completion, owns `m_RunningMap` via shared lock
- Worker pool threads — async submission, brief `SubmitAction()` calls
- HTTP threads — read-only access to results, queue status

## Sandbox Layout

Each action gets a unique numbered directory under `m_SandboxPath`:
```
scratch/{counter}/
  worker/    ← worker binaries (or bind-mounted on Linux)
  inputs/    ← decompressed action inputs
  outputs/   ← written by worker process
```
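
Creating that layout with `std::filesystem` could look like the sketch below. `CreateSandbox()` is a hypothetical helper, and passing the scratch root in directly is an assumption about how `m_SandboxPath` relates to `scratch/`.

```cpp
#include <cassert>
#include <filesystem>
#include <initializer_list>
#include <string>

// Creates {ScratchRoot}/{Counter}/ with the three documented subdirectories
// and returns the per-action directory.
inline std::filesystem::path CreateSandbox(const std::filesystem::path& ScratchRoot, unsigned Counter)
{
    const std::filesystem::path ActionDir = ScratchRoot / std::to_string(Counter);
    for (const char* Sub : {"worker", "inputs", "outputs"})
    {
        std::filesystem::create_directories(ActionDir / Sub);
    }
    return ActionDir;
}
```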

On Linux with sandboxing enabled, the process runs in a pivot-rooted namespace with `/usr`, `/lib`, `/etc`, `/worker` bind-mounted read-only and a tmpfs `/dev`.

## Adding a New HTTP Endpoint

1. Register the route in the `HttpComputeService` constructor in `httpcomputeservice.cpp`
2. If the handler is shared between top-level and a `queues/{queueref}/...` variant, extract it as a private helper method declared in `httpcomputeservice.h`
3. Queue-scoped routes validate the queue ref with `ResolveQueueRef(HttpReq, Req.GetCapture(1))` which writes an error response and returns 0 on failure
4. Use `CbObjectWriter` for response bodies; emit via `HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save())`
5. Conditional fields (e.g., optional CPU metrics): emit inside `if (value > 0.0f)` / `if (value >= 0.0f)` guards to omit absent values rather than emitting sentinel values

## Adding a New Runner Platform

1. Subclass `LocalProcessRunner` and add the `.h`/`.cpp` files in `runners/`
2. Override `SubmitAction()`, `SweepRunningActions()`, `CancelRunningActions()`, and optionally `CancelAction(int)` and `SampleProcessCpu(RunningAction&)`
3. `SampleProcessCpu()` must update both `Running.Action->CpuSeconds` (unconditionally from the absolute OS value) and `Running.Action->CpuUsagePercent` (delta-based, only after second sample)
4. `ProcessHandle` convention: store pid as `reinterpret_cast<void*>(static_cast<intptr_t>(pid))` for consistency with the base class
5. Register in `ComputeServiceSession::AddLocalRunner()` in `computeservice.cpp`
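
The `ProcessHandle` convention from step 4 round-trips as shown in this small sketch. `PidToHandle()` and `HandleToPid()` are hypothetical helper names; only the cast expression itself comes from the step above.

```cpp
#include <cassert>
#include <cstdint>

// Store a pid in a pointer-sized handle by widening it to intptr_t first.
inline void* PidToHandle(int Pid)
{
    return reinterpret_cast<void*>(static_cast<std::intptr_t>(Pid));
}

// Recover the pid by reversing the two casts.
inline int HandleToPid(void* Handle)
{
    return static_cast<int>(reinterpret_cast<std::intptr_t>(Handle));
}
```

Going through `intptr_t` avoids a direct `int` to pointer cast, which would trigger size-mismatch warnings on 64-bit targets.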