diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 11:19:10 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 11:19:10 +0100 |
| commit | eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch) | |
| tree | 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/include | |
| parent | bugfix release - v5.7.23 (#851) (diff) | |
| download | zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip | |
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`
### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`
### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK
### Logging Optimization and ANSI improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files
### CLI / Exec Refactoring
- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser
### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Made `function.session.abandon_pending` test more robust
### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store
### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extended `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/include')
| -rw-r--r-- | src/zencompute/include/zencompute/computeservice.h | 69 | ||||
| -rw-r--r-- | src/zencompute/include/zencompute/httpcomputeservice.h | 13 |
2 files changed, 80 insertions, 2 deletions
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h index 65ec5f9ee..1ca78738a 100644 --- a/src/zencompute/include/zencompute/computeservice.h +++ b/src/zencompute/include/zencompute/computeservice.h @@ -13,6 +13,7 @@ # include <zenhttp/httpcommon.h> # include <filesystem> +# include <span> namespace zen { class ChunkResolver; @@ -29,6 +30,53 @@ class RemoteHttpRunner; struct RunnerAction; struct SubmitResult; +/** + * Observer interface for action completion notifications. + * + * Implementors receive a batch of notifications whenever actions reach a + * terminal state (Completed, Failed, Abandoned, Cancelled). The callback + * fires on the scheduler thread *after* the action result has been placed + * in m_ResultsMap, so GET /jobs/{lsn} will succeed by the time the client + * reacts to the notification. + */ +class IComputeCompletionObserver +{ +public: + virtual ~IComputeCompletionObserver() = default; + + enum class ActionState + { + Completed, + Failed, + Abandoned, + Cancelled, + }; + + struct CompletedActionNotification + { + int Lsn; + ActionState State; + }; + + static constexpr std::string_view ActionStateToString(ActionState State) + { + switch (State) + { + case ActionState::Completed: + return "Completed"; + case ActionState::Failed: + return "Failed"; + case ActionState::Abandoned: + return "Abandoned"; + case ActionState::Cancelled: + return "Cancelled"; + } + return "Unknown"; + } + + virtual void OnActionsCompleted(std::span<const CompletedActionNotification> Actions) = 0; +}; + struct WorkerDesc { CbPackage Descriptor; @@ -91,11 +139,25 @@ public: // Sunset can be reached from any non-Sunset state. bool RequestStateTransition(SessionState NewState); + // Convenience helpers for common state transitions. + bool Ready() { return RequestStateTransition(SessionState::Ready); } + bool Abandon() { return RequestStateTransition(SessionState::Abandoned); } + // Orchestration void SetOrchestratorEndpoint(std::string_view Endpoint); void SetOrchestratorBasePath(std::filesystem::path BasePath); + void SetOrchestrator(std::string_view Endpoint, std::filesystem::path BasePath) + { + SetOrchestratorEndpoint(Endpoint); + SetOrchestratorBasePath(std::move(BasePath)); + } + + /// Immediately wake the scheduler to re-poll the orchestrator for worker changes. + /// Resets the polling throttle so the next scheduler tick calls UpdateCoordinatorState(). + void NotifyOrchestratorChanged(); + // Worker registration and discovery void RegisterWorker(CbPackage Worker); @@ -182,6 +244,7 @@ public: }; [[nodiscard]] RescheduleResult RescheduleAction(int ActionLsn); + [[nodiscard]] RescheduleResult RetractAction(int ActionLsn); void GetCompleted(CbWriter&); @@ -215,7 +278,7 @@ public: // sized to match RunnerAction::State::_Count but we can't use the enum here // for dependency reasons, so just use a fixed size array and static assert in // the implementation file - uint64_t Timestamps[8] = {}; + uint64_t Timestamps[9] = {}; }; [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100); @@ -235,6 +298,10 @@ public: void EmitStats(CbObjectWriter& Cbo); + // Completion observer (used by HttpComputeService for WebSocket push) + + void SetCompletionObserver(IComputeCompletionObserver* Observer); + // Recording void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h index ee1cd2614..b58e73a0d 100644 --- a/src/zencompute/include/zencompute/httpcomputeservice.h +++ b/src/zencompute/include/zencompute/httpcomputeservice.h @@ -9,6 +9,7 @@ # include "zencompute/computeservice.h" # include <zenhttp/httpserver.h> +# include <zenhttp/websocket.h> # include <filesystem> # include <memory> @@ -22,7 +23,7 @@ namespace zen::compute { /** * HTTP interface for compute service */ -class HttpComputeService : public HttpService, public IHttpStatsProvider +class HttpComputeService : public HttpService, public IHttpStatsProvider, public IWebSocketHandler, public IComputeCompletionObserver { public: HttpComputeService(CidStore& InCidStore, @@ -42,6 +43,16 @@ public: void HandleStatsRequest(HttpServerRequest& Request) override; + // IWebSocketHandler + + void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override; + void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override; + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override; + + // IComputeCompletionObserver + + void OnActionsCompleted(std::span<const CompletedActionNotification> Actions) override; + private: struct Impl; std::unique_ptr<Impl> m_Impl; |