diff options
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
| -rw-r--r-- | src/zenserver-test/compute-tests.cpp | 638 |
1 files changed, 274 insertions, 364 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp index 835d72713..ce2db366a 100644 --- a/src/zenserver-test/compute-tests.cpp +++ b/src/zenserver-test/compute-tests.cpp @@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L return false; } +static void +WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + if (Session.GetActionCounts().Running > 0) + { + return; + } + Sleep(50); + } + FAIL("Timed out waiting for action to reach Running state"); +} + +static void +WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + HttpClient::Response Resp = Client.Get("/jobs/running"sv); + if (Resp) + { + CbObject Obj = Resp.AsObject(); + if (Obj["running"sv].AsArrayView().Num() > 0) + { + return; + } + } + Sleep(50); + } + FAIL("Timed out waiting for any action to reach Running state"); +} + static std::string GetRot13Output(const CbPackage& ResultPackage) { @@ -906,10 +941,10 @@ TEST_CASE("function.queues.cancel_running") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + WaitForAnyActionRunningHttp(Client); // Cancel the queue, which should interrupt the running Sleep job - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -961,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); + WaitForAnyActionRunningHttp(Client); // Cancel the queue via its OID token - const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -1207,7 +1242,7 @@ TEST_CASE("function.priority") // that exit with non-zero exit codes, including retry behaviour and // final failure reporting. -TEST_CASE("function.exit_code.failed_action") +TEST_CASE("function.exit_code") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1219,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so the action fails immediately - // without being rescheduled. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 1: failed_action - immediate failure with max_retries=0 + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit a Fail action with exit code 42 - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } - } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - - // GET /jobs/{lsn} for a failed action should return OK but with an empty result package - const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response ResultResp = Client.Get(ResultUrl); - CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); -} - -TEST_CASE("function.exit_code.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=2 so the action is retried twice before - // being reported as failed (3 total attempts: initial + 2 retries). - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 2; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); + } - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + // Scenario 2: auto_retry - retried twice before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(2); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Submit a Fail action — the worker process will exit with code 1 on - // every attempt, eventually exhausting retries. - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Poll for the LSN to appear in the completed list — this only - // happens after all retries are exhausted. - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.exit_code.reschedule_failed") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=1 so we have room for one manual reschedule - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - - // Wait for the action to exhaust its auto-retry and land in completed - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - - // Try to manually reschedule — should fail because retry limit is reached - const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); - CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); -} - -TEST_CASE("function.exit_code.mixed_success_and_failure") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit + { + auto [QueueId, QueueUrl] = CreateQueue(1); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Create a queue with max_retries=0 for fast failure - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); + CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit one Rot13 (success) and one Fail (failure) - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); - REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); - const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); + REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); + const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); - HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); - const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); + const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); - // Wait for both to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), - fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), - fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), + fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), + fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); - // Verify queue counters - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + } } -TEST_CASE("function.crash.abort") +TEST_CASE("function.crash") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1456,174 +1413,125 @@ TEST_CASE("function.crash.abort") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so we don't wait through retries - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + // Scenario 1: abort - worker process calls std::abort(), no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Submit a Crash action that calls std::abort() - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); -} - -TEST_CASE("function.crash.nullptr") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=0 - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Crash action that dereferences a null pointer - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); - - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.crash.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + // Scenario 2: nullptr - worker process dereferences null, no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Create a queue with max_retries=1 — the crash should be retried once - // before being reported as permanently failed. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 3: auto_retry - crash retried once before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(1); - // Submit a Crash action — will crash on every attempt - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - // Poll for the LSN to appear in the completed list after retries exhaust - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } } ////////////////////////////////////////////////////////////////////////// @@ -1662,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery") // Trigger immediate orchestrator re-query and wait for runner setup Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Submit Rot13 action via session CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); @@ -1721,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W1 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit Rot13 action and verify it completes on W1 { @@ -1763,23 +1669,33 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W2 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); - // Verify W2 received the worker by querying its /compute/workers endpoint directly + // Poll W2 until the worker has been synced (SyncWorkersToRunner is async) { - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); - HttpClient Client(ComputeBaseUri); - HttpClient::Response ListResp = Client.Get("/workers"sv); - REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); + HttpClient Client(ComputeBaseUri); + bool WorkerFound = false; + Stopwatch Timer; - bool WorkerFound = false; - for (auto& Item : ListResp.AsObject()["workers"sv]) + while (Timer.GetElapsedTimeMs() < 10'000) { - if (Item.AsHash() == WorkerPackage.GetObjectHash()) + HttpClient::Response ListResp = Client.Get("/workers"sv); + if (ListResp) + { + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerPackage.GetObjectHash()) + { + WorkerFound = true; + break; + } + } + } + if (WorkerFound) { - WorkerFound = true; break; } + Sleep(50); } REQUIRE_MESSAGE(WorkerFound, @@ -1844,7 +1760,6 @@ TEST_CASE("function.remote.queue_association") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit action to it auto QueueResult = Session.CreateQueue(); @@ -1922,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -1935,7 +1849,7 @@ TEST_CASE("function.remote.queue_cancel_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Cancel the local queue — this should propagate to the remote Session.CancelQueue(QueueId); @@ -2008,7 +1922,7 @@ TEST_CASE("function.abandon_running_http") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); // Wait for the process to start running - Sleep(1'000); + WaitForAnyActionRunningHttp(Client); // Verify the ready endpoint returns OK before abandon { @@ -2139,7 +2053,6 @@ TEST_CASE("function.session.abandon_running") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2152,7 +2065,7 @@ TEST_CASE("function.session.abandon_running") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Transition to Abandoned — should abandon the running action bool Transitioned = Session.Abandon(); @@ -2210,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2223,7 +2135,7 @@ TEST_CASE("function.remote.abandon_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Transition to Abandoned — should abandon the running action and propagate bool Transitioned = Session.Abandon(); @@ -2283,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues") Session.RegisterWorker(WorkerPackage); Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running action so the remote queue is established auto QueueResult = Session.CreateQueue(); @@ -2296,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Verify the remote has a non-implicit queue before shutdown HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); @@ -2358,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work") // Wait for runner discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit an action and verify it completes { @@ -2415,7 +2325,7 @@ TEST_CASE("function.session.retract_pending") REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action"); // Let the scheduler process the pending action - Sleep(500); + Sleep(50); // Retract the pending action auto Result = Session.RetractAction(Enqueued.Lsn); @@ -2424,7 +2334,7 @@ TEST_CASE("function.session.retract_pending") // The action should be re-enqueued as pending (still no runners, so stays pending). // Let the scheduler process the retracted action back to pending. - Sleep(500); + Sleep(50); // Queue should still show 1 active (the action was rescheduled, not completed) auto Status = Session.GetQueueStatus(QueueResult.QueueId); @@ -2509,8 +2419,8 @@ TEST_CASE("function.retract_http") const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); - // Wait for the scheduler to process the pending action into m_PendingActions - Sleep(1'000); + // Wait for the blocker action to start running (occupying the single slot) + WaitForAnyActionRunningHttp(Client); // Retract the pending action via POST /jobs/{lsn}/retract const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn); @@ -2529,7 +2439,7 @@ TEST_CASE("function.retract_http") } // A second retract should also succeed (action is back to pending) - Sleep(500); + Sleep(50); HttpClient::Response RetractResp2 = Client.Post(RetractUrl); CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK, fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText())); |