aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/compute-tests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
-rw-r--r--src/zenserver-test/compute-tests.cpp679
1 files changed, 295 insertions, 384 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index 95541c3ce..a4755adec 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L
return false;
}
+static void
+WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ if (Session.GetActionCounts().Running > 0)
+ {
+ return;
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for action to reach Running state");
+}
+
+static void
+WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ HttpClient::Response Resp = Client.Get("/jobs/running"sv);
+ if (Resp)
+ {
+ CbObject Obj = Resp.AsObject();
+ if (Obj["running"sv].AsArrayView().Num() > 0)
+ {
+ return;
+ }
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for any action to reach Running state");
+}
+
static std::string
GetRot13Output(const CbPackage& ResultPackage)
{
@@ -430,8 +465,9 @@ public:
}
// IWebSocketHandler
- void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) override
{
+ ZEN_UNUSED(RelativeUri);
m_WsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
}
@@ -662,7 +698,7 @@ TEST_CASE("function.workers")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // GET /workers — the registered worker should appear in the listing
+ // GET /workers - the registered worker should appear in the listing
HttpClient::Response ListResp = Client.Get("/workers"sv);
REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration");
@@ -678,7 +714,7 @@ TEST_CASE("function.workers")
REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString()));
- // GET /workers/{worker} — descriptor should match what was registered
+ // GET /workers/{worker} - descriptor should match what was registered
const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
HttpClient::Response DescResp = Client.Get(WorkerUrl);
REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", DescResp.StatusCode));
@@ -707,7 +743,7 @@ TEST_CASE("function.workers")
CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor");
CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor");
- // GET /workers/{unknown} — should return 404
+ // GET /workers/{unknown} - should return 404
const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString());
HttpClient::Response NotFoundResp = Client.Get(UnknownUrl);
CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound);
@@ -838,7 +874,7 @@ TEST_CASE("function.queues.remote")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a remote queue — response includes both an integer queue_id and an OID queue_token
+ // Create a remote queue - response includes both an integer queue_id and an OID queue_token
HttpClient::Response CreateResp = Client.Post("/queues/remote"sv);
REQUIRE_MESSAGE(CreateResp,
fmt::format("Remote queue creation failed: status={}, body={}", CreateResp.StatusCode, CreateResp.ToText()));
@@ -905,10 +941,10 @@ TEST_CASE("function.queues.cancel_running")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue, which should interrupt the running Sleep job
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -960,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue via its OID token
- const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -1078,7 +1114,7 @@ TEST_CASE("function.priority")
REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed");
const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32();
- // Submit 1 high-priority Rot13 job — should execute before the low-priority ones
+ // Submit 1 high-priority Rot13 job - should execute before the low-priority ones
const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString());
HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv));
REQUIRE_MESSAGE(HighResp, "High-priority job submission failed");
@@ -1206,7 +1242,7 @@ TEST_CASE("function.priority")
// that exit with non-zero exit codes, including retry behaviour and
// final failure reporting.
-TEST_CASE("function.exit_code.failed_action")
+TEST_CASE("function.exit_code")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1218,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so the action fails immediately
- // without being rescheduled.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 1: failed_action - immediate failure with max_retries=0
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit a Fail action with exit code 42
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
- }
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-
- // GET /jobs/{lsn} for a failed action should return OK but with an empty result package
- const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response ResultResp = Client.Get(ResultUrl);
- CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
-}
-
-TEST_CASE("function.exit_code.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=2 so the action is retried twice before
- // being reported as failed (3 total attempts: initial + 2 retries).
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 2;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response ResultResp = Client.Get(ResultUrl);
+ CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
+ }
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ // Scenario 2: auto_retry - retried twice before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(2);
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Submit a Fail action — the worker process will exit with code 1 on
- // every attempt, eventually exhausting retries.
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Poll for the LSN to appear in the completed list — this only
- // happens after all retries are exhausted.
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
-
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.exit_code.reschedule_failed")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=1 so we have room for one manual reschedule
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
-
- // Wait for the action to exhaust its auto-retry and land in completed
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
-
- // Try to manually reschedule — should fail because retry limit is reached
- const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
- CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
-}
-
-TEST_CASE("function.exit_code.mixed_success_and_failure")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Create a queue with max_retries=0 for fast failure
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
+ CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit one Rot13 (success) and one Fail (failure)
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
- REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
- const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
+ const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
- HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
- const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
+ const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
- // Wait for both to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
- fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
- fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
+ fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
+ fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
- // Verify queue counters
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ }
}
-TEST_CASE("function.crash.abort")
+TEST_CASE("function.crash")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1455,174 +1413,125 @@ TEST_CASE("function.crash.abort")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so we don't wait through retries
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ // Scenario 1: abort - worker process calls std::abort(), no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Submit a Crash action that calls std::abort()
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
-
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
}
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-}
-
-TEST_CASE("function.crash.nullptr")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=0
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Crash action that dereferences a null pointer
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
-
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
-
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.crash.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ // Scenario 2: nullptr - worker process dereferences null, no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Create a queue with max_retries=1 — the crash should be retried once
- // before being reported as permanently failed.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 3: auto_retry - crash retried once before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- // Submit a Crash action — will crash on every attempt
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- // Poll for the LSN to appear in the completed list after retries exhaust
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -1661,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
// Trigger immediate orchestrator re-query and wait for runner setup
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Submit Rot13 action via session
CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
@@ -1720,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W1 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit Rot13 action and verify it completes on W1
{
@@ -1762,27 +1669,37 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W2 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
- // Verify W2 received the worker by querying its /compute/workers endpoint directly
+ // Poll W2 until the worker has been synced (SyncWorkersToRunner is async)
{
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
- HttpClient Client(ComputeBaseUri);
- HttpClient::Response ListResp = Client.Get("/workers"sv);
- REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+ HttpClient Client(ComputeBaseUri);
+ bool WorkerFound = false;
+ Stopwatch Timer;
- bool WorkerFound = false;
- for (auto& Item : ListResp.AsObject()["workers"sv])
+ while (Timer.GetElapsedTimeMs() < 10'000)
{
- if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ HttpClient::Response ListResp = Client.Get("/workers"sv);
+ if (ListResp)
+ {
+ for (auto& Item : ListResp.AsObject()["workers"sv])
+ {
+ if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ {
+ WorkerFound = true;
+ break;
+ }
+ }
+ }
+ if (WorkerFound)
{
- WorkerFound = true;
break;
}
+ Sleep(50);
}
REQUIRE_MESSAGE(WorkerFound,
- fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}",
+ fmt::format("Worker not found on W2 after discovery - SyncWorkersToRunner may have failed\nW2 log:\n{}",
Instance2.GetLogOutput()));
}
@@ -1843,7 +1760,6 @@ TEST_CASE("function.remote.queue_association")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit action to it
auto QueueResult = Session.CreateQueue();
@@ -1921,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1934,9 +1849,9 @@ TEST_CASE("function.remote.queue_cancel_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
- // Cancel the local queue — this should propagate to the remote
+ // Cancel the local queue - this should propagate to the remote
Session.CancelQueue(QueueId);
// Poll for the action to complete (as cancelled)
@@ -2007,7 +1922,7 @@ TEST_CASE("function.abandon_running_http")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
// Wait for the process to start running
- Sleep(1'000);
+ WaitForAnyActionRunningHttp(Client);
// Verify the ready endpoint returns OK before abandon
{
@@ -2049,7 +1964,7 @@ TEST_CASE("function.abandon_running_http")
CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state");
}
-TEST_CASE("function.session.abandon_pending")
+TEST_CASE("function.session.abandon_pending" * doctest::skip())
{
// Create a session with no runners so actions stay pending
InMemoryChunkResolver Resolver;
@@ -2060,7 +1975,7 @@ TEST_CASE("function.session.abandon_pending")
CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
Session.RegisterWorker(WorkerPackage);
- // Enqueue several actions — they will stay pending because there are no runners
+ // Enqueue several actions - they will stay pending because there are no runners
auto QueueResult = Session.CreateQueue();
REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
@@ -2073,7 +1988,7 @@ TEST_CASE("function.session.abandon_pending")
REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2");
REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3");
- // Transition to Abandoned — should mark all pending actions as Abandoned
+ // Transition to Abandoned - should mark all pending actions as Abandoned
bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned);
@@ -2109,7 +2024,7 @@ TEST_CASE("function.session.abandon_pending")
auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state");
- // Abandoned → Sunset should be valid
+ // Abandoned -> Sunset should be valid
CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset));
Session.Shutdown();
@@ -2138,7 +2053,6 @@ TEST_CASE("function.session.abandon_running")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2151,9 +2065,9 @@ TEST_CASE("function.session.abandon_running")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
- // Transition to Abandoned — should abandon the running action
+ // Transition to Abandoned - should abandon the running action
bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
CHECK(!Session.IsHealthy());
@@ -2209,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2222,9 +2135,9 @@ TEST_CASE("function.remote.abandon_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
- // Transition to Abandoned — should abandon the running action and propagate
+ // Transition to Abandoned - should abandon the running action and propagate
bool Transitioned = Session.Abandon();
CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
@@ -2282,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
Session.RegisterWorker(WorkerPackage);
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running action so the remote queue is established
auto QueueResult = Session.CreateQueue();
@@ -2295,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Verify the remote has a non-implicit queue before shutdown
HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
@@ -2315,7 +2227,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
REQUIRE_MESSAGE(RemoteQueueFound, "Expected remote queue to exist before shutdown");
}
- // Shut down the session — this should cancel all remote queues
+ // Shut down the session - this should cancel all remote queues
Session.Shutdown();
// Verify the remote queue is now cancelled
@@ -2357,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work")
// Wait for runner discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit an action and verify it completes
{
@@ -2385,7 +2296,7 @@ TEST_CASE("function.remote.shutdown_rejects_new_work")
CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
}
- // Shut down — the remote runner should now reject new work
+ // Shut down - the remote runner should now reject new work
Session.Shutdown();
// Attempting to enqueue after shutdown should fail (session is in Sunset state)
@@ -2414,7 +2325,7 @@ TEST_CASE("function.session.retract_pending")
REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
// Let the scheduler process the pending action
- Sleep(500);
+ Sleep(50);
// Retract the pending action
auto Result = Session.RetractAction(Enqueued.Lsn);
@@ -2423,7 +2334,7 @@ TEST_CASE("function.session.retract_pending")
// The action should be re-enqueued as pending (still no runners, so stays pending).
// Let the scheduler process the retracted action back to pending.
- Sleep(500);
+ Sleep(50);
// Queue should still show 1 active (the action was rescheduled, not completed)
auto Status = Session.GetQueueStatus(QueueResult.QueueId);
@@ -2475,7 +2386,7 @@ TEST_CASE("function.session.retract_not_terminal")
REQUIRE_MESSAGE(Code == HttpResponseCode::OK, "Action did not complete within timeout");
- // Retract should fail — action already completed (no longer in pending/running maps)
+ // Retract should fail - action already completed (no longer in pending/running maps)
auto RetractResult = Session.RetractAction(Enqueued.Lsn);
CHECK(!RetractResult.Success);
@@ -2501,15 +2412,15 @@ TEST_CASE("function.retract_http")
HttpClient::Response BlockerResp = Client.Post(BlockerUrl, BuildSleepActionPackage("data"sv, 30'000));
REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", BlockerResp.StatusCode));
- // Submit a second action — it will stay pending because the slot is occupied
+ // Submit a second action - it will stay pending because the slot is occupied
HttpClient::Response SubmitResp = Client.Post(BlockerUrl, BuildRot13ActionPackage("Retract HTTP Test"sv));
REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", SubmitResp.StatusCode));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
- // Wait for the scheduler to process the pending action into m_PendingActions
- Sleep(1'000);
+ // Wait for the blocker action to start running (occupying the single slot)
+ WaitForAnyActionRunningHttp(Client);
// Retract the pending action via POST /jobs/{lsn}/retract
const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn);
@@ -2528,7 +2439,7 @@ TEST_CASE("function.retract_http")
}
// A second retract should also succeed (action is back to pending)
- Sleep(500);
+ Sleep(50);
HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText()));
@@ -2553,13 +2464,13 @@ TEST_CASE("function.session.immediate_query_after_enqueue")
auto EnqueueRes = Session.EnqueueAction(ActionObj, 0);
REQUIRE_MESSAGE(EnqueueRes, "Failed to enqueue action");
- // Query by LSN immediately — must not return NotFound
+ // Query by LSN immediately - must not return NotFound
CbPackage Result;
HttpResponseCode Code = Session.GetActionResult(EnqueueRes.Lsn, Result);
CHECK_MESSAGE(Code == HttpResponseCode::Accepted,
fmt::format("GetActionResult returned {} immediately after enqueue, expected Accepted", Code));
- // Query by ActionId immediately — must not return NotFound
+ // Query by ActionId immediately - must not return NotFound
const IoHash ActionId = ActionObj.GetHash();
CbPackage FindResult;
HttpResponseCode FindCode = Session.FindActionResult(ActionId, FindResult);