aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:20:36 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:20:36 +0100
commit60a90b5faa6ae885ebf852b98412aba2dec1a722 (patch)
treecf1eed74d0360095e50635e04f66452e56a4d99c /src/zenserver-test
parentMerge branch 'main' into sb/local-crashreports (diff)
parentCompute batching (#849) (diff)
downloadzen-sb/local-crashreports.tar.xz
zen-sb/local-crashreports.zip
Merge branch 'main' into sb/local-crashreportssb/local-crashreports
Diffstat (limited to 'src/zenserver-test')
-rw-r--r--src/zenserver-test/compute-tests.cpp397
-rw-r--r--src/zenserver-test/zenserver-test.cpp4
2 files changed, 361 insertions, 40 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index c90ac5d8b..021052a3b 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -19,6 +19,7 @@
# include <zencore/timer.h>
# include <zenhttp/httpclient.h>
# include <zenhttp/httpserver.h>
+# include <zenhttp/websocket.h>
# include <zencompute/computeservice.h>
# include <zenstore/zenstore.h>
# include <zenutil/zenserverprocess.h>
@@ -291,7 +292,9 @@ GetRot13Output(const CbPackage& ResultPackage)
}
// Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response.
-class MockOrchestratorService : public HttpService
+// Also implements IWebSocketHandler so the compute session's WS subscription receives
+// push notifications when the worker list changes.
+class MockOrchestratorService : public HttpService, public IWebSocketHandler
{
public:
MockOrchestratorService()
@@ -318,13 +321,48 @@ public:
void SetWorkerList(CbObject WorkerList)
{
- RwLock::ExclusiveLockScope Lock(m_Lock);
- m_WorkerList = std::move(WorkerList);
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ m_WorkerList = std::move(WorkerList);
+ }
+
+ // Broadcast a poke to all connected WebSocket clients so they
+ // immediately re-query the orchestrator instead of waiting for the poll.
+ std::vector<Ref<WebSocketConnection>> Snapshot;
+ m_WsLock.WithSharedLock([&] { Snapshot = m_WsConnections; });
+ for (auto& Conn : Snapshot)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText("updated"sv);
+ }
+ }
+ }
+
+ // IWebSocketHandler
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override
+ {
+ m_WsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+ }
+
+ void OnWebSocketMessage(WebSocketConnection&, const WebSocketMessage&) override {}
+
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t, std::string_view) override
+ {
+ m_WsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
}
private:
RwLock m_Lock;
CbObject m_WorkerList;
+
+ RwLock m_WsLock;
+ std::vector<Ref<WebSocketConnection>> m_WsConnections;
};
// Manages in-process ASIO HTTP server lifecycle for mock orchestrator.
@@ -1089,9 +1127,8 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
// Register worker on session (stored locally, no runners yet)
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
@@ -1100,8 +1137,9 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
// Update mock orchestrator to advertise the real server
MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}}));
- // Wait for scheduler to discover the runner (~5s throttle + margin)
- Sleep(7'000);
+ // Trigger immediate orchestrator re-query and wait for runner setup
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Submit Rot13 action via session
CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
@@ -1153,15 +1191,15 @@ TEST_CASE("function.remote.late_runner_discovery")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
// Wait for W1 discovery
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Baseline: submit Rot13 action and verify it completes on W1
{
@@ -1202,7 +1240,8 @@ TEST_CASE("function.remote.late_runner_discovery")
MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}}));
// Wait for W2 discovery
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Verify W2 received the worker by querying its /compute/workers endpoint directly
{
@@ -1274,16 +1313,16 @@ TEST_CASE("function.remote.queue_association")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
// Register worker on session
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
// Wait for scheduler to discover the runner
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Create a local queue and submit action to it
auto QueueResult = Session.CreateQueue();
@@ -1353,16 +1392,16 @@ TEST_CASE("function.remote.queue_cancel_propagation")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
// Register worker on session
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
// Wait for scheduler to discover the runner
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1496,7 +1535,7 @@ TEST_CASE("function.session.abandon_pending")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.Ready();
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
@@ -1515,19 +1554,29 @@ TEST_CASE("function.session.abandon_pending")
REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3");
// Transition to Abandoned — should mark all pending actions as Abandoned
- bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned);
CHECK(!Session.IsHealthy());
- // Give the scheduler thread time to process the state changes
- Sleep(2'000);
-
- // All three actions should now be in the results map as abandoned
+ // Poll until the scheduler thread has processed all abandoned actions into
+ // the results map. The abandon is asynchronous: AbandonAllActions() sets
+ // action state and posts updates, but HandleActionUpdates() on the
+ // scheduler thread must run before results are queryable.
+ Stopwatch Timer;
for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn})
{
CbPackage Result;
- HttpResponseCode Code = Session.GetActionResult(Lsn, Result);
+ HttpResponseCode Code = HttpResponseCode::Accepted;
+ while (Timer.GetElapsedTimeMs() < 10'000)
+ {
+ Code = Session.GetActionResult(Lsn, Result);
+ if (Code == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(100);
+ }
CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code)));
}
@@ -1561,15 +1610,15 @@ TEST_CASE("function.session.abandon_running")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
// Wait for scheduler to discover the runner
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Create a queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1585,7 +1634,7 @@ TEST_CASE("function.session.abandon_running")
Sleep(2'000);
// Transition to Abandoned — should abandon the running action
- bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
CHECK(!Session.IsHealthy());
@@ -1631,16 +1680,16 @@ TEST_CASE("function.remote.abandon_propagation")
InMemoryChunkResolver Resolver;
ScopedTemporaryDirectory SessionBaseDir;
zen::compute::ComputeServiceSession Session(Resolver);
- Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
- Session.SetOrchestratorBasePath(SessionBaseDir.Path());
- Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
// Register worker on session
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
// Wait for scheduler to discover the runner
- Sleep(7'000);
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1656,7 +1705,7 @@ TEST_CASE("function.remote.abandon_propagation")
Sleep(2'000);
// Transition to Abandoned — should abandon the running action and propagate
- bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
// Poll for the action to complete
@@ -1693,6 +1742,278 @@ TEST_CASE("function.remote.abandon_propagation")
Session.Shutdown();
}
+TEST_CASE("function.remote.shutdown_cancels_queues")
+{
+ // Verify that Session.Shutdown() cancels remote queues on the compute node.
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
+
+ // Create a queue and submit a long-running action so the remote queue is established
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ Sleep(2'000);
+
+ // Verify the remote has a non-implicit queue before shutdown
+ HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
+ {
+ HttpClient::Response QueuesResp = RemoteClient.Get("/queues"sv);
+ REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server before shutdown");
+
+ bool RemoteQueueFound = false;
+ for (auto& Item : QueuesResp.AsObject()["queues"sv])
+ {
+ if (!Item.AsObjectView()["implicit"sv].AsBool())
+ {
+ RemoteQueueFound = true;
+ break;
+ }
+ }
+ REQUIRE_MESSAGE(RemoteQueueFound, "Expected remote queue to exist before shutdown");
+ }
+
+ // Shut down the session — this should cancel all remote queues
+ Session.Shutdown();
+
+ // Verify the remote queue is now cancelled
+ {
+ HttpClient::Response QueuesResp = RemoteClient.Get("/queues"sv);
+ REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server after shutdown");
+
+ bool RemoteQueueCancelled = false;
+ for (auto& Item : QueuesResp.AsObject()["queues"sv])
+ {
+ if (!Item.AsObjectView()["implicit"sv].AsBool())
+ {
+ RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled";
+ break;
+ }
+ }
+ CHECK_MESSAGE(RemoteQueueCancelled, "Expected remote queue to be cancelled after session shutdown");
+ }
+}
+
+TEST_CASE("function.remote.shutdown_rejects_new_work")
+{
+ // Verify that after Shutdown() the remote runner rejects new submissions.
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestrator(MockOrch.GetEndpoint(), SessionBaseDir.Path());
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for runner discovery
+ Session.NotifyOrchestratorChanged();
+ Sleep(2'000);
+
+ // Baseline: submit an action and verify it completes
+ {
+ CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+ auto EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed");
+
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance.GetLogOutput()));
+ CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+ }
+
+ // Shut down — the remote runner should now reject new work
+ Session.Shutdown();
+
+ // Attempting to enqueue after shutdown should fail (session is in Sunset state)
+ CbObject ActionObj = BuildRot13ActionForSession("rejected"sv, Resolver);
+ auto Rejected = Session.EnqueueAction(ActionObj, 0);
+ CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected after shutdown");
+}
+
+TEST_CASE("function.session.retract_pending")
+{
+ // Create a session with no runners so actions stay pending
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
+
+ CbObject ActionObj = BuildRot13ActionForSession("retract-test"sv, Resolver);
+
+ auto Enqueued = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
+
+ // Let the scheduler process the pending action
+ Sleep(500);
+
+ // Retract the pending action
+ auto Result = Session.RetractAction(Enqueued.Lsn);
+ CHECK_MESSAGE(Result.Success, fmt::format("RetractAction failed: {}", Result.Error));
+ CHECK_EQ(Result.RetryCount, 0); // Retract should NOT increment retry count
+
+ // The action should be re-enqueued as pending (still no runners, so stays pending).
+ // Let the scheduler process the retracted action back to pending.
+ Sleep(500);
+
+ // Queue should still show 1 active (the action was rescheduled, not completed)
+ auto Status = Session.GetQueueStatus(QueueResult.QueueId);
+ CHECK_EQ(Status.ActiveCount, 1);
+ CHECK_EQ(Status.CompletedCount, 0);
+ CHECK_EQ(Status.AbandonedCount, 0);
+ CHECK_EQ(Status.CancelledCount, 0);
+
+ // The action result should NOT be in the results map (it's pending again)
+ CbPackage ResultPackage;
+ HttpResponseCode Code = Session.GetActionResult(Enqueued.Lsn, ResultPackage);
+ CHECK(Code != HttpResponseCode::OK);
+
+ Session.Shutdown();
+}
+
+TEST_CASE("function.session.retract_not_terminal")
+{
+ // Verify that a completed action cannot be retracted
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.AddLocalRunner(Resolver, SessionBaseDir.Path());
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ CbObject ActionObj = BuildRot13ActionForSession("retract-completed"sv, Resolver);
+
+ auto Enqueued = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
+
+ // Wait for the action to complete
+ CbPackage ResultPackage;
+ HttpResponseCode Code = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ Code = Session.GetActionResult(Enqueued.Lsn, ResultPackage);
+ if (Code == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(Code == HttpResponseCode::OK, "Action did not complete within timeout");
+
+ // Retract should fail — action already completed (no longer in pending/running maps)
+ auto RetractResult = Session.RetractAction(Enqueued.Lsn);
+ CHECK(!RetractResult.Success);
+
+ Session.Shutdown();
+}
+
+TEST_CASE("function.retract_http")
+{
+ // Use max-actions=1 with a long sleep to hold the slot, then submit a second
+ // action that will stay pending and can be retracted via the HTTP endpoint.
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1");
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Submit a long-running Sleep action to occupy the single execution slot
+ const std::string BlockerUrl = fmt::format("/jobs/{}", WorkerId.ToHexString());
+ HttpClient::Response BlockerResp = Client.Post(BlockerUrl, BuildSleepActionPackage("data"sv, 30'000));
+ REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", int(BlockerResp.StatusCode)));
+
+ // Submit a second action — it will stay pending because the slot is occupied
+ HttpClient::Response SubmitResp = Client.Post(BlockerUrl, BuildRot13ActionPackage("Retract HTTP Test"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", int(SubmitResp.StatusCode)));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
+
+ // Wait for the scheduler to process the pending action into m_PendingActions
+ Sleep(1'000);
+
+ // Retract the pending action via POST /jobs/{lsn}/retract
+ const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn);
+ HttpClient::Response RetractResp = Client.Post(RetractUrl);
+ CHECK_MESSAGE(RetractResp.StatusCode == HttpResponseCode::OK,
+ fmt::format("Retract failed: status={}, body={}\nServer log:\n{}",
+ int(RetractResp.StatusCode),
+ RetractResp.ToText(),
+ Instance.GetLogOutput()));
+
+ if (RetractResp.StatusCode == HttpResponseCode::OK)
+ {
+ CbObject RespObj = RetractResp.AsObject();
+ CHECK(RespObj["success"sv].AsBool());
+ CHECK_EQ(RespObj["lsn"sv].AsInt32(), Lsn);
+ }
+
+ // A second retract should also succeed (action is back to pending)
+ Sleep(500);
+ HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
+ CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
+ fmt::format("Second retract failed: status={}, body={}", int(RetractResp2.StatusCode), RetractResp2.ToText()));
+}
+
TEST_SUITE_END();
} // namespace zen::tests::compute
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index e89812c1f..42632682b 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -15,6 +15,7 @@
# include <zencore/testutils.h>
# include <zencore/thread.h>
# include <zencore/timer.h>
+# include <zencore/trace.h>
# include <zenhttp/httpclient.h>
# include <zenhttp/packageformat.h>
# include <zenutil/config/commandlineoptions.h>
@@ -134,8 +135,7 @@ main(int argc, char** argv)
ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
zen::testing::TestRunner Runner;
- Runner.SetDefaultSuiteFilter("server.*");
- Runner.ApplyCommandLine(argc, argv);
+ Runner.ApplyCommandLine(argc, argv, "server.*");
return Runner.Run();
}