aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/include
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/include
parentbugfix release - v5.7.23 (#851) (diff)
downloadzen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/include')
-rw-r--r--src/zencompute/include/zencompute/computeservice.h69
-rw-r--r--src/zencompute/include/zencompute/httpcomputeservice.h13
2 files changed, 80 insertions, 2 deletions
diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h
index 65ec5f9ee..1ca78738a 100644
--- a/src/zencompute/include/zencompute/computeservice.h
+++ b/src/zencompute/include/zencompute/computeservice.h
@@ -13,6 +13,7 @@
# include <zenhttp/httpcommon.h>
# include <filesystem>
+# include <span>
namespace zen {
class ChunkResolver;
@@ -29,6 +30,53 @@ class RemoteHttpRunner;
struct RunnerAction;
struct SubmitResult;
+/**
+ * Observer interface for action completion notifications.
+ *
+ * Implementors receive a batch of notifications whenever actions reach a
+ * terminal state (Completed, Failed, Abandoned, Cancelled). The callback
+ * fires on the scheduler thread *after* the action result has been placed
+ * in m_ResultsMap, so GET /jobs/{lsn} will succeed by the time the client
+ * reacts to the notification.
+ */
+class IComputeCompletionObserver
+{
+public:
+ virtual ~IComputeCompletionObserver() = default;
+
+ enum class ActionState
+ {
+ Completed,
+ Failed,
+ Abandoned,
+ Cancelled,
+ };
+
+ struct CompletedActionNotification
+ {
+ int Lsn;
+ ActionState State;
+ };
+
+ static constexpr std::string_view ActionStateToString(ActionState State)
+ {
+ switch (State)
+ {
+ case ActionState::Completed:
+ return "Completed";
+ case ActionState::Failed:
+ return "Failed";
+ case ActionState::Abandoned:
+ return "Abandoned";
+ case ActionState::Cancelled:
+ return "Cancelled";
+ }
+ return "Unknown";
+ }
+
+ virtual void OnActionsCompleted(std::span<const CompletedActionNotification> Actions) = 0;
+};
+
struct WorkerDesc
{
CbPackage Descriptor;
@@ -91,11 +139,25 @@ public:
// Sunset can be reached from any non-Sunset state.
bool RequestStateTransition(SessionState NewState);
+ // Convenience helpers for common state transitions.
+ bool Ready() { return RequestStateTransition(SessionState::Ready); }
+ bool Abandon() { return RequestStateTransition(SessionState::Abandoned); }
+
// Orchestration
void SetOrchestratorEndpoint(std::string_view Endpoint);
void SetOrchestratorBasePath(std::filesystem::path BasePath);
+ void SetOrchestrator(std::string_view Endpoint, std::filesystem::path BasePath)
+ {
+ SetOrchestratorEndpoint(Endpoint);
+ SetOrchestratorBasePath(std::move(BasePath));
+ }
+
+ /// Immediately wake the scheduler to re-poll the orchestrator for worker changes.
+ /// Resets the polling throttle so the next scheduler tick calls UpdateCoordinatorState().
+ void NotifyOrchestratorChanged();
+
// Worker registration and discovery
void RegisterWorker(CbPackage Worker);
@@ -182,6 +244,7 @@ public:
};
[[nodiscard]] RescheduleResult RescheduleAction(int ActionLsn);
+ [[nodiscard]] RescheduleResult RetractAction(int ActionLsn);
void GetCompleted(CbWriter&);
@@ -215,7 +278,7 @@ public:
// sized to match RunnerAction::State::_Count but we can't use the enum here
// for dependency reasons, so just use a fixed size array and static assert in
// the implementation file
- uint64_t Timestamps[8] = {};
+ uint64_t Timestamps[9] = {};
};
[[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100);
@@ -235,6 +298,10 @@ public:
void EmitStats(CbObjectWriter& Cbo);
+ // Completion observer (used by HttpComputeService for WebSocket push)
+
+ void SetCompletionObserver(IComputeCompletionObserver* Observer);
+
// Recording
void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath);
diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h
index ee1cd2614..b58e73a0d 100644
--- a/src/zencompute/include/zencompute/httpcomputeservice.h
+++ b/src/zencompute/include/zencompute/httpcomputeservice.h
@@ -9,6 +9,7 @@
# include "zencompute/computeservice.h"
# include <zenhttp/httpserver.h>
+# include <zenhttp/websocket.h>
# include <filesystem>
# include <memory>
@@ -22,7 +23,7 @@ namespace zen::compute {
/**
* HTTP interface for compute service
*/
-class HttpComputeService : public HttpService, public IHttpStatsProvider
+class HttpComputeService : public HttpService, public IHttpStatsProvider, public IWebSocketHandler, public IComputeCompletionObserver
{
public:
HttpComputeService(CidStore& InCidStore,
@@ -42,6 +43,16 @@ public:
void HandleStatsRequest(HttpServerRequest& Request) override;
+ // IWebSocketHandler
+
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override;
+ void OnWebSocketMessage(WebSocketConnection& Conn, const WebSocketMessage& Msg) override;
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, std::string_view Reason) override;
+
+ // IComputeCompletionObserver
+
+ void OnActionsCompleted(std::span<const CompletedActionNotification> Actions) override;
+
private:
struct Impl;
std::unique_ptr<Impl> m_Impl;