diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 11:19:10 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 11:19:10 +0100 |
| commit | eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch) | |
| tree | 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zenserver-test/compute-tests.cpp | |
| parent | bugfix release - v5.7.23 (#851) (diff) | |
| download | zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip | |
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`
### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`
### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK
### Logging Optimization and ANSI Improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode, enabling color in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files
### CLI / Exec Refactoring
- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser
### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Make the `function.session.abandon_pending` test more robust by polling for abandoned results instead of relying on a fixed sleep
### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store
### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extend `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
| -rw-r--r-- | src/zenserver-test/compute-tests.cpp | 397 |
1 files changed, 359 insertions, 38 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp index c90ac5d8b..021052a3b 100644 --- a/src/zenserver-test/compute-tests.cpp +++ b/src/zenserver-test/compute-tests.cpp @@ -19,6 +19,7 @@ # include <zencore/timer.h> # include <zenhttp/httpclient.h> # include <zenhttp/httpserver.h> +# include <zenhttp/websocket.h> # include <zencompute/computeservice.h> # include <zenstore/zenstore.h> # include <zenutil/zenserverprocess.h> @@ -291,7 +292,9 @@ GetRot13Output(const CbPackage& ResultPackage) } // Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response. -class MockOrchestratorService : public HttpService +// Also implements IWebSocketHandler so the compute session's WS subscription receives +// push notifications when the worker list changes. +class MockOrchestratorService : public HttpService, public IWebSocketHandler { public: MockOrchestratorService() @@ -318,13 +321,48 @@ public: void SetWorkerList(CbObject WorkerList) { - RwLock::ExclusiveLockScope Lock(m_Lock); - m_WorkerList = std::move(WorkerList); + { + RwLock::ExclusiveLockScope Lock(m_Lock); + m_WorkerList = std::move(WorkerList); + } + + // Broadcast a poke to all connected WebSocket clients so they + // immediately re-query the orchestrator instead of waiting for the poll. 
+ std::vector<Ref<WebSocketConnection>> Snapshot; + m_WsLock.WithSharedLock([&] { Snapshot = m_WsConnections; }); + for (auto& Conn : Snapshot) + { + if (Conn->IsOpen()) + { + Conn->SendText("updated"sv); + } + } + } + + // IWebSocketHandler + void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override + { + m_WsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); + } + + void OnWebSocketMessage(WebSocketConnection&, const WebSocketMessage&) override {} + + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t, std::string_view) override + { + m_WsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); } private: RwLock m_Lock; CbObject m_WorkerList; + + RwLock m_WsLock; + std::vector<Ref<WebSocketConnection>> m_WsConnections; }; // Manages in-process ASIO HTTP server lifecycle for mock orchestrator. 
@@ -1089,9 +1127,8 @@ TEST_CASE("function.remote.worker_sync_on_discovery") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); // Register worker on session (stored locally, no runners yet) CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); @@ -1100,8 +1137,9 @@ TEST_CASE("function.remote.worker_sync_on_discovery") // Update mock orchestrator to advertise the real server MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}})); - // Wait for scheduler to discover the runner (~5s throttle + margin) - Sleep(7'000); + // Trigger immediate orchestrator re-query and wait for runner setup + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Submit Rot13 action via session CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); @@ -1153,15 +1191,15 @@ TEST_CASE("function.remote.late_runner_discovery") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for W1 discovery - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Baseline: submit Rot13 action and verify it completes on W1 { @@ -1202,7 +1240,8 @@ 
TEST_CASE("function.remote.late_runner_discovery") MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}})); // Wait for W2 discovery - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Verify W2 received the worker by querying its /compute/workers endpoint directly { @@ -1274,16 +1313,16 @@ TEST_CASE("function.remote.queue_association") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Create a local queue and submit action to it auto QueueResult = Session.CreateQueue(); @@ -1353,16 +1392,16 @@ TEST_CASE("function.remote.queue_cancel_propagation") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Create a local queue and submit a long-running Sleep 
action auto QueueResult = Session.CreateQueue(); @@ -1496,7 +1535,7 @@ TEST_CASE("function.session.abandon_pending") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.Ready(); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); @@ -1515,19 +1554,29 @@ TEST_CASE("function.session.abandon_pending") REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3"); // Transition to Abandoned — should mark all pending actions as Abandoned - bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned); CHECK(!Session.IsHealthy()); - // Give the scheduler thread time to process the state changes - Sleep(2'000); - - // All three actions should now be in the results map as abandoned + // Poll until the scheduler thread has processed all abandoned actions into + // the results map. The abandon is asynchronous: AbandonAllActions() sets + // action state and posts updates, but HandleActionUpdates() on the + // scheduler thread must run before results are queryable. 
+ Stopwatch Timer; for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn}) { CbPackage Result; - HttpResponseCode Code = Session.GetActionResult(Lsn, Result); + HttpResponseCode Code = HttpResponseCode::Accepted; + while (Timer.GetElapsedTimeMs() < 10'000) + { + Code = Session.GetActionResult(Lsn, Result); + if (Code == HttpResponseCode::OK) + { + break; + } + Sleep(100); + } CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code))); } @@ -1561,15 +1610,15 @@ TEST_CASE("function.session.abandon_running") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Create a queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -1585,7 +1634,7 @@ TEST_CASE("function.session.abandon_running") Sleep(2'000); // Transition to Abandoned — should abandon the running action - bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(!Session.IsHealthy()); @@ -1631,16 +1680,16 @@ TEST_CASE("function.remote.abandon_propagation") InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); - Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); - 
Session.SetOrchestratorBasePath(SessionBaseDir.Path()); - Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner - Sleep(7'000); + Session.NotifyOrchestratorChanged(); + Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -1656,7 +1705,7 @@ TEST_CASE("function.remote.abandon_propagation") Sleep(2'000); // Transition to Abandoned — should abandon the running action and propagate - bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); // Poll for the action to complete @@ -1693,6 +1742,278 @@ TEST_CASE("function.remote.abandon_propagation") Session.Shutdown(); } +TEST_CASE("function.remote.shutdown_cancels_queues") +{ + // Verify that Session.Shutdown() cancels remote queues on the compute node. 
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + Session.NotifyOrchestratorChanged(); + Sleep(2'000); + + // Create a queue and submit a long-running action so the remote queue is established + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Verify the remote has a non-implicit queue before shutdown + HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); + { + HttpClient::Response QueuesResp = RemoteClient.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server before shutdown"); + + bool RemoteQueueFound = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueFound = true; + break; + } + } + REQUIRE_MESSAGE(RemoteQueueFound, "Expected remote queue to exist before shutdown"); + } + + // Shut down the session — this should cancel all remote queues + Session.Shutdown(); + + // Verify the 
remote queue is now cancelled + { + HttpClient::Response QueuesResp = RemoteClient.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server after shutdown"); + + bool RemoteQueueCancelled = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled"; + break; + } + } + CHECK_MESSAGE(RemoteQueueCancelled, "Expected remote queue to be cancelled after session shutdown"); + } +} + +TEST_CASE("function.remote.shutdown_rejects_new_work") +{ + // Verify that after Shutdown() the remote runner rejects new submissions. + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path()); + Session.Ready(); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for runner discovery + Session.NotifyOrchestratorChanged(); + Sleep(2'000); + + // Baseline: submit an action and verify it completes + { + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + auto EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed"); + + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if 
(ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance.GetLogOutput())); + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + } + + // Shut down — the remote runner should now reject new work + Session.Shutdown(); + + // Attempting to enqueue after shutdown should fail (session is in Sunset state) + CbObject ActionObj = BuildRot13ActionForSession("rejected"sv, Resolver); + auto Rejected = Session.EnqueueAction(ActionObj, 0); + CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected after shutdown"); +} + +TEST_CASE("function.session.retract_pending") +{ + // Create a session with no runners so actions stay pending + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.Ready(); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); + + CbObject ActionObj = BuildRot13ActionForSession("retract-test"sv, Resolver); + + auto Enqueued = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action"); + + // Let the scheduler process the pending action + Sleep(500); + + // Retract the pending action + auto Result = Session.RetractAction(Enqueued.Lsn); + CHECK_MESSAGE(Result.Success, fmt::format("RetractAction failed: {}", Result.Error)); + CHECK_EQ(Result.RetryCount, 0); // Retract should NOT increment retry count + + // The action should be re-enqueued as pending (still no runners, so stays pending). + // Let the scheduler process the retracted action back to pending. 
+ Sleep(500); + + // Queue should still show 1 active (the action was rescheduled, not completed) + auto Status = Session.GetQueueStatus(QueueResult.QueueId); + CHECK_EQ(Status.ActiveCount, 1); + CHECK_EQ(Status.CompletedCount, 0); + CHECK_EQ(Status.AbandonedCount, 0); + CHECK_EQ(Status.CancelledCount, 0); + + // The action result should NOT be in the results map (it's pending again) + CbPackage ResultPackage; + HttpResponseCode Code = Session.GetActionResult(Enqueued.Lsn, ResultPackage); + CHECK(Code != HttpResponseCode::OK); + + Session.Shutdown(); +} + +TEST_CASE("function.session.retract_not_terminal") +{ + // Verify that a completed action cannot be retracted + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.AddLocalRunner(Resolver, SessionBaseDir.Path()); + Session.Ready(); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + CbObject ActionObj = BuildRot13ActionForSession("retract-completed"sv, Resolver); + + auto Enqueued = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action"); + + // Wait for the action to complete + CbPackage ResultPackage; + HttpResponseCode Code = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + Code = Session.GetActionResult(Enqueued.Lsn, ResultPackage); + if (Code == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(Code == HttpResponseCode::OK, "Action did not complete within timeout"); + + // Retract should fail — action already completed (no longer in pending/running maps) + auto RetractResult = Session.RetractAction(Enqueued.Lsn); + CHECK(!RetractResult.Success); + + Session.Shutdown(); +} + +TEST_CASE("function.retract_http") +{ + // Use max-actions=1 with a long sleep to hold the slot, then submit a second 
+ // action that will stay pending and can be retracted via the HTTP endpoint. + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1"); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Submit a long-running Sleep action to occupy the single execution slot + const std::string BlockerUrl = fmt::format("/jobs/{}", WorkerId.ToHexString()); + HttpClient::Response BlockerResp = Client.Post(BlockerUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", int(BlockerResp.StatusCode))); + + // Submit a second action — it will stay pending because the slot is occupied + HttpClient::Response SubmitResp = Client.Post(BlockerUrl, BuildRot13ActionPackage("Retract HTTP Test"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", int(SubmitResp.StatusCode))); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); + + // Wait for the scheduler to process the pending action into m_PendingActions + Sleep(1'000); + + // Retract the pending action via POST /jobs/{lsn}/retract + const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn); + HttpClient::Response RetractResp = Client.Post(RetractUrl); + CHECK_MESSAGE(RetractResp.StatusCode == HttpResponseCode::OK, + fmt::format("Retract failed: status={}, body={}\nServer log:\n{}", + int(RetractResp.StatusCode), + RetractResp.ToText(), + Instance.GetLogOutput())); + + if (RetractResp.StatusCode == HttpResponseCode::OK) + { + CbObject RespObj = RetractResp.AsObject(); + 
CHECK(RespObj["success"sv].AsBool()); + CHECK_EQ(RespObj["lsn"sv].AsInt32(), Lsn); + } + + // A second retract should also succeed (action is back to pending) + Sleep(500); + HttpClient::Response RetractResp2 = Client.Post(RetractUrl); + CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK, + fmt::format("Second retract failed: status={}, body={}", int(RetractResp2.StatusCode), RetractResp2.ToText())); +} + TEST_SUITE_END(); } // namespace zen::tests::compute |