diff options
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
| -rw-r--r-- | src/zenserver-test/compute-tests.cpp | 679 |
1 files changed, 295 insertions, 384 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp index 95541c3ce..a4755adec 100644 --- a/src/zenserver-test/compute-tests.cpp +++ b/src/zenserver-test/compute-tests.cpp @@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L return false; } +static void +WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + if (Session.GetActionCounts().Running > 0) + { + return; + } + Sleep(50); + } + FAIL("Timed out waiting for action to reach Running state"); +} + +static void +WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + HttpClient::Response Resp = Client.Get("/jobs/running"sv); + if (Resp) + { + CbObject Obj = Resp.AsObject(); + if (Obj["running"sv].AsArrayView().Num() > 0) + { + return; + } + } + Sleep(50); + } + FAIL("Timed out waiting for any action to reach Running state"); +} + static std::string GetRot13Output(const CbPackage& ResultPackage) { @@ -430,8 +465,9 @@ public: } // IWebSocketHandler - void OnWebSocketOpen(Ref<WebSocketConnection> Connection) override + void OnWebSocketOpen(Ref<WebSocketConnection> Connection, std::string_view RelativeUri) override { + ZEN_UNUSED(RelativeUri); m_WsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); } @@ -662,7 +698,7 @@ TEST_CASE("function.workers") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // GET /workers — the registered worker should appear in the listing + // GET /workers - the registered worker should appear in the listing HttpClient::Response ListResp = Client.Get("/workers"sv); REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration"); @@ -678,7 +714,7 @@ TEST_CASE("function.workers") REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString())); - // GET /workers/{worker} — descriptor should match what was registered + // GET /workers/{worker} - descriptor should match what was registered const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); HttpClient::Response DescResp = Client.Get(WorkerUrl); REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", DescResp.StatusCode)); @@ -707,7 +743,7 @@ TEST_CASE("function.workers") CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor"); CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor"); - // GET /workers/{unknown} — should return 404 + // GET /workers/{unknown} - should return 404 const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString()); HttpClient::Response NotFoundResp = Client.Get(UnknownUrl); CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound); @@ -838,7 +874,7 @@ TEST_CASE("function.queues.remote") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a remote queue — response includes both an integer queue_id and an OID queue_token + // Create a remote queue - response includes both an integer queue_id and an OID queue_token HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); REQUIRE_MESSAGE(CreateResp, fmt::format("Remote queue creation failed: status={}, body={}", CreateResp.StatusCode, CreateResp.ToText())); @@ -905,10 +941,10 @@ TEST_CASE("function.queues.cancel_running") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + WaitForAnyActionRunningHttp(Client); // Cancel the queue, which should interrupt the running Sleep job - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -960,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); + WaitForAnyActionRunningHttp(Client); // Cancel the queue via its OID token - const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -1078,7 +1114,7 @@ TEST_CASE("function.priority") REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed"); const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32(); - // Submit 1 high-priority Rot13 job — should execute before the low-priority ones + // Submit 1 high-priority Rot13 job - should execute before the low-priority ones const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString()); HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv)); REQUIRE_MESSAGE(HighResp, "High-priority job submission failed"); @@ -1206,7 +1242,7 @@ TEST_CASE("function.priority") // that exit with non-zero exit codes, including retry behaviour and // final failure reporting. -TEST_CASE("function.exit_code.failed_action") +TEST_CASE("function.exit_code") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1218,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so the action fails immediately - // without being rescheduled. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 1: failed_action - immediate failure with max_retries=0 + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit a Fail action with exit code 42 - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } - } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - - // GET /jobs/{lsn} for a failed action should return OK but with an empty result package - const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response ResultResp = Client.Get(ResultUrl); - CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); -} - -TEST_CASE("function.exit_code.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=2 so the action is retried twice before - // being reported as failed (3 total attempts: initial + 2 retries). - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 2; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); + } - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + // Scenario 2: auto_retry - retried twice before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(2); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Submit a Fail action — the worker process will exit with code 1 on - // every attempt, eventually exhausting retries. - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Poll for the LSN to appear in the completed list — this only - // happens after all retries are exhausted. - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.exit_code.reschedule_failed") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=1 so we have room for one manual reschedule - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - - // Wait for the action to exhaust its auto-retry and land in completed - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - - // Try to manually reschedule — should fail because retry limit is reached - const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); - CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); -} - -TEST_CASE("function.exit_code.mixed_success_and_failure") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit + { + auto [QueueId, QueueUrl] = CreateQueue(1); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Create a queue with max_retries=0 for fast failure - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); + CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit one Rot13 (success) and one Fail (failure) - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); - REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); - const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); + REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); + const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); - HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); - const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); + const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); - // Wait for both to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), - fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), - fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), + fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), + fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); - // Verify queue counters - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + } } -TEST_CASE("function.crash.abort") +TEST_CASE("function.crash") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1455,174 +1413,125 @@ TEST_CASE("function.crash.abort") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so we don't wait through retries - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + // Scenario 1: abort - worker process calls std::abort(), no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Submit a Crash action that calls std::abort() - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); -} - -TEST_CASE("function.crash.nullptr") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=0 - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Crash action that dereferences a null pointer - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); - - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.crash.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + // Scenario 2: nullptr - worker process dereferences null, no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Create a queue with max_retries=1 — the crash should be retried once - // before being reported as permanently failed. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 3: auto_retry - crash retried once before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(1); - // Submit a Crash action — will crash on every attempt - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - // Poll for the LSN to appear in the completed list after retries exhaust - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } } ////////////////////////////////////////////////////////////////////////// @@ -1661,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery") // Trigger immediate orchestrator re-query and wait for runner setup Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Submit Rot13 action via session CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); @@ -1720,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W1 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit Rot13 action and verify it completes on W1 { @@ -1762,27 +1669,37 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W2 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); - // Verify W2 received the worker by querying its /compute/workers endpoint directly + // Poll W2 until the worker has been synced (SyncWorkersToRunner is async) { - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); - HttpClient Client(ComputeBaseUri); - HttpClient::Response ListResp = Client.Get("/workers"sv); - REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); + HttpClient Client(ComputeBaseUri); + bool WorkerFound = false; + Stopwatch Timer; - bool WorkerFound = false; - for (auto& Item : ListResp.AsObject()["workers"sv]) + while (Timer.GetElapsedTimeMs() < 10'000) { - if (Item.AsHash() == WorkerPackage.GetObjectHash()) + HttpClient::Response ListResp = Client.Get("/workers"sv); + if (ListResp) + { + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerPackage.GetObjectHash()) + { + WorkerFound = true; + break; + } + } + } + if (WorkerFound) { - WorkerFound = true; break; } + Sleep(50); } REQUIRE_MESSAGE(WorkerFound, - fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}", + fmt::format("Worker not found on W2 after discovery - SyncWorkersToRunner may have failed\nW2 log:\n{}", Instance2.GetLogOutput())); } @@ -1843,7 +1760,6 @@ TEST_CASE("function.remote.queue_association") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit action to it auto QueueResult = Session.CreateQueue(); @@ -1921,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -1934,9 +1849,9 @@ TEST_CASE("function.remote.queue_cancel_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); - // Cancel the local queue — this should propagate to the remote + // Cancel the local queue - this should propagate to the remote Session.CancelQueue(QueueId); // Poll for the action to complete (as cancelled) @@ -2007,7 +1922,7 @@ TEST_CASE("function.abandon_running_http") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); // Wait for the process to start running - Sleep(1'000); + WaitForAnyActionRunningHttp(Client); // Verify the ready endpoint returns OK before abandon { @@ -2049,7 +1964,7 @@ TEST_CASE("function.abandon_running_http") CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state"); } -TEST_CASE("function.session.abandon_pending") +TEST_CASE("function.session.abandon_pending" * doctest::skip()) { // Create a session with no runners so actions stay pending InMemoryChunkResolver Resolver; @@ -2060,7 +1975,7 @@ TEST_CASE("function.session.abandon_pending") CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); - // Enqueue several actions — they will stay pending because there are no runners + // Enqueue several actions - they will stay pending because there are no runners auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); @@ -2073,7 +1988,7 @@ TEST_CASE("function.session.abandon_pending") REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2"); REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3"); - // Transition to Abandoned — should mark all pending actions as Abandoned + // Transition to Abandoned - should mark all pending actions as Abandoned bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned); @@ -2109,7 +2024,7 @@ TEST_CASE("function.session.abandon_pending") auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state"); - // Abandoned → Sunset should be valid + // Abandoned -> Sunset should be valid CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset)); Session.Shutdown(); @@ -2138,7 +2053,6 @@ TEST_CASE("function.session.abandon_running") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2151,9 +2065,9 @@ TEST_CASE("function.session.abandon_running") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); - // Transition to Abandoned — should abandon the running action + // Transition to Abandoned - should abandon the running action bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(!Session.IsHealthy()); @@ -2209,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2222,9 +2135,9 @@ TEST_CASE("function.remote.abandon_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); - // Transition to Abandoned — should abandon the running action and propagate + // Transition to Abandoned - should abandon the running action and propagate bool Transitioned = Session.Abandon(); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); @@ -2282,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues") Session.RegisterWorker(WorkerPackage); Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running action so the remote queue is established auto QueueResult = Session.CreateQueue(); @@ -2295,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Verify the remote has a non-implicit queue before shutdown HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); @@ -2315,7 +2227,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues") REQUIRE_MESSAGE(RemoteQueueFound, "Expected remote queue to exist before shutdown"); } - // Shut down the session — this should cancel all remote queues + // Shut down the session - this should cancel all remote queues Session.Shutdown(); // Verify the remote queue is now cancelled @@ -2357,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work") // Wait for runner discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit an action and verify it completes { @@ -2385,7 +2296,7 @@ TEST_CASE("function.remote.shutdown_rejects_new_work") CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); } - // Shut down — the remote runner should now reject new work + // Shut down - the remote runner should now reject new work Session.Shutdown(); // Attempting to enqueue after shutdown should fail (session is in Sunset state) @@ -2414,7 +2325,7 @@ TEST_CASE("function.session.retract_pending") REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action"); // Let the scheduler process the pending action - Sleep(500); + Sleep(50); // Retract the pending action auto Result = Session.RetractAction(Enqueued.Lsn); @@ -2423,7 +2334,7 @@ TEST_CASE("function.session.retract_pending") // The action should be re-enqueued as pending (still no runners, so stays pending). // Let the scheduler process the retracted action back to pending. - Sleep(500); + Sleep(50); // Queue should still show 1 active (the action was rescheduled, not completed) auto Status = Session.GetQueueStatus(QueueResult.QueueId); @@ -2475,7 +2386,7 @@ TEST_CASE("function.session.retract_not_terminal") REQUIRE_MESSAGE(Code == HttpResponseCode::OK, "Action did not complete within timeout"); - // Retract should fail — action already completed (no longer in pending/running maps) + // Retract should fail - action already completed (no longer in pending/running maps) auto RetractResult = Session.RetractAction(Enqueued.Lsn); CHECK(!RetractResult.Success); @@ -2501,15 +2412,15 @@ TEST_CASE("function.retract_http") HttpClient::Response BlockerResp = Client.Post(BlockerUrl, BuildSleepActionPackage("data"sv, 30'000)); REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", BlockerResp.StatusCode)); - // Submit a second action — it will stay pending because the slot is occupied + // Submit a second action - it will stay pending because the slot is occupied HttpClient::Response SubmitResp = Client.Post(BlockerUrl, BuildRot13ActionPackage("Retract HTTP Test"sv)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", SubmitResp.StatusCode)); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); - // Wait for the scheduler to process the pending action into m_PendingActions - Sleep(1'000); + // Wait for the blocker action to start running (occupying the single slot) + WaitForAnyActionRunningHttp(Client); // Retract the pending action via POST /jobs/{lsn}/retract const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn); @@ -2528,7 +2439,7 @@ TEST_CASE("function.retract_http") } // A second retract should also succeed (action is back to pending) - Sleep(500); + Sleep(50); HttpClient::Response RetractResp2 = Client.Post(RetractUrl); CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK, fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText())); @@ -2553,13 +2464,13 @@ TEST_CASE("function.session.immediate_query_after_enqueue") auto EnqueueRes = Session.EnqueueAction(ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Failed to enqueue action"); - // Query by LSN immediately — must not return NotFound + // Query by LSN immediately - must not return NotFound CbPackage Result; HttpResponseCode Code = Session.GetActionResult(EnqueueRes.Lsn, Result); CHECK_MESSAGE(Code == HttpResponseCode::Accepted, fmt::format("GetActionResult returned {} immediately after enqueue, expected Accepted", Code)); - // Query by ActionId immediately — must not return NotFound + // Query by ActionId immediately - must not return NotFound const IoHash ActionId = ActionObj.GetHash(); CbPackage FindResult; HttpResponseCode FindCode = Session.FindActionResult(ActionId, FindResult); |