aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/remotehttprunner.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/runners/remotehttprunner.h
parentbugfix release - v5.7.23 (#851) (diff)
downloadzen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/runners/remotehttprunner.h')
-rw-r--r--src/zencompute/runners/remotehttprunner.h23
1 files changed, 20 insertions, 3 deletions
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index 9119992a9..c17d0cf2a 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -14,9 +14,11 @@
# include <zencore/workthreadpool.h>
# include <zencore/zencore.h>
# include <zenhttp/httpclient.h>
+# include <zenhttp/httpwsclient.h>
# include <atomic>
# include <filesystem>
+# include <memory>
# include <thread>
# include <unordered_map>
@@ -32,7 +34,7 @@ namespace zen::compute {
*/
-class RemoteHttpRunner : public FunctionRunner
+class RemoteHttpRunner : public FunctionRunner, private IWsClientHandler
{
RemoteHttpRunner(RemoteHttpRunner&&) = delete;
RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;
@@ -45,7 +47,7 @@ public:
~RemoteHttpRunner();
virtual void Shutdown() override;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override;
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
@@ -66,7 +68,9 @@ private:
std::string m_BaseUrl;
HttpClient m_Http;
- int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ std::atomic<bool> m_AcceptNewActions{true};
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ int32_t m_MaxBatchSize = 50;
struct HttpRunningAction
{
@@ -92,7 +96,20 @@ private:
// creating remote queues. Generated once at construction and never changes.
Oid m_InstanceId;
+ // WebSocket completion notification client
+ std::unique_ptr<HttpWsClient> m_WsClient;
+ std::atomic<bool> m_WsConnected{false};
+
+ // IWsClientHandler
+ void OnWsOpen() override;
+ void OnWsMessage(const WebSocketMessage& Msg) override;
+ void OnWsClose(uint16_t Code, std::string_view Reason) override;
+
Oid EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config);
+
+ std::vector<SubmitResult> SubmitActionBatch(const std::string& SubmitUrl, const std::vector<Ref<RunnerAction>>& Actions);
+ std::vector<SubmitResult> ParseBatchResponse(const HttpClient::Response& Response, const std::vector<Ref<RunnerAction>>& Actions);
+ std::vector<SubmitResult> FallbackToIndividualSubmit(const std::vector<Ref<RunnerAction>>& Actions);
};
} // namespace zen::compute