aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/CLAUDE.md
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/CLAUDE.md')
-rw-r--r--src/zencompute/CLAUDE.md232
1 files changed, 232 insertions, 0 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md
new file mode 100644
index 000000000..f5188123f
--- /dev/null
+++ b/src/zencompute/CLAUDE.md
@@ -0,0 +1,232 @@
+# zencompute Module
+
+Lambda-style compute function service. Accepts execution requests from HTTP clients, schedules them across local and remote runners, and tracks results.
+
+## Directory Structure
+
+```
+src/zencompute/
+├── include/zencompute/ # Public headers
+│ ├── computeservice.h # ComputeServiceSession public API
+│ ├── httpcomputeservice.h # HTTP service wrapper
+│ ├── orchestratorservice.h # Worker registry and orchestration
+│ ├── httporchestrator.h # HTTP orchestrator with WebSocket push
+│ ├── recordingreader.h # Recording/replay reader API
+│ ├── cloudmetadata.h # Cloud provider detection (AWS/Azure/GCP)
+│ └── mockimds.h # Test helper for cloud metadata
+├── runners/ # Execution backends
+│ ├── functionrunner.h/.cpp # Abstract base + BaseRunnerGroup/RunnerGroup
+│ ├── localrunner.h/.cpp # LocalProcessRunner (sandbox, monitoring, CPU sampling)
+│ ├── windowsrunner.h/.cpp # Windows AppContainer sandboxing + CreateProcessW
+│ ├── linuxrunner.h/.cpp # Linux user/mount/network namespace isolation
+│ ├── macrunner.h/.cpp # macOS Seatbelt sandboxing
+│ ├── winerunner.h/.cpp # Wine runner for Windows executables on Linux
+│ ├── remotehttprunner.h/.cpp # Remote HTTP submission to other zenserver instances
+│ └── deferreddeleter.h/.cpp # Background deletion of sandbox directories
+├── recording/ # Recording/replay subsystem
+│ ├── actionrecorder.h/.cpp # Write actions+attachments to disk
+│ └── recordingreader.cpp # Read recordings back
+├── timeline/
+│ └── workertimeline.h/.cpp # Per-worker action lifecycle event tracking
+├── testing/
+│ └── mockimds.cpp # Mock IMDS for cloud metadata tests
+├── computeservice.cpp # ComputeServiceSession::Impl (~1700 lines)
+├── httpcomputeservice.cpp # HTTP route registration and handlers (~900 lines)
+├── httporchestrator.cpp # Orchestrator HTTP API + WebSocket push
+├── orchestratorservice.cpp # Worker registry, health probing
+└── cloudmetadata.cpp # IMDS probing, termination monitoring
+```
+
+## Key Classes
+
+### `ComputeServiceSession` (computeservice.h)
+Public API entry point. Uses PIMPL (`struct Impl` in computeservice.cpp). Owns:
+- Two `RunnerGroup`s: `m_LocalRunnerGroup`, `m_RemoteRunnerGroup`
+- Scheduler thread that drains `m_UpdatedActions` and drives state transitions
+- Action maps: `m_PendingActions`, `m_RunningMap`, `m_ResultsMap`
+- Queue map: `m_Queues` (QueueEntry objects)
+- Action history ring: `m_ActionHistory` (bounded deque, default 1000)
+
+**Session states:** Created → Ready → Draining → Paused → Abandoned → Sunset. Both Abandoned and Sunset can be jumped to from any earlier state. Abandoned is used for spot instance termination grace periods — on entry, all pending and running actions are immediately marked as `RunnerAction::State::Abandoned` and running processes are best-effort cancelled. Auto-retry is suppressed while the session is Abandoned. `IsHealthy()` returns false for Abandoned and Sunset.
+
+### `RunnerAction` (runners/functionrunner.h)
+Shared ref-counted struct representing one action through its lifecycle.
+
+**Key fields:**
+- `ActionLsn` — global unique sequence number
+- `QueueId` — 0 for implicit/unqueued actions
+- `Worker` — descriptor + content hash
+- `ActionObj` — CbObject with the action spec
+- `CpuUsagePercent` / `CpuSeconds` — atomics updated by monitor thread
+- `RetryCount` — atomic int tracking how many times the action has been rescheduled
+- `Timestamps[State::_Count]` — timestamp of each state transition
+
+**State machine (forward-only under normal flow, atomic):**
+```
+New → Pending → Submitting → Running → Completed
+ → Failed
+ → Abandoned
+ → Cancelled
+```
+`SetActionState()` rejects non-forward transitions. The one exception is `ResetActionStateToPending()`, which uses CAS to atomically transition from Failed/Abandoned back to Pending for rescheduling. It clears timestamps from Submitting onward, resets execution fields, increments `RetryCount`, and calls `PostUpdate()` to re-enter the scheduler pipeline.
+
+### `LocalProcessRunner` (runners/localrunner.h)
+Base for all local execution. Platform runners subclass this and override:
+- `SubmitAction()` — fork/exec the worker process
+- `SweepRunningActions()` — poll for process exit (waitpid / WaitForSingleObject)
+- `CancelRunningActions()` — signal all processes during shutdown
+- `SampleProcessCpu(RunningAction&)` — read platform CPU usage (no-op default)
+
+**Infrastructure owned by LocalProcessRunner:**
+- Monitor thread — calls `SweepRunningActions()` then `SampleRunningProcessCpu()` in a loop
+- `m_RunningMap` — `RwLock`-guarded map of `Lsn → RunningAction`
+- `DeferredDirectoryDeleter` — sandbox directories are queued for async deletion
+- `PrepareActionSubmission()` — shared preamble (capacity check, sandbox creation, worker manifesting, input decompression)
+- `ProcessCompletedActions()` — shared post-processing (gather outputs, set state, enqueue deletion)
+
+**CPU sampling:** `SampleRunningProcessCpu()` iterates `m_RunningMap` under shared lock, calls `SampleProcessCpu()` per entry, throttled to every 5 seconds per action. Platform implementations:
+- Linux: `/proc/{pid}/stat` utime+stime jiffies ÷ `_SC_CLK_TCK`
+- Windows: `GetProcessTimes()` in 100ns intervals ÷ 10,000,000
+- macOS: `proc_pidinfo(PROC_PIDTASKINFO)` pti_total_user+system nanoseconds ÷ 1,000,000,000
+
+### `FunctionRunner` / `RunnerGroup` (runners/functionrunner.h)
+Abstract base for runners. `RunnerGroup<T>` holds a vector of runners and load-balances across them using a round-robin atomic index. `BaseRunnerGroup::SubmitActions()` distributes a batch proportionally based on per-runner capacity.
+
+### `HttpComputeService` (include/zencompute/httpcomputeservice.h)
+Wraps `ComputeServiceSession` as an HTTP service. All routes are registered in the constructor. Handles CbPackage attachment ingestion from `CidStore` before forwarding to the service.
+
+## Action Lifecycle (End to End)
+
+1. **HTTP POST** → `HttpComputeService` ingests attachments, calls `EnqueueAction()`
+2. **Enqueue** → creates `RunnerAction` (New → Pending), calls `PostUpdate()`
+3. **PostUpdate** → appends to `m_UpdatedActions`, signals scheduler thread event
+4. **Scheduler thread** → drains `m_UpdatedActions`, drives pending actions to runners
+5. **Runner `SubmitAction()`** → Pending → Submitting (on runner's worker pool thread)
+6. **Process launch** → Submitting → Running, added to `m_RunningMap`
+7. **Monitor thread `SweepRunningActions()`** → detects exit, gathers outputs
+8. **`ProcessCompletedActions()`** → Running → Completed/Failed/Abandoned, `PostUpdate()`
+9. **Scheduler thread `HandleActionUpdates()`** — for Failed/Abandoned actions, checks retry limit; if retries remain, calls `ResetActionStateToPending()` which loops back to step 3. Otherwise moves to `m_ResultsMap`, records history, notifies queue.
+10. **Client `GET /jobs/{lsn}`** → returns result from `m_ResultsMap`, schedules retirement
+
+### Action Rescheduling
+
+Actions that fail or are abandoned can be automatically retried or manually rescheduled via the API.
+
+**Automatic retry (scheduler path):** In `HandleActionUpdates()`, when a Failed or Abandoned state is detected, the scheduler checks `RetryCount < GetMaxRetriesForQueue(QueueId)`. If retries remain, the action is removed from active maps and `ResetActionStateToPending()` is called, which re-enters it into the scheduler pipeline. The action keeps its original LSN so clients can continue polling with the same identifier.
+
+**Manual retry (API path):** `POST /compute/jobs/{lsn}` calls `RescheduleAction()`, which finds the action in `m_ResultsMap`, validates state (must be Failed or Abandoned), checks the retry limit, reverses queue counters (moving the LSN from `FinishedLsns` back to `ActiveLsns`), removes from results, and calls `ResetActionStateToPending()`. Returns 200 with `{lsn, retry_count}` on success, 409 Conflict with `{error}` on failure.
+
+**Retry limit:** Default of 3, overridable per-queue via the `max_retries` integer field in the queue's `Config` CbObject (set at `CreateQueue` time). Both automatic and manual paths respect this limit.
+
+**Cancelled actions are never retried** — cancellation is an intentional user action, not a transient failure.
+
+## Queue System
+
+Queues group actions from a single client session. A `QueueEntry` (internal) tracks:
+- `State` — `std::atomic<QueueState>` lifecycle state (Active → Draining → Cancelled)
+- `ActiveCount` — pending + running actions (atomic)
+- `CompletedCount / FailedCount / AbandonedCount / CancelledCount` (atomics)
+- `ActiveLsns` — for cancellation lookup (under `m_Lock`)
+- `FinishedLsns` — moved here when actions complete
+- `IdleSince` — used for 15-minute automatic expiry
+- `Config` — CbObject set at creation; supports `max_retries` (int) to override the default retry limit
+
+**Queue state machine (`QueueState` enum):**
+```
+Active → Draining → Cancelled
+ \ ↑
+ ─────────────────────/
+```
+- **Active** — accepts new work, schedules pending work, finishes running work (initial state)
+- **Draining** — rejects new work, finishes existing work (one-way via CAS from Active; cannot override Cancelled)
+- **Cancelled** — rejects new work, actively cancels in-flight work (reachable from Active or Draining)
+
+Key operations:
+- `CreateQueue(Tag)` → returns `QueueId`
+- `EnqueueActionToQueue(QueueId, ...)` → action's `QueueId` field is set at creation
+- `CancelQueue(QueueId)` → marks all active LSNs for cancellation
+- `DrainQueue(QueueId)` → stops accepting new submissions; existing work finishes naturally (irreversible)
+- `GetQueueCompleted(QueueId)` → CbWriter output of finished results
+- Queue references in HTTP routes accept either a decimal ID or an Oid token (24-hex), resolved by `ResolveQueueRef()`
+
+## HTTP API
+
+All routes registered in `HttpComputeService` constructor. Prefix is configured externally (typically `/compute`).
+
+### Global endpoints
+| Method | Path | Description |
+|--------|------|-------------|
+| POST | `abandon` | Transition session to Abandoned state (409 if invalid) |
+| GET | `jobs/history` | Action history (last N, with timestamps per state) |
+| GET | `jobs/running` | In-flight actions with CPU metrics |
+| GET | `jobs/completed` | Actions with results available |
+| GET/POST/DELETE | `jobs/{lsn}` | GET: result; POST: reschedule failed action; DELETE: retire |
+| POST | `jobs/{worker}` | Submit action for specific worker |
+| POST | `jobs` | Submit action (worker resolved from descriptor) |
+| GET | `workers` | List worker IDs |
+| GET | `workers/all` | All workers with full descriptors |
+| GET/POST | `workers/{worker}` | Get/register worker |
+
+### Queue-scoped endpoints
+Queue ref is capture(1) in all `queues/{queueref}/...` routes.
+
+| Method | Path | Description |
+|--------|------|-------------|
+| GET | `queues` | List queue IDs |
+| POST | `queues` | Create queue |
+| GET/DELETE | `queues/{queueref}` | Status / delete |
+| POST | `queues/{queueref}/drain` | Drain queue (irreversible; rejects new submissions) |
+| GET | `queues/{queueref}/completed` | Queue's completed results |
+| GET | `queues/{queueref}/history` | Queue's action history |
+| GET | `queues/{queueref}/running` | Queue's running actions |
+| POST | `queues/{queueref}/jobs` | Submit to queue |
+| GET/POST | `queues/{queueref}/jobs/{lsn}` | GET: result; POST: reschedule |
+| GET/POST | `queues/{queueref}/workers/...` | Worker endpoints (same as global) |
+
+Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `HandleWorkersAllGet`, `HandleWorkerRequest`) shared by top-level and queue-scoped routes.
+
+## Concurrency Model
+
+**Locking discipline:** When multiple locks must be held simultaneously, always acquire in this order to prevent deadlocks:
+1. `m_ResultsLock`
+2. `m_RunningLock` (comment in localrunner.h: "must be taken *after* m_ResultsLock")
+3. `m_PendingLock`
+4. `m_QueueLock`
+
+**Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.
+
+**Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates.
+
+**Thread ownership:**
+- Scheduler thread — drives state transitions, owns `m_PendingActions`
+- Monitor thread (per runner) — polls process completion, owns `m_RunningMap` via shared lock
+- Worker pool threads — async submission, brief `SubmitAction()` calls
+- HTTP threads — read-only access to results, queue status
+
+## Sandbox Layout
+
+Each action gets a unique numbered directory under `m_SandboxPath`:
+```
+scratch/{counter}/
+ worker/ ← worker binaries (or bind-mounted on Linux)
+ inputs/ ← decompressed action inputs
+ outputs/ ← written by worker process
+```
+
+On Linux with sandboxing enabled, the process runs in a pivot-rooted namespace with `/usr`, `/lib`, `/etc`, `/worker` bind-mounted read-only and a tmpfs `/dev`.
+
+## Adding a New HTTP Endpoint
+
+1. Register the route in the `HttpComputeService` constructor in `httpcomputeservice.cpp`
+2. If the handler is shared between top-level and a `queues/{queueref}/...` variant, extract it as a private helper method declared in `httpcomputeservice.h`
+3. Queue-scoped routes validate the queue ref with `ResolveQueueRef(HttpReq, Req.GetCapture(1))` which writes an error response and returns 0 on failure
+4. Use `CbObjectWriter` for response bodies; emit via `HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save())`
+5. Conditional fields (e.g., optional CPU metrics): emit inside `if (value > 0.0f)` / `if (value >= 0.0f)` guards to omit absent values rather than emitting sentinel values
+
+## Adding a New Runner Platform
+
+1. Subclass `LocalProcessRunner`, add `h`/`cpp` files in `runners/`
+2. Override `SubmitAction()`, `SweepRunningActions()`, `CancelRunningActions()`, and optionally `CancelAction(int)` and `SampleProcessCpu(RunningAction&)`
+3. `SampleProcessCpu()` must update both `Running.Action->CpuSeconds` (unconditionally from the absolute OS value) and `Running.Action->CpuUsagePercent` (delta-based, only after second sample)
+4. `ProcessHandle` convention: store pid as `reinterpret_cast<void*>(static_cast<intptr_t>(pid))` for consistency with the base class
+5. Register in `ComputeServiceSession::AddLocalRunner()` in `computeservice.cpp`