aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/compute-tests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
-rw-r--r--src/zenserver-test/compute-tests.cpp638
1 files changed, 274 insertions, 364 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index 835d72713..ce2db366a 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L
return false;
}
+static void
+WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ if (Session.GetActionCounts().Running > 0)
+ {
+ return;
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for action to reach Running state");
+}
+
+static void
+WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ HttpClient::Response Resp = Client.Get("/jobs/running"sv);
+ if (Resp)
+ {
+ CbObject Obj = Resp.AsObject();
+ if (Obj["running"sv].AsArrayView().Num() > 0)
+ {
+ return;
+ }
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for any action to reach Running state");
+}
+
static std::string
GetRot13Output(const CbPackage& ResultPackage)
{
@@ -906,10 +941,10 @@ TEST_CASE("function.queues.cancel_running")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue, which should interrupt the running Sleep job
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -961,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue via its OID token
- const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -1207,7 +1242,7 @@ TEST_CASE("function.priority")
// that exit with non-zero exit codes, including retry behaviour and
// final failure reporting.
-TEST_CASE("function.exit_code.failed_action")
+TEST_CASE("function.exit_code")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1219,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so the action fails immediately
- // without being rescheduled.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 1: failed_action - immediate failure with max_retries=0
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit a Fail action with exit code 42
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
- }
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-
- // GET /jobs/{lsn} for a failed action should return OK but with an empty result package
- const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response ResultResp = Client.Get(ResultUrl);
- CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
-}
-
-TEST_CASE("function.exit_code.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=2 so the action is retried twice before
- // being reported as failed (3 total attempts: initial + 2 retries).
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 2;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response ResultResp = Client.Get(ResultUrl);
+ CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
+ }
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ // Scenario 2: auto_retry - retried twice before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(2);
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Submit a Fail action — the worker process will exit with code 1 on
- // every attempt, eventually exhausting retries.
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Poll for the LSN to appear in the completed list — this only
- // happens after all retries are exhausted.
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
-
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.exit_code.reschedule_failed")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=1 so we have room for one manual reschedule
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
-
- // Wait for the action to exhaust its auto-retry and land in completed
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
-
- // Try to manually reschedule — should fail because retry limit is reached
- const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
- CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
-}
-
-TEST_CASE("function.exit_code.mixed_success_and_failure")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Create a queue with max_retries=0 for fast failure
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
+ CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit one Rot13 (success) and one Fail (failure)
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
- REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
- const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
+ const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
- HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
- const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
+ const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
- // Wait for both to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
- fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
- fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
+ fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
+ fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
- // Verify queue counters
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ }
}
-TEST_CASE("function.crash.abort")
+TEST_CASE("function.crash")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1456,174 +1413,125 @@ TEST_CASE("function.crash.abort")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so we don't wait through retries
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ // Scenario 1: abort - worker process calls std::abort(), no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Submit a Crash action that calls std::abort()
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
-
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
}
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-}
-
-TEST_CASE("function.crash.nullptr")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=0
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Crash action that dereferences a null pointer
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
-
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
-
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.crash.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ // Scenario 2: nullptr - worker process dereferences null, no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Create a queue with max_retries=1 — the crash should be retried once
- // before being reported as permanently failed.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 3: auto_retry - crash retried once before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- // Submit a Crash action — will crash on every attempt
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- // Poll for the LSN to appear in the completed list after retries exhaust
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -1662,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
// Trigger immediate orchestrator re-query and wait for runner setup
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Submit Rot13 action via session
CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
@@ -1721,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W1 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit Rot13 action and verify it completes on W1
{
@@ -1763,23 +1669,33 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W2 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
- // Verify W2 received the worker by querying its /compute/workers endpoint directly
+ // Poll W2 until the worker has been synced (SyncWorkersToRunner is async)
{
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
- HttpClient Client(ComputeBaseUri);
- HttpClient::Response ListResp = Client.Get("/workers"sv);
- REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+ HttpClient Client(ComputeBaseUri);
+ bool WorkerFound = false;
+ Stopwatch Timer;
- bool WorkerFound = false;
- for (auto& Item : ListResp.AsObject()["workers"sv])
+ while (Timer.GetElapsedTimeMs() < 10'000)
{
- if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ HttpClient::Response ListResp = Client.Get("/workers"sv);
+ if (ListResp)
+ {
+ for (auto& Item : ListResp.AsObject()["workers"sv])
+ {
+ if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ {
+ WorkerFound = true;
+ break;
+ }
+ }
+ }
+ if (WorkerFound)
{
- WorkerFound = true;
break;
}
+ Sleep(50);
}
REQUIRE_MESSAGE(WorkerFound,
@@ -1844,7 +1760,6 @@ TEST_CASE("function.remote.queue_association")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit action to it
auto QueueResult = Session.CreateQueue();
@@ -1922,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1935,7 +1849,7 @@ TEST_CASE("function.remote.queue_cancel_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Cancel the local queue — this should propagate to the remote
Session.CancelQueue(QueueId);
@@ -2008,7 +1922,7 @@ TEST_CASE("function.abandon_running_http")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
// Wait for the process to start running
- Sleep(1'000);
+ WaitForAnyActionRunningHttp(Client);
// Verify the ready endpoint returns OK before abandon
{
@@ -2139,7 +2053,6 @@ TEST_CASE("function.session.abandon_running")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2152,7 +2065,7 @@ TEST_CASE("function.session.abandon_running")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action
bool Transitioned = Session.Abandon();
@@ -2210,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2223,7 +2135,7 @@ TEST_CASE("function.remote.abandon_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action and propagate
bool Transitioned = Session.Abandon();
@@ -2283,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
Session.RegisterWorker(WorkerPackage);
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running action so the remote queue is established
auto QueueResult = Session.CreateQueue();
@@ -2296,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Verify the remote has a non-implicit queue before shutdown
HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
@@ -2358,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work")
// Wait for runner discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit an action and verify it completes
{
@@ -2415,7 +2325,7 @@ TEST_CASE("function.session.retract_pending")
REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
// Let the scheduler process the pending action
- Sleep(500);
+ Sleep(50);
// Retract the pending action
auto Result = Session.RetractAction(Enqueued.Lsn);
@@ -2424,7 +2334,7 @@ TEST_CASE("function.session.retract_pending")
// The action should be re-enqueued as pending (still no runners, so stays pending).
// Let the scheduler process the retracted action back to pending.
- Sleep(500);
+ Sleep(50);
// Queue should still show 1 active (the action was rescheduled, not completed)
auto Status = Session.GetQueueStatus(QueueResult.QueueId);
@@ -2509,8 +2419,8 @@ TEST_CASE("function.retract_http")
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
- // Wait for the scheduler to process the pending action into m_PendingActions
- Sleep(1'000);
+ // Wait for the blocker action to start running (occupying the single slot)
+ WaitForAnyActionRunningHttp(Client);
// Retract the pending action via POST /jobs/{lsn}/retract
const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn);
@@ -2529,7 +2439,7 @@ TEST_CASE("function.retract_http")
}
// A second retract should also succeed (action is back to pending)
- Sleep(500);
+ Sleep(50);
HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText()));