aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/compute-tests.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-30 15:07:08 +0200
committerGitHub Enterprise <[email protected]>2026-03-30 15:07:08 +0200
commit3540d676733efaddecf504b30e9a596465bd43f8 (patch)
tree7a8d8b3d2da993e30c34e3ff36f659b90a2b228e /src/zenserver-test/compute-tests.cpp
parentinclude rawHash in structure output for builds ls command (#903) (diff)
downloadzen-3540d676733efaddecf504b30e9a596465bd43f8.tar.xz
zen-3540d676733efaddecf504b30e9a596465bd43f8.zip
Request validation and resilience improvements (#864)
### Security: Input validation & path safety - **Reject local file references by default** in package parsing — only allow when explicitly opted in by the service (`ParseFlags::kAllowLocalReferences`) and validated by an `ILocalRefPolicy` (fail-closed: no policy = rejected) - **`DataRootLocalRefPolicy`** restricts local ref paths to the server's data root via canonical path prefix matching - **Validate attachment hashes** in compute HTTP handlers — decompresses and re-hashes each attachment at ingestion time to reject tampered payloads - **Path traversal validation** for worker descriptions (`pathvalidation.h`) — rejects absolute paths, `..` components, Windows reserved device names, and invalid filename characters - **Harden CbPackage parsing** against corrupt inputs — overflow-safe attachment count, bounds checks on local ref offset/size, graceful failure instead of `ZEN_ASSERT` for untrusted data - **Harden legacy package parser** — reject zero-size binary fields, missing mappers, and optionally validate resolved attachment hashes - **Bounds check in `CbPackageReader::MarshalLocalChunkReference`** — detect when `MakeFromFile` silently clamps offset+size to file size ### Reliability: Lock consolidation & bug fixes - **Consolidate three action map locks into one** (`m_ActionMapLock`) — eliminates deadlock risk from multi-lock ordering, simplifies state transitions, and fixes a race where newly enqueued actions were briefly invisible to `GetActionResult`/`FindActionResult` - **Fix infinite loop in `BaseRunnerGroup::SubmitActions`** when actions exceed total runner capacity — cap round-robin at `TotalCapacity` and default unassigned results to "No capacity" - **Fix `MakeSafeAbsolutePathInPlace` for UNC paths** — `\server\share` now correctly becomes `\?\UNC\server\share` instead of `\?\server\share` - **Fix `max_retries=0`** — previously fell through to the default of 3; now correctly means "no retries" ### New: ManagedProcessRunner - Cross-platform process runner backed by `SubprocessManager` — uses async exit callbacks instead of polling, delegates CPU/memory metrics to the manager's built-in sampler - `ProcessGroup` (JobObject on Windows, process group on POSIX) for bulk cancellation on shutdown - `--managed` flag on `zen exec inproc` to select this runner - Refactored monitor thread lifecycle — `StartMonitorThread()` now called from derived constructors to avoid calling virtual functions from base constructor ### Process management - **Suppress crash dialogs** via `JOB_OBJECT_UILIMIT_ERRORMODE` + `SEM_NOGPFAULTERRORBOX` in both `WindowsProcessRunner` and `JobObject::Initialize` — prevents WER/Dr. Watson modal dialogs from blocking the monitor thread - **CREATE_SUSPENDED → AssignProcessToJobObject → ResumeThread** pattern in `WindowsProcessRunner` — ensures job object assignment before process execution - **Move stdout/stderr callbacks to `Spawn()` parameters** in `SubprocessManager` — prevents race where early output could be missed before callback installation - Consistent PID logging across all runner types ### Test infrastructure - **`zentest-appstub`**: Added `Fail` (configurable exit code) and `Crash` (abort / nullptr deref) test functions - **Compute integration tests**: exit code handling, auto-retry exhaustion, manual reschedule after failure, mixed success/failure queues, crash handling (abort + nullptr), crash auto-retry, immediate query visibility after enqueue - **Package format tests**: truncated header, bad magic, attachment count overflow, truncated data, local ref rejection/acceptance, policy enforcement (inside/outside root, traversal, no-policy fail-closed) - **Legacy package parser tests**: empty input, zero-size binary, hash resolution with/without mapper, hash mismatch detection - **UNC path tests** for `MakeSafeAbsolutePath` ### Misc - ANSI color helper macros (`ZEN_RED`, `ZEN_BRIGHT_WHITE`, etc.) and `ZEN_BOLD`/`ZEN_DIM`/etc. - Generic `fmt::formatter` for types with free `ToString` functions - Compute dashboard: truncated hash display with monospace font and hover for full value - Renamed `usonpackage_forcelink` → `cbpackage_forcelink` - Compute enabled by default in xmake config (releases still explicitly disable)
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
-rw-r--r--src/zenserver-test/compute-tests.cpp631
1 files changed, 593 insertions, 38 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index 021052a3b..95541c3ce 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -21,6 +21,7 @@
# include <zenhttp/httpserver.h>
# include <zenhttp/websocket.h>
# include <zencompute/computeservice.h>
+# include <zencore/fmtutils.h>
# include <zenstore/zenstore.h>
# include <zenutil/zenserverprocess.h>
@@ -36,6 +37,8 @@ using namespace std::literals;
static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969";
static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313";
static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888";
+static constexpr std::string_view kFailVersion = "fa11fa11-fa11-fa11-fa11-fa11fa11fa11";
+static constexpr std::string_view kCrashVersion = "c4a50000-c4a5-c4a5-c4a5-c4a5c4a5c4a5";
// In-memory implementation of ChunkResolver for test use.
// Stores compressed data keyed by decompressed content hash.
@@ -104,6 +107,16 @@ RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env)
<< "Sleep"sv;
WorkerWriter << "version"sv << Guid::FromString(kSleepVersion);
WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Fail"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kFailVersion);
+ WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Crash"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kCrashVersion);
+ WorkerWriter.EndObject();
WorkerWriter.EndArray();
CbPackage WorkerPackage;
@@ -115,7 +128,7 @@ RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env)
const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
HttpClient::Response RegisterResp = Client.Post(WorkerUrl, std::move(WorkerPackage));
REQUIRE_MESSAGE(RegisterResp,
- fmt::format("Worker registration failed: status={}, body={}", int(RegisterResp.StatusCode), RegisterResp.ToText()));
+ fmt::format("Worker registration failed: status={}, body={}", RegisterResp.StatusCode, RegisterResp.ToText()));
return WorkerId;
}
@@ -220,6 +233,83 @@ BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemor
return ActionWriter.Save();
}
+// Build a Fail action CbPackage. The worker exits with the given exit code.
+static CbPackage
+BuildFailActionPackage(int ExitCode)
+{
+ // The Fail function throws before reading inputs, but the action structure
+ // still requires a valid input attachment for the runner to manifest.
+ std::string_view Dummy = "x"sv;
+
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Dummy.data(), Dummy.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Dummy.size();
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Fail"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kFailVersion);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+ ActionWriter.BeginObject("Constants"sv);
+ ActionWriter << "ExitCode"sv << static_cast<uint64_t>(ExitCode);
+ ActionWriter.EndObject();
+
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionWriter.Save());
+ ActionPackage.AddAttachment(InputAttachment);
+
+ return ActionPackage;
+}
+
+// Build a Crash action CbPackage. The worker process crashes hard.
+// Mode: "abort" (default) or "nullptr" (null pointer dereference).
+static CbPackage
+BuildCrashActionPackage(std::string_view Mode = "abort"sv)
+{
+ std::string_view Dummy = "x"sv;
+
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Dummy.data(), Dummy.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Dummy.size();
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Crash"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kCrashVersion);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+ ActionWriter.BeginObject("Constants"sv);
+ ActionWriter << "Mode"sv << Mode;
+ ActionWriter.EndObject();
+
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionWriter.Save());
+ ActionPackage.AddAttachment(InputAttachment);
+
+ return ActionPackage;
+}
+
static HttpClient::Response
PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000)
{
@@ -469,6 +559,16 @@ BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver)
<< "Sleep"sv;
WorkerWriter << "version"sv << Guid::FromString(kSleepVersion);
WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Fail"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kFailVersion);
+ WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Crash"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kCrashVersion);
+ WorkerWriter.EndObject();
WorkerWriter.EndArray();
CbPackage WorkerPackage;
@@ -526,7 +626,7 @@ TEST_CASE("function.rot13")
// Submit action via legacy /jobs/{worker} endpoint
const std::string JobUrl = fmt::format("/jobs/{}", WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
@@ -536,7 +636,7 @@ TEST_CASE("function.rot13")
HttpClient::Response ResultResp = PollForResult(Client, ResultUrl);
REQUIRE_MESSAGE(
ResultResp.StatusCode == HttpResponseCode::OK,
- fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput()));
+ fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", ResultResp.StatusCode, Instance.GetLogOutput()));
// Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
CbPackage ResultPackage = ResultResp.AsPackage();
@@ -581,7 +681,7 @@ TEST_CASE("function.workers")
// GET /workers/{worker} — descriptor should match what was registered
const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
HttpClient::Response DescResp = Client.Get(WorkerUrl);
- REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode)));
+ REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", DescResp.StatusCode));
CbObject Desc = DescResp.AsObject();
CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion));
@@ -627,7 +727,7 @@ TEST_CASE("function.queues.lifecycle")
// Create a queue
HttpClient::Response CreateResp = Client.Post("/queues"sv);
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", CreateResp.StatusCode, CreateResp.ToText()));
const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
@@ -651,8 +751,7 @@ TEST_CASE("function.queues.lifecycle")
// Submit action via queue-scoped endpoint
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
- REQUIRE_MESSAGE(SubmitResp,
- fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Queue job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission");
@@ -668,9 +767,8 @@ TEST_CASE("function.queues.lifecycle")
// Retrieve result via queue-scoped /jobs/{lsn} endpoint
const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
HttpClient::Response ResultResp = Client.Get(ResultUrl);
- REQUIRE_MESSAGE(
- ResultResp.StatusCode == HttpResponseCode::OK,
- fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(ResultResp.StatusCode == HttpResponseCode::OK,
+ fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", ResultResp.StatusCode, Instance.GetLogOutput()));
// Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
CbPackage ResultPackage = ResultResp.AsPackage();
@@ -712,13 +810,13 @@ TEST_CASE("function.queues.cancel")
// Submit a job
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
// Cancel the queue
const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
- fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText()));
+ fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
// Verify queue status shows cancelled
HttpClient::Response StatusResp = Client.Get(QueueUrl);
@@ -743,7 +841,7 @@ TEST_CASE("function.queues.remote")
// Create a remote queue — response includes both an integer queue_id and an OID queue_token
HttpClient::Response CreateResp = Client.Post("/queues/remote"sv);
REQUIRE_MESSAGE(CreateResp,
- fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+ fmt::format("Remote queue creation failed: status={}, body={}", CreateResp.StatusCode, CreateResp.ToText()));
CbObject CreateObj = CreateResp.AsObject();
const std::string QueueToken = std::string(CreateObj["queue_token"sv].AsString());
@@ -753,7 +851,7 @@ TEST_CASE("function.queues.remote")
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
REQUIRE_MESSAGE(SubmitResp,
- fmt::format("Remote queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ fmt::format("Remote queue job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from remote queue job submission");
@@ -769,7 +867,7 @@ TEST_CASE("function.queues.remote")
HttpClient::Response ResultResp = Client.Get(ResultUrl);
REQUIRE_MESSAGE(ResultResp.StatusCode == HttpResponseCode::OK,
fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}",
- int(ResultResp.StatusCode),
+ ResultResp.StatusCode,
Instance.GetLogOutput()));
// Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
@@ -801,8 +899,7 @@ TEST_CASE("function.queues.cancel_running")
// Submit a Sleep job long enough that it will still be running when we cancel
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
- REQUIRE_MESSAGE(SubmitResp,
- fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
@@ -814,7 +911,7 @@ TEST_CASE("function.queues.cancel_running")
const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
- fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText()));
+ fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
// The cancelled job should appear in the /completed endpoint once the process exits
const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
@@ -849,7 +946,7 @@ TEST_CASE("function.queues.remote_cancel")
// Create a remote queue to obtain an OID token for token-addressed cancellation
HttpClient::Response CreateResp = Client.Post("/queues/remote"sv);
REQUIRE_MESSAGE(CreateResp,
- fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+ fmt::format("Remote queue creation failed: status={}, body={}", CreateResp.StatusCode, CreateResp.ToText()));
const std::string QueueToken = std::string(CreateResp.AsObject()["queue_token"sv].AsString());
REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation");
@@ -857,8 +954,7 @@ TEST_CASE("function.queues.remote_cancel")
// Submit a long-running Sleep job via the token-addressed endpoint
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
- REQUIRE_MESSAGE(SubmitResp,
- fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
@@ -870,7 +966,7 @@ TEST_CASE("function.queues.remote_cancel")
const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
- fmt::format("Remote queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText()));
+ fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
// The cancelled job should appear in the token-addressed /completed endpoint
const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken);
@@ -910,13 +1006,13 @@ TEST_CASE("function.queues.drain")
// Submit a long-running job so we can verify it completes even after drain
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
HttpClient::Response Submit1 = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 2'000));
- REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", int(Submit1.StatusCode)));
+ REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", Submit1.StatusCode));
const int Lsn1 = Submit1.AsObject()["lsn"sv].AsInt32();
// Drain the queue
const std::string DrainUrl = fmt::format("/queues/{}/drain", QueueId);
HttpClient::Response DrainResp = Client.Post(DrainUrl);
- REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText()));
+ REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", DrainResp.StatusCode, DrainResp.ToText()));
CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining");
// Second submission should be rejected with 424
@@ -965,7 +1061,7 @@ TEST_CASE("function.priority")
// jobs by priority when the slot becomes free.
const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000));
- REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode)));
+ REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", BlockerResp.StatusCode));
// Submit 3 low-priority Rot13 jobs
const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
@@ -1104,6 +1200,432 @@ TEST_CASE("function.priority")
}
//////////////////////////////////////////////////////////////////////////
+// Process exit code tests
+//
+// These tests exercise how the compute service handles worker processes
+// that exit with non-zero exit codes, including retry behaviour and
+// final failure reporting.
+
+TEST_CASE("function.exit_code.failed_action")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=0 so the action fails immediately
+ // without being rescheduled.
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 0;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Fail action with exit code 42
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
+
+ // Poll for the LSN to appear in the completed list
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify queue status reflects the failure
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+
+ // Verify action history records the failure
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
+ {
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
+ }
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
+
+ // GET /jobs/{lsn} for a failed action should return OK but with an empty result package
+ const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response ResultResp = Client.Get(ResultUrl);
+ CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
+}
+
+TEST_CASE("function.exit_code.auto_retry")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=2 so the action is retried twice before
+ // being reported as failed (3 total attempts: initial + 2 retries).
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 2;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Fail action — the worker process will exit with code 1 on
+ // every attempt, eventually exhausting retries.
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+
+ // Poll for the LSN to appear in the completed list — this only
+ // happens after all retries are exhausted.
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify the action history records the retry count
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
+ {
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
+ break;
+ }
+ }
+
+ // Queue should show 1 failed, 0 completed
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+}
+
+TEST_CASE("function.exit_code.reschedule_failed")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=1 so we have room for one manual reschedule
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 1;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+
+ // Wait for the action to exhaust its auto-retry and land in completed
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
+
+ // Try to manually reschedule — should fail because retry limit is reached
+ const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
+ CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
+}
+
+TEST_CASE("function.exit_code.mixed_success_and_failure")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=0 for fast failure
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 0;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit one Rot13 (success) and one Fail (failure)
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+
+ HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
+ const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
+
+ HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
+ const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
+
+ // Wait for both to appear in the completed list
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
+ fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
+ fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
+
+ // Verify queue counters
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+}
+
+TEST_CASE("function.crash.abort")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=0 so we don't wait through retries
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 0;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Crash action that calls std::abort()
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+
+ // Poll for the LSN to appear in the completed list
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify queue status reflects the failure
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+
+ // Verify action history records the failure
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
+ {
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
+ }
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
+}
+
+TEST_CASE("function.crash.nullptr")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=0
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 0;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Crash action that dereferences a null pointer
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+
+ // Poll for the LSN to appear in the completed list
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify queue status reflects the failure
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+}
+
+TEST_CASE("function.crash.auto_retry")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue with max_retries=1 — the crash should be retried once
+ // before being reported as permanently failed.
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << 1;
+
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a Crash action — will crash on every attempt
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+
+ // Poll for the LSN to appear in the completed list after retries exhaust
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify the action history records the retry count
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
+ {
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
+ break;
+ }
+ }
+
+ // Queue should show 1 failed, 0 completed
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+}
+
+//////////////////////////////////////////////////////////////////////////
// Remote worker synchronization tests
//
// These tests exercise the orchestrator discovery path where new compute
@@ -1162,9 +1684,8 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
Sleep(200);
}
- REQUIRE_MESSAGE(
- ResultCode == HttpResponseCode::OK,
- fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", ResultCode, Instance.GetLogOutput()));
REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
@@ -1349,9 +1870,8 @@ TEST_CASE("function.remote.queue_association")
Sleep(200);
}
- REQUIRE_MESSAGE(
- ResultCode == HttpResponseCode::OK,
- fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", ResultCode, Instance.GetLogOutput()));
REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
@@ -1481,7 +2001,7 @@ TEST_CASE("function.abandon_running_http")
const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode)));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", SubmitResp.StatusCode));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
@@ -1498,7 +2018,7 @@ TEST_CASE("function.abandon_running_http")
// Trigger abandon via the HTTP endpoint
HttpClient::Response AbandonResp = Client.Post("/abandon"sv);
REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK,
- fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText()));
+ fmt::format("Abandon request failed: status={}, body={}", AbandonResp.StatusCode, AbandonResp.ToText()));
// Ready endpoint should now return 503
{
@@ -1577,7 +2097,7 @@ TEST_CASE("function.session.abandon_pending")
}
Sleep(100);
}
- CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code)));
+ CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, Code));
}
// Queue should show 0 active, 3 abandoned
@@ -1979,11 +2499,11 @@ TEST_CASE("function.retract_http")
// Submit a long-running Sleep action to occupy the single execution slot
const std::string BlockerUrl = fmt::format("/jobs/{}", WorkerId.ToHexString());
HttpClient::Response BlockerResp = Client.Post(BlockerUrl, BuildSleepActionPackage("data"sv, 30'000));
- REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", int(BlockerResp.StatusCode)));
+ REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker submission failed: status={}", BlockerResp.StatusCode));
// Submit a second action — it will stay pending because the slot is occupied
HttpClient::Response SubmitResp = Client.Post(BlockerUrl, BuildRot13ActionPackage("Retract HTTP Test"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", int(SubmitResp.StatusCode)));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}", SubmitResp.StatusCode));
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
@@ -1996,7 +2516,7 @@ TEST_CASE("function.retract_http")
HttpClient::Response RetractResp = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp.StatusCode == HttpResponseCode::OK,
fmt::format("Retract failed: status={}, body={}\nServer log:\n{}",
- int(RetractResp.StatusCode),
+ RetractResp.StatusCode,
RetractResp.ToText(),
Instance.GetLogOutput()));
@@ -2011,7 +2531,42 @@ TEST_CASE("function.retract_http")
Sleep(500);
HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
- fmt::format("Second retract failed: status={}, body={}", int(RetractResp2.StatusCode), RetractResp2.ToText()));
+ fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText()));
+}
+
+TEST_CASE("function.session.immediate_query_after_enqueue")
+{
+ // Verify that actions are immediately visible to GetActionResult and
+ // FindActionResult right after enqueue, without waiting for the
+ // scheduler thread to process the update.
+
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.Ready();
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ CbObject ActionObj = BuildRot13ActionForSession("immediate-query"sv, Resolver);
+
+ auto EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Failed to enqueue action");
+
+ // Query by LSN immediately — must not return NotFound
+ CbPackage Result;
+ HttpResponseCode Code = Session.GetActionResult(EnqueueRes.Lsn, Result);
+ CHECK_MESSAGE(Code == HttpResponseCode::Accepted,
+ fmt::format("GetActionResult returned {} immediately after enqueue, expected Accepted", Code));
+
+ // Query by ActionId immediately — must not return NotFound
+ const IoHash ActionId = ActionObj.GetHash();
+ CbPackage FindResult;
+ HttpResponseCode FindCode = Session.FindActionResult(ActionId, FindResult);
+ CHECK_MESSAGE(FindCode == HttpResponseCode::Accepted,
+ fmt::format("FindActionResult returned {} immediately after enqueue, expected Accepted", FindCode));
+
+ Session.Shutdown();
}
TEST_SUITE_END();