aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/compute-tests.cpp
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zenserver-test/compute-tests.cpp
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
-rw-r--r--src/zenserver-test/compute-tests.cpp1700
1 files changed, 1700 insertions, 0 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
new file mode 100644
index 000000000..c90ac5d8b
--- /dev/null
+++ b/src/zenserver-test/compute-tests.cpp
@@ -0,0 +1,1700 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenbase/zenbase.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/filesystem.h>
+# include <zencore/guid.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+# include <zenhttp/httpclient.h>
+# include <zenhttp/httpserver.h>
+# include <zencompute/computeservice.h>
+# include <zenstore/zenstore.h>
+# include <zenutil/zenserverprocess.h>
+
+# include "zenserver-test.h"
+
+# include <thread>
+
+namespace zen::tests::compute {
+
+using namespace std::literals;
+
+// GUIDs, in string form, used by every worker and action descriptor built in this file.
+// kBuildSystemVersion identifies the build system; kRot13Version and kSleepVersion are the
+// versions of the "Rot13" and "Sleep" functions. All three are meant to match zentest-appstub.
+static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969";
+static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313";
+static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888";
+
+// ChunkResolver backed by a plain in-memory map, intended for test use.
+// Entries are keyed by the decompressed content hash; the stored value is whatever buffer
+// the test registered through AddChunk (the helpers in this file store compressed data).
+class InMemoryChunkResolver : public ChunkResolver
+{
+public:
+    // Registers Data under DecompressedId, replacing any buffer previously stored for that id.
+    void AddChunk(const IoHash& DecompressedId, IoBuffer Data) { m_Chunks[DecompressedId] = std::move(Data); }
+
+    // Returns the buffer registered under DecompressedId, or a default-constructed IoBuffer
+    // when no such chunk has been added.
+    IoBuffer FindChunkByCid(const IoHash& DecompressedId) override
+    {
+        const auto Found = m_Chunks.find(DecompressedId);
+        if (Found == m_Chunks.end())
+        {
+            return {};
+        }
+        return Found->second;
+    }
+
+private:
+    std::unordered_map<IoHash, IoBuffer> m_Chunks;
+};
+
+// Registers zentest-appstub as a compute worker on the server behind Client.
+//
+// The executable is read from Env.ProgramBaseDir(), compressed, and attached to a worker
+// descriptor advertising the Rot13 and Sleep functions. The resulting package is POSTed to
+// /workers/{WorkerId}. A read failure or an unsuccessful response fails the current test
+// through REQUIRE_MESSAGE.
+//
+// Returns the WorkerId (hash of the worker package object).
+static IoHash
+RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env)
+{
+    std::filesystem::path StubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL);
+
+    FileContents StubFile = zen::ReadFile(StubPath);
+    REQUIRE_MESSAGE(!StubFile.ErrorCode, fmt::format("Failed to read '{}': {}", StubPath.string(), StubFile.ErrorCode.message()));
+
+    IoBuffer StubBytes = StubFile.Flatten();
+    const uint64_t StubRawSize = StubBytes.Size();
+
+    CompressedBuffer StubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(StubBytes.GetData(), StubBytes.Size()),
+                                                                 OodleCompressor::Selkie,
+                                                                 OodleCompressionLevel::HyperFast4);
+
+    // The raw hash has to be read before the compressed buffer is moved into the attachment.
+    const IoHash StubRawHash = StubCompressed.DecodeRawHash();
+    CbAttachment StubAttachment(std::move(StubCompressed), StubRawHash);
+
+    CbObjectWriter Descriptor;
+    Descriptor << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion);
+    Descriptor << "path"sv << "zentest-appstub"sv;
+
+    // Single executable entry referencing the attached appstub binary.
+    Descriptor.BeginArray("executables"sv);
+    {
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "zentest-appstub"sv;
+        Descriptor.AddAttachment("hash"sv, StubAttachment);
+        Descriptor << "size"sv << StubRawSize;
+        Descriptor.EndObject();
+    }
+    Descriptor.EndArray();
+
+    // One {name, version} object per function the worker exposes.
+    Descriptor.BeginArray("functions"sv);
+    {
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "Rot13"sv;
+        Descriptor << "version"sv << Guid::FromString(kRot13Version);
+        Descriptor.EndObject();
+
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "Sleep"sv;
+        Descriptor << "version"sv << Guid::FromString(kSleepVersion);
+        Descriptor.EndObject();
+    }
+    Descriptor.EndArray();
+
+    CbPackage Package;
+    Package.SetObject(Descriptor.Save());
+    Package.AddAttachment(StubAttachment);
+
+    // Capture the id before the package is moved into the request.
+    const IoHash WorkerId = Package.GetObjectHash();
+
+    const std::string RegisterUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
+    HttpClient::Response Reply = Client.Post(RegisterUrl, std::move(Package));
+    REQUIRE_MESSAGE(Reply, fmt::format("Worker registration failed: status={}, body={}", int(Reply.StatusCode), Reply.ToText()));
+
+    return WorkerId;
+}
+
+// Builds the CbPackage describing a Rot13 action over Input.
+// Input is compressed and attached to the package; the action object references it under
+// Inputs/Source by raw hash and raw size.
+static CbPackage
+BuildRot13ActionPackage(std::string_view Input)
+{
+    const uint64_t SourceRawSize = Input.size();
+
+    CompressedBuffer SourceCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+                                                                   OodleCompressor::Selkie,
+                                                                   OodleCompressionLevel::HyperFast4);
+
+    // Read the raw hash before the compressed buffer is moved into the attachment.
+    const IoHash SourceRawHash = SourceCompressed.DecodeRawHash();
+    CbAttachment SourceAttachment(std::move(SourceCompressed), SourceRawHash);
+
+    CbObjectWriter Action;
+    Action << "Function"sv << "Rot13"sv;
+    Action << "FunctionVersion"sv << Guid::FromString(kRot13Version);
+    Action << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+
+    Action.BeginObject("Inputs"sv);
+    {
+        Action.BeginObject("Source"sv);
+        Action.AddAttachment("RawHash"sv, SourceAttachment);
+        Action << "RawSize"sv << SourceRawSize;
+        Action.EndObject();
+    }
+    Action.EndObject();
+
+    CbPackage Package;
+    Package.SetObject(Action.Save());
+    Package.AddAttachment(SourceAttachment);
+
+    return Package;
+}
+
+// Builds the CbPackage describing a Sleep action.
+// Input is compressed and attached as Inputs/Source; SleepTimeMs is written under Constants.
+// Per the function's contract the worker sleeps for SleepTimeMs before returning its input.
+static CbPackage
+BuildSleepActionPackage(std::string_view Input, uint64_t SleepTimeMs)
+{
+    const uint64_t SourceRawSize = Input.size();
+
+    CompressedBuffer SourceCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+                                                                   OodleCompressor::Selkie,
+                                                                   OodleCompressionLevel::HyperFast4);
+
+    // Read the raw hash before the compressed buffer is moved into the attachment.
+    const IoHash SourceRawHash = SourceCompressed.DecodeRawHash();
+    CbAttachment SourceAttachment(std::move(SourceCompressed), SourceRawHash);
+
+    CbObjectWriter Action;
+    Action << "Function"sv << "Sleep"sv;
+    Action << "FunctionVersion"sv << Guid::FromString(kSleepVersion);
+    Action << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+
+    Action.BeginObject("Inputs"sv);
+    {
+        Action.BeginObject("Source"sv);
+        Action.AddAttachment("RawHash"sv, SourceAttachment);
+        Action << "RawSize"sv << SourceRawSize;
+        Action.EndObject();
+    }
+    Action.EndObject();
+
+    Action.BeginObject("Constants"sv);
+    {
+        Action << "SleepTimeMs"sv << SleepTimeMs;
+    }
+    Action.EndObject();
+
+    CbPackage Package;
+    Package.SetObject(Action.Save());
+    Package.AddAttachment(SourceAttachment);
+
+    return Package;
+}
+
+// Builds the Sleep action as a bare CbObject (no package) and registers the compressed
+// input with Resolver under its raw hash, so the attachment can be looked up later.
+// SleepTimeMs is written under Constants.
+static CbObject
+BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemoryChunkResolver& Resolver)
+{
+    const uint64_t SourceRawSize = Input.size();
+
+    CompressedBuffer SourceCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+                                                                   OodleCompressor::Selkie,
+                                                                   OodleCompressionLevel::HyperFast4);
+
+    const IoHash SourceRawHash = SourceCompressed.DecodeRawHash();
+
+    // The resolver must be fed before the compressed buffer is moved into the attachment.
+    Resolver.AddChunk(SourceRawHash, SourceCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+    CbAttachment SourceAttachment(std::move(SourceCompressed), SourceRawHash);
+
+    CbObjectWriter Action;
+    Action << "Function"sv << "Sleep"sv;
+    Action << "FunctionVersion"sv << Guid::FromString(kSleepVersion);
+    Action << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+
+    Action.BeginObject("Inputs"sv);
+    {
+        Action.BeginObject("Source"sv);
+        Action.AddAttachment("RawHash"sv, SourceAttachment);
+        Action << "RawSize"sv << SourceRawSize;
+        Action.EndObject();
+    }
+    Action.EndObject();
+
+    Action.BeginObject("Constants"sv);
+    {
+        Action << "SleepTimeMs"sv << SleepTimeMs;
+    }
+    Action.EndObject();
+
+    return Action.Save();
+}
+
+// Repeatedly GETs ResultUrl until the server answers 200 OK or TimeoutMs elapses,
+// pausing 100 ms between attempts.
+// Returns the last response received; callers must check StatusCode, because on timeout
+// this is the last non-OK response (or a default-constructed one if no request was made).
+static HttpClient::Response
+PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000)
+{
+    HttpClient::Response LastResponse;
+
+    for (Stopwatch Elapsed; Elapsed.GetElapsedTimeMs() < TimeoutMs; Sleep(100))
+    {
+        LastResponse = Client.Get(ResultUrl);
+
+        if (LastResponse.StatusCode == HttpResponseCode::OK)
+        {
+            return LastResponse;
+        }
+    }
+
+    return LastResponse;
+}
+
+// Polls CompletedUrl until Lsn shows up in the response's "completed" array.
+//
+// The URL is fetched every 100 ms for at most TimeoutMs milliseconds. Unsuccessful responses
+// are ignored and retried.
+//
+// Returns true as soon as Lsn is found, false if the timeout expires first.
+static bool
+PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int Lsn, uint64_t TimeoutMs = 30'000)
+{
+    Stopwatch Timer;
+
+    while (Timer.GetElapsedTimeMs() < TimeoutMs)
+    {
+        HttpClient::Response Resp = Client.Get(CompletedUrl);
+
+        if (Resp)
+        {
+            // Keep the decoded object in a named local for the whole loop. Iterating
+            // `Resp.AsObject()["completed"sv]` directly would destroy the temporary object
+            // before the loop body runs (pre-C++23 range-for rules), leaving the field view
+            // to depend on some other owner keeping the payload alive.
+            CbObject CompletedObj = Resp.AsObject();
+
+            for (auto& Item : CompletedObj["completed"sv])
+            {
+                if (Item.AsInt32() == Lsn)
+                {
+                    return true;
+                }
+            }
+        }
+
+        Sleep(100);
+    }
+
+    return false;
+}
+
+// Extracts the decompressed output text from an action result package.
+//
+// The hash is taken from the "RawHash" field of the first entry in the result object's
+// "Values" field, the matching attachment is looked up in ResultPackage, and its
+// decompressed bytes are returned as a string.
+// A missing (zero) hash or a missing attachment fails the current test via REQUIRE_MESSAGE.
+static std::string
+GetRot13Output(const CbPackage& ResultPackage)
+{
+    CbObject Result = ResultPackage.GetObject();
+    CbFieldView ValuesField = Result["Values"sv];
+
+    // Only the first value is inspected; OutputHash stays default-constructed when there is none.
+    IoHash OutputHash;
+    CbFieldViewIterator FirstValue = begin(ValuesField);
+    if (FirstValue.HasValue())
+    {
+        OutputHash = (*FirstValue).AsObjectView()["RawHash"sv].AsHash();
+    }
+
+    REQUIRE_MESSAGE(OutputHash != IoHash::Zero, "Expected non-zero output hash in result Values array");
+
+    const CbAttachment* Attachment = ResultPackage.FindAttachment(OutputHash);
+    REQUIRE_MESSAGE(Attachment != nullptr, "Output attachment not found in result package");
+
+    CompressedBuffer Compressed = Attachment->AsCompressedBinary();
+    SharedBuffer Decompressed = Compressed.Decompress();
+
+    const char* Text = static_cast<const char*>(Decompressed.GetData());
+    return std::string(Text, Decompressed.GetSize());
+}
+
+// Mock orchestrator HTTP service mounted at /orch/.
+// GET /orch/agents answers 200 with the currently configured worker list; every other
+// request answers 404. The list starts out empty and can be swapped with SetWorkerList;
+// reads and writes of the list are guarded by m_Lock.
+class MockOrchestratorService : public HttpService
+{
+public:
+    MockOrchestratorService()
+    {
+        // Start with an object holding an empty "workers" array.
+        CbObjectWriter EmptyList;
+        EmptyList.BeginArray("workers"sv);
+        EmptyList.EndArray();
+        m_WorkerList = EmptyList.Save();
+    }
+
+    const char* BaseUri() const override { return "/orch/"; }
+
+    void HandleRequest(HttpServerRequest& Request) override
+    {
+        const bool IsAgentsQuery = Request.RequestVerb() == HttpVerb::kGet && Request.RelativeUri() == "agents"sv;
+        if (!IsAgentsQuery)
+        {
+            Request.WriteResponse(HttpResponseCode::NotFound);
+            return;
+        }
+
+        // Hold the shared lock while the response is written so the list cannot be swapped mid-write.
+        RwLock::SharedLockScope Lock(m_Lock);
+        Request.WriteResponse(HttpResponseCode::OK, m_WorkerList);
+    }
+
+    // Replaces the object served by GET /orch/agents.
+    void SetWorkerList(CbObject WorkerList)
+    {
+        RwLock::ExclusiveLockScope Lock(m_Lock);
+        m_WorkerList = std::move(WorkerList);
+    }
+
+private:
+    RwLock m_Lock;
+    CbObject m_WorkerList;
+};
+
+// Owns an in-process "asio" HTTP server that hosts a MockOrchestratorService.
+// The constructor creates and initializes the server on a fresh port and runs it on a
+// background thread; the destructor requests exit, joins that thread and closes the server.
+struct MockOrchestratorFixture
+{
+    MockOrchestratorService Service;
+    ScopedTemporaryDirectory TmpDir;
+    Ref<HttpServer> Server;
+    std::thread ServerThread;
+    uint16_t Port = 0;
+
+    MockOrchestratorFixture()
+    {
+        HttpServerConfig ServerConfig;
+        ServerConfig.ServerClass = "asio";
+        ServerConfig.ForceLoopback = true;
+
+        Server = CreateHttpServer(ServerConfig);
+        Server->RegisterService(Service);
+
+        // Initialize reports the port actually in use; zero is treated as failure.
+        const auto BoundPort = Server->Initialize(TestEnv.GetNewPortNumber(), TmpDir.Path());
+        Port = static_cast<uint16_t>(BoundPort);
+        ZEN_ASSERT(Port != 0);
+
+        ServerThread = std::thread([this] { Server->Run(false); });
+    }
+
+    ~MockOrchestratorFixture()
+    {
+        // Ask the server to stop, wait for the Run() thread to finish, then close the server.
+        Server->RequestExit();
+        if (ServerThread.joinable())
+        {
+            ServerThread.join();
+        }
+        Server->Close();
+    }
+
+    // Base URL of the mock server, e.g. for use as an orchestrator endpoint.
+    std::string GetEndpoint() const { return fmt::format("http://localhost:{}", Port); }
+};
+
+// Builds the CbObject served for /orch/agents, in the format UpdateCoordinatorState expects.
+// Each {id, uri} pair in Workers becomes one entry of the "workers" array, with hostname
+// "localhost", reachable = true and dt = 0.
+static CbObject
+BuildAgentListResponse(std::initializer_list<std::pair<std::string_view, std::string_view>> Workers)
+{
+    CbObjectWriter Response;
+
+    Response.BeginArray("workers"sv);
+    for (const std::pair<std::string_view, std::string_view>& Worker : Workers)
+    {
+        const std::string_view WorkerId = Worker.first;
+        const std::string_view WorkerUri = Worker.second;
+
+        Response.BeginObject();
+        Response << "id"sv << WorkerId;
+        Response << "uri"sv << WorkerUri;
+        Response << "hostname"sv << "localhost"sv;
+        Response << "reachable"sv << true;
+        Response << "dt"sv << uint64_t(0);
+        Response.EndObject();
+    }
+    Response.EndArray();
+
+    return Response.Save();
+}
+
+// Builds the worker CbPackage for zentest-appstub and registers the compressed executable
+// with Resolver under its raw hash (for when the remote runner needs it).
+// Same descriptor as RegisterWorker() produces, but the package is returned instead of POSTed.
+// A failure to read the executable fails the current test via REQUIRE_MESSAGE.
+static CbPackage
+BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver)
+{
+    std::filesystem::path StubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL);
+
+    FileContents StubFile = zen::ReadFile(StubPath);
+    REQUIRE_MESSAGE(!StubFile.ErrorCode, fmt::format("Failed to read '{}': {}", StubPath.string(), StubFile.ErrorCode.message()));
+
+    IoBuffer StubBytes = StubFile.Flatten();
+    const uint64_t StubRawSize = StubBytes.Size();
+
+    CompressedBuffer StubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(StubBytes.GetData(), StubBytes.Size()),
+                                                                 OodleCompressor::Selkie,
+                                                                 OodleCompressionLevel::HyperFast4);
+
+    const IoHash StubRawHash = StubCompressed.DecodeRawHash();
+
+    // Feed the resolver before the compressed buffer is moved into the attachment.
+    Resolver.AddChunk(StubRawHash, StubCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+    CbAttachment StubAttachment(std::move(StubCompressed), StubRawHash);
+
+    CbObjectWriter Descriptor;
+    Descriptor << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion);
+    Descriptor << "path"sv << "zentest-appstub"sv;
+
+    // Single executable entry referencing the attached appstub binary.
+    Descriptor.BeginArray("executables"sv);
+    {
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "zentest-appstub"sv;
+        Descriptor.AddAttachment("hash"sv, StubAttachment);
+        Descriptor << "size"sv << StubRawSize;
+        Descriptor.EndObject();
+    }
+    Descriptor.EndArray();
+
+    // One {name, version} object per function the worker exposes.
+    Descriptor.BeginArray("functions"sv);
+    {
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "Rot13"sv;
+        Descriptor << "version"sv << Guid::FromString(kRot13Version);
+        Descriptor.EndObject();
+
+        Descriptor.BeginObject();
+        Descriptor << "name"sv << "Sleep"sv;
+        Descriptor << "version"sv << Guid::FromString(kSleepVersion);
+        Descriptor.EndObject();
+    }
+    Descriptor.EndArray();
+
+    CbPackage Package;
+    Package.SetObject(Descriptor.Save());
+    Package.AddAttachment(StubAttachment);
+
+    return Package;
+}
+
+// Builds the Rot13 action as a bare CbObject (not a CbPackage) and registers the compressed
+// input with Resolver under its raw hash, so the attachment can be looked up later.
+static CbObject
+BuildRot13ActionForSession(std::string_view Input, InMemoryChunkResolver& Resolver)
+{
+    const uint64_t SourceRawSize = Input.size();
+
+    CompressedBuffer SourceCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+                                                                   OodleCompressor::Selkie,
+                                                                   OodleCompressionLevel::HyperFast4);
+
+    const IoHash SourceRawHash = SourceCompressed.DecodeRawHash();
+
+    // The resolver must be fed before the compressed buffer is moved into the attachment.
+    Resolver.AddChunk(SourceRawHash, SourceCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+    CbAttachment SourceAttachment(std::move(SourceCompressed), SourceRawHash);
+
+    CbObjectWriter Action;
+    Action << "Function"sv << "Rot13"sv;
+    Action << "FunctionVersion"sv << Guid::FromString(kRot13Version);
+    Action << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+
+    Action.BeginObject("Inputs"sv);
+    {
+        Action.BeginObject("Source"sv);
+        Action.AddAttachment("RawHash"sv, SourceAttachment);
+        Action << "RawSize"sv << SourceRawSize;
+        Action.EndObject();
+    }
+    Action.EndObject();
+
+    return Action.Save();
+}
+
+TEST_SUITE_BEGIN("server.function");
+
+// End-to-end Rot13 run through the legacy /jobs endpoints:
+// register the worker, submit the action, poll for the result and check the output text.
+TEST_CASE("function.rot13")
+{
+    ZenServerInstance Server(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Server.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ServerPort = Server.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ServerPort != 0, Server.GetLogOutput());
+
+    const std::string BaseUri = fmt::format("http://localhost:{}/compute", ServerPort);
+    HttpClient Http(BaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Http, TestEnv);
+
+    // Submit: POST /jobs/{worker} (legacy endpoint) answers with the job's LSN.
+    const std::string SubmitUrl = fmt::format("/jobs/{}", WorkerId.ToHexString());
+    HttpClient::Response Submitted = Http.Post(SubmitUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(Submitted, fmt::format("Job submission failed: status={}, body={}", int(Submitted.StatusCode), Submitted.ToText()));
+
+    const int JobLsn = Submitted.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from job submission");
+
+    // Collect: GET /jobs/{lsn} (legacy endpoint) until it answers 200 OK.
+    const std::string FetchUrl = fmt::format("/jobs/{}", JobLsn);
+    HttpClient::Response Fetched = PollForResult(Http, FetchUrl);
+    REQUIRE_MESSAGE(
+        Fetched.StatusCode == HttpResponseCode::OK,
+        fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(Fetched.StatusCode), Server.GetLogOutput()));
+
+    // Rot13("Hello World") is "Uryyb Jbeyq".
+    CbPackage Output = Fetched.AsPackage();
+    REQUIRE_MESSAGE(bool(Output), fmt::format("Action failed (empty result package)\nServer log:\n{}", Server.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(Output), "Uryyb Jbeyq"sv);
+}
+
+// Exercises the worker registry endpoints:
+//   - GET /workers before and after registration,
+//   - GET /workers/{worker} and the contents of the returned descriptor,
+//   - GET /workers/{unknown} answering 404.
+TEST_CASE("function.workers")
+{
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+    const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+    HttpClient Client(ComputeBaseUri);
+
+    // Before registration, GET /workers should return an empty list
+    HttpClient::Response EmptyListResp = Client.Get("/workers"sv);
+    REQUIRE_MESSAGE(EmptyListResp, "Failed to list workers before registration");
+    CHECK_EQ(EmptyListResp.AsObject()["workers"sv].AsArrayView().Num(), 0);
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // GET /workers - the registered worker should appear in the listing
+    HttpClient::Response ListResp = Client.Get("/workers"sv);
+    REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration");
+
+    // Keep the decoded listing in a named local while iterating. Looping over
+    // `ListResp.AsObject()["workers"sv]` directly would destroy the temporary object before
+    // the loop body runs (pre-C++23 range-for rules), leaving the field view to depend on
+    // some other owner keeping the payload alive.
+    CbObject ListObj = ListResp.AsObject();
+
+    bool WorkerFound = false;
+    for (auto& Item : ListObj["workers"sv])
+    {
+        if (Item.AsHash() == WorkerId)
+        {
+            WorkerFound = true;
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString()));
+
+    // GET /workers/{worker} - descriptor should match what was registered
+    const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
+    HttpClient::Response DescResp = Client.Get(WorkerUrl);
+    REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode)));
+
+    CbObject Desc = DescResp.AsObject();
+    CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion));
+    CHECK_EQ(Desc["path"sv].AsString(), "zentest-appstub"sv);
+
+    // Both functions must be listed, each with the version it was registered with.
+    bool Rot13Found = false;
+    bool SleepFound = false;
+    for (auto& Item : Desc["functions"sv])
+    {
+        std::string_view Name = Item.AsObjectView()["name"sv].AsString();
+        if (Name == "Rot13"sv)
+        {
+            CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kRot13Version));
+            Rot13Found = true;
+        }
+        else if (Name == "Sleep"sv)
+        {
+            CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kSleepVersion));
+            SleepFound = true;
+        }
+    }
+
+    CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor");
+    CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor");
+
+    // GET /workers/{unknown} - should return 404
+    const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString());
+    HttpClient::Response NotFoundResp = Client.Get(UnknownUrl);
+    CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound);
+}
+
+// Full life cycle of a local queue: create it, find it in the listing, submit a Rot13 job
+// through the queue-scoped endpoint, wait for completion, fetch and verify the result, and
+// finally check the counters reported by the queue status endpoint.
+TEST_CASE("function.queues.lifecycle")
+{
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+    const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+    HttpClient Client(ComputeBaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Create a queue
+    HttpClient::Response CreateResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+
+    const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Verify the queue appears in the listing
+    HttpClient::Response ListResp = Client.Get("/queues"sv);
+    REQUIRE_MESSAGE(ListResp, "Failed to list queues");
+
+    // Keep the decoded listing in a named local while iterating. Looping over
+    // `ListResp.AsObject()["queues"sv]` directly would destroy the temporary object before
+    // the loop body runs (pre-C++23 range-for rules), leaving the field view to depend on
+    // some other owner keeping the payload alive.
+    CbObject ListObj = ListResp.AsObject();
+
+    bool QueueFound = false;
+    for (auto& Item : ListObj["queues"sv])
+    {
+        if (Item.AsObjectView()["queue_id"sv].AsInt32() == QueueId)
+        {
+            QueueFound = true;
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(QueueFound, fmt::format("Queue {} not found in queue listing", QueueId));
+
+    // Submit action via queue-scoped endpoint
+    const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(SubmitResp,
+                    fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+
+    const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission");
+
+    // Poll for completion via queue-scoped /completed endpoint
+    const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+    REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+                    fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+                                Lsn,
+                                QueueId,
+                                Instance.GetLogOutput()));
+
+    // Retrieve result via queue-scoped /jobs/{lsn} endpoint
+    const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+    HttpClient::Response ResultResp = Client.Get(ResultUrl);
+    REQUIRE_MESSAGE(
+        ResultResp.StatusCode == HttpResponseCode::OK,
+        fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput()));
+
+    // Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
+    CbPackage ResultPackage = ResultResp.AsPackage();
+    REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+    // Verify queue status reflects completion
+    const std::string StatusUrl = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response StatusResp = Client.Get(StatusUrl);
+    REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+    CbObject QueueStatus = StatusResp.AsObject();
+    CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+    CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+    CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 0);
+    CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "active");
+}
+
+// Cancelling a queue: DELETE /queues/{id} must answer 204 No Content and the queue status
+// must report the "cancelled" state afterwards.
+TEST_CASE("function.queues.cancel")
+{
+    ZenServerInstance Server(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Server.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ServerPort = Server.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ServerPort != 0, Server.GetLogOutput());
+
+    const std::string BaseUri = fmt::format("http://localhost:{}/compute", ServerPort);
+    HttpClient Http(BaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Http, TestEnv);
+
+    // Step 1: create the queue.
+    HttpClient::Response Created = Http.Post("/queues"sv);
+    REQUIRE_MESSAGE(Created, "Queue creation failed");
+
+    const int QueueId = Created.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Step 2: put one job into it.
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response Submitted = Http.Post(SubmitUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(Submitted, fmt::format("Job submission failed: status={}, body={}", int(Submitted.StatusCode), Submitted.ToText()));
+
+    // Step 3: cancel the queue.
+    const std::string QueuePath = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response Cancelled = Http.Delete(QueuePath);
+    REQUIRE_MESSAGE(Cancelled.StatusCode == HttpResponseCode::NoContent,
+                    fmt::format("Queue cancellation failed: status={}, body={}", int(Cancelled.StatusCode), Cancelled.ToText()));
+
+    // Step 4: the status endpoint must now report the cancelled state.
+    HttpClient::Response Status = Http.Get(QueuePath);
+    REQUIRE_MESSAGE(Status, "Failed to get queue status after cancel");
+
+    CbObject StatusObj = Status.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "cancelled");
+}
+
+// Remote queue round trip: create a queue via /queues/remote, then address it by its
+// queue_token (instead of the integer queue id) for submission, completion polling and
+// result retrieval of a Rot13 job.
+TEST_CASE("function.queues.remote")
+{
+    ZenServerInstance Server(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Server.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ServerPort = Server.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ServerPort != 0, Server.GetLogOutput());
+
+    const std::string BaseUri = fmt::format("http://localhost:{}/compute", ServerPort);
+    HttpClient Http(BaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Http, TestEnv);
+
+    // The creation response carries both an integer queue_id and an OID queue_token.
+    HttpClient::Response Created = Http.Post("/queues/remote"sv);
+    REQUIRE_MESSAGE(Created, fmt::format("Remote queue creation failed: status={}, body={}", int(Created.StatusCode), Created.ToText()));
+
+    CbObject CreatedObj = Created.AsObject();
+    const std::string Token = std::string(CreatedObj["queue_token"sv].AsString());
+    REQUIRE_MESSAGE(!Token.empty(), "Expected non-empty queue_token from remote queue creation");
+
+    // From here on the opaque token takes the place of the integer queue id in every URL.
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", Token, WorkerId.ToHexString());
+    HttpClient::Response Submitted = Http.Post(SubmitUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(Submitted,
+                    fmt::format("Remote queue job submission failed: status={}, body={}", int(Submitted.StatusCode), Submitted.ToText()));
+
+    const int JobLsn = Submitted.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from remote queue job submission");
+
+    // Wait until the job is listed by the token-addressed /completed endpoint.
+    const std::string DoneUrl = fmt::format("/queues/{}/completed", Token);
+    REQUIRE_MESSAGE(
+        PollForLsnInCompleted(Http, DoneUrl, JobLsn),
+        fmt::format("LSN {} did not appear in remote queue completed list within timeout\nServer log:\n{}", JobLsn, Server.GetLogOutput()));
+
+    // Fetch the result through the token-addressed /jobs/{lsn} endpoint.
+    const std::string FetchUrl = fmt::format("/queues/{}/jobs/{}", Token, JobLsn);
+    HttpClient::Response Fetched = Http.Get(FetchUrl);
+    REQUIRE_MESSAGE(Fetched.StatusCode == HttpResponseCode::OK,
+                    fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}",
+                                int(Fetched.StatusCode),
+                                Server.GetLogOutput()));
+
+    // Rot13("Hello World") is "Uryyb Jbeyq".
+    CbPackage Output = Fetched.AsPackage();
+    REQUIRE_MESSAGE(bool(Output), fmt::format("Empty result package\nServer log:\n{}", Server.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(Output), "Uryyb Jbeyq"sv);
+}
+
+// Cancelling a queue while a job is executing: a 30 s Sleep job is submitted, the queue is
+// cancelled after a one-second head start, and the job is then expected to show up in the
+// /completed list with the queue reporting one cancelled and zero completed actions.
+TEST_CASE("function.queues.cancel_running")
+{
+    ZenServerInstance Server(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Server.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ServerPort = Server.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ServerPort != 0, Server.GetLogOutput());
+
+    const std::string BaseUri = fmt::format("http://localhost:{}/compute", ServerPort);
+    HttpClient Http(BaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Http, TestEnv);
+
+    // Step 1: create the queue.
+    HttpClient::Response Created = Http.Post("/queues"sv);
+    REQUIRE_MESSAGE(Created, "Queue creation failed");
+
+    const int QueueId = Created.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Step 2: submit a Sleep job long enough to still be running when the cancel arrives.
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response Submitted = Http.Post(SubmitUrl, BuildSleepActionPackage("data"sv, 30'000));
+    REQUIRE_MESSAGE(Submitted, fmt::format("Sleep job submission failed: status={}, body={}", int(Submitted.StatusCode), Submitted.ToText()));
+
+    const int JobLsn = Submitted.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from Sleep job submission");
+
+    // Give the worker process time to start executing before cancelling.
+    Sleep(1'000);
+
+    // Step 3: cancel the queue, which should interrupt the running Sleep job.
+    const std::string QueuePath = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response Cancelled = Http.Delete(QueuePath);
+    REQUIRE_MESSAGE(Cancelled.StatusCode == HttpResponseCode::NoContent,
+                    fmt::format("Queue cancellation failed: status={}, body={}", int(Cancelled.StatusCode), Cancelled.ToText()));
+
+    // Step 4: once the process exits, the cancelled job should be listed under /completed.
+    const std::string DoneUrl = fmt::format("/queues/{}/completed", QueueId);
+    REQUIRE_MESSAGE(PollForLsnInCompleted(Http, DoneUrl, JobLsn),
+                    fmt::format("LSN {} did not appear in queue {} completed list after cancel\nServer log:\n{}",
+                                JobLsn,
+                                QueueId,
+                                Server.GetLogOutput()));
+
+    // Step 5: the queue must report exactly one cancelled action and none completed.
+    HttpClient::Response Status = Http.Get(QueuePath);
+    REQUIRE_MESSAGE(Status, "Failed to get queue status after cancel");
+
+    CbObject StatusObj = Status.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "cancelled");
+    CHECK_EQ(StatusObj["cancelled_count"sv].AsInt32(), 1);
+    CHECK_EQ(StatusObj["completed_count"sv].AsInt32(), 0);
+}
+
+ // Cancels a token-addressed queue over HTTP while a 30 s Sleep job has been submitted to it.
+ // The queue is created with POST /queues/remote, addressed throughout by the returned
+ // queue_token, and cancelled with DELETE. The test then expects the job's LSN to show up under
+ // /completed and the queue status to report state "cancelled", cancelled_count 1 and
+ // completed_count 0.
+ TEST_CASE("function.queues.remote_cancel")
+ {
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a remote queue to obtain an OID token for token-addressed cancellation
+ HttpClient::Response CreateResp = Client.Post("/queues/remote"sv);
+ REQUIRE_MESSAGE(CreateResp,
+ fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+
+ const std::string QueueToken = std::string(CreateResp.AsObject()["queue_token"sv].AsString());
+ REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation");
+
+ // Submit a long-running Sleep job via the token-addressed endpoint
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
+
+ // Wait for the worker process to start executing before cancelling
+ // NOTE(review): this is a fixed 1 s delay; nothing here verifies that the job is actually
+ // running when the DELETE is issued.
+ Sleep(1'000);
+
+ // Cancel the queue via its OID token
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
+ HttpClient::Response CancelResp = Client.Delete(QueueUrl);
+ REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
+ fmt::format("Remote queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText()));
+
+ // The cancelled job should appear in the token-addressed /completed endpoint
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken);
+ REQUIRE_MESSAGE(
+ PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in remote queue completed list after cancel\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
+
+ // Verify the queue status reflects the cancellation
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get remote queue status after cancel");
+
+ // The single submitted job must be counted as cancelled, not completed
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled");
+ CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
+
+ // Verifies queue drain semantics over HTTP:
+ //  1. a 2 s Sleep job is submitted to a freshly created queue,
+ //  2. POST /queues/{id}/drain must answer with state "draining",
+ //  3. a further submission must be rejected with 424 and error "queue is draining",
+ //  4. the job submitted before the drain must still reach the /completed list,
+ //  5. the queue status must then report state "draining" with is_complete set.
+ TEST_CASE("function.queues.drain")
+ {
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue
+ HttpClient::Response CreateResp = Client.Post("/queues"sv);
+ REQUIRE_MESSAGE(CreateResp, "Queue creation failed");
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ // Fail fast on a zero id (same check the other queue tests in this file perform) instead of
+ // building URLs for queue 0 and failing later with a less obvious error
+ REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id");
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+
+ // Submit a long-running job so we can verify it completes even after drain
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response Submit1 = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 2'000));
+ REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", int(Submit1.StatusCode)));
+ const int Lsn1 = Submit1.AsObject()["lsn"sv].AsInt32();
+ // Without a valid LSN the completion poll below could only ever time out
+ REQUIRE_MESSAGE(Lsn1 != 0, "Expected non-zero LSN from first job submission");
+
+ // Drain the queue
+ const std::string DrainUrl = fmt::format("/queues/{}/drain", QueueId);
+ HttpClient::Response DrainResp = Client.Post(DrainUrl);
+ REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText()));
+ CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining");
+
+ // Second submission should be rejected with 424
+ HttpClient::Response Submit2 = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ CHECK_EQ(Submit2.StatusCode, HttpResponseCode::FailedDependency);
+ CHECK_EQ(std::string(Submit2.AsObject()["error"sv].AsString()), "queue is draining");
+
+ // First job should still complete
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn1),
+ fmt::format("LSN {} did not complete after drain\nServer log:\n{}", Lsn1, Instance.GetLogOutput()));
+
+ // Queue status should show draining + complete
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "draining");
+ CHECK(QueueStatus["is_complete"sv].AsBool());
+ }
+
+ // Checks that a job submitted with ?priority=10 completes before jobs submitted earlier with
+ // ?priority=0. The server is started with --max-actions=1 and a 1 s Sleep "blocker" job is
+ // submitted first; three low-priority Rot13 jobs and one high-priority Rot13 job follow. Once
+ // all four appear in the queue's /completed list, their time_Completed values from
+ // /queues/{id}/history are compared.
+ TEST_CASE("function.priority")
+ {
+ // Spawn server with max-actions=1 to guarantee serialized action execution,
+ // which lets us deterministically verify that higher-priority pending jobs
+ // are scheduled before lower-priority ones.
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1");
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue for all test jobs
+ HttpClient::Response CreateResp = Client.Post("/queues"sv);
+ REQUIRE_MESSAGE(CreateResp, "Queue creation failed");
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id");
+
+ // Submit a blocker Sleep job to occupy the single execution slot.
+ // Once the blocker is running, the scheduler must choose among the pending
+ // jobs by priority when the slot becomes free.
+ // NOTE(review): nothing below waits for the blocker to actually start; the test relies on the
+ // remaining four submissions arriving within the blocker's 1 s sleep - confirm this is enough
+ // on slow CI machines.
+ const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
+ HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000));
+ REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode)));
+
+ // Submit 3 low-priority Rot13 jobs
+ const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
+
+ HttpClient::Response LowResp1 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low1"sv));
+ REQUIRE_MESSAGE(LowResp1, "Low-priority job 1 submission failed");
+ const int LsnLow1 = LowResp1.AsObject()["lsn"sv].AsInt32();
+
+ HttpClient::Response LowResp2 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low2"sv));
+ REQUIRE_MESSAGE(LowResp2, "Low-priority job 2 submission failed");
+ const int LsnLow2 = LowResp2.AsObject()["lsn"sv].AsInt32();
+
+ HttpClient::Response LowResp3 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low3"sv));
+ REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed");
+ const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32();
+
+ // Submit 1 high-priority Rot13 job — should execute before the low-priority ones
+ const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString());
+ HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv));
+ REQUIRE_MESSAGE(HighResp, "High-priority job submission failed");
+ const int LsnHigh = HighResp.AsObject()["lsn"sv].AsInt32();
+
+ // Wait for all 4 priority-test jobs to appear in the queue's completed list.
+ // This avoids any snapshot-timing race: by the time we compare timestamps, all
+ // jobs have already finished and their history entries are stable.
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+
+ {
+ bool AllCompleted = false;
+ Stopwatch WaitTimer;
+
+ // Re-fetch the completed list every 100 ms for up to 30 s. The Found* flags are reset on
+ // every successful fetch, so AllCompleted only becomes true when a single response
+ // contains all four LSNs.
+ while (!AllCompleted && WaitTimer.GetElapsedTimeMs() < 30'000)
+ {
+ HttpClient::Response Resp = Client.Get(CompletedUrl);
+
+ if (Resp)
+ {
+ bool FoundHigh = false;
+ bool FoundLow1 = false;
+ bool FoundLow2 = false;
+ bool FoundLow3 = false;
+
+ CbObject RespObj = Resp.AsObject();
+
+ // Each entry is matched against at most one LSN (else-if chain); entries that match
+ // none of the four (e.g. the blocker, whose LSN is not tracked here) are ignored.
+ for (auto& Item : RespObj["completed"sv])
+ {
+ const int Lsn = Item.AsInt32();
+ if (Lsn == LsnHigh)
+ {
+ FoundHigh = true;
+ }
+ else if (Lsn == LsnLow1)
+ {
+ FoundLow1 = true;
+ }
+ else if (Lsn == LsnLow2)
+ {
+ FoundLow2 = true;
+ }
+ else if (Lsn == LsnLow3)
+ {
+ FoundLow3 = true;
+ }
+ }
+
+ AllCompleted = FoundHigh && FoundLow1 && FoundLow2 && FoundLow3;
+ }
+
+ if (!AllCompleted)
+ {
+ Sleep(100);
+ }
+ }
+
+ REQUIRE_MESSAGE(
+ AllCompleted,
+ fmt::format(
+ "Not all priority test jobs completed within timeout (lsnHigh={} lsnLow1={} lsnLow2={} lsnLow3={})\nServer log:\n{}",
+ LsnHigh,
+ LsnLow1,
+ LsnLow2,
+ LsnLow3,
+ Instance.GetLogOutput()));
+ }
+
+ // Query the queue-scoped history to obtain the time_Completed timestamp for each
+ // job. The history endpoint records when each RunnerAction::State transition
+ // occurred, so time_Completed is the wall-clock tick at which the action finished.
+ // Using the queue-scoped endpoint avoids exposing history from other queues.
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+ CbObject HistoryObj = HistoryResp.AsObject();
+
+ // Linear scan of the "history" array for the entry whose "lsn" equals Lsn; returns that
+ // entry's time_Completed value, or 0 when no entry matches. The REQUIREs below treat 0 as
+ // "not found".
+ auto GetCompletedTimestamp = [&](int Lsn) -> uint64_t {
+ for (auto& Item : HistoryObj["history"sv])
+ {
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ return Item.AsObjectView()["time_Completed"sv].AsUInt64();
+ }
+ }
+ return 0;
+ };
+
+ const uint64_t TimeHigh = GetCompletedTimestamp(LsnHigh);
+ const uint64_t TimeLow1 = GetCompletedTimestamp(LsnLow1);
+ const uint64_t TimeLow2 = GetCompletedTimestamp(LsnLow2);
+ const uint64_t TimeLow3 = GetCompletedTimestamp(LsnLow3);
+
+ REQUIRE_MESSAGE(TimeHigh != 0, fmt::format("lsnHigh={} not found in action history", LsnHigh));
+ REQUIRE_MESSAGE(TimeLow1 != 0, fmt::format("lsnLow1={} not found in action history", LsnLow1));
+ REQUIRE_MESSAGE(TimeLow2 != 0, fmt::format("lsnLow2={} not found in action history", LsnLow2));
+ REQUIRE_MESSAGE(TimeLow3 != 0, fmt::format("lsnLow3={} not found in action history", LsnLow3));
+
+ // The high-priority job must have completed strictly before every low-priority job
+ CHECK_MESSAGE(TimeHigh < TimeLow1,
+ fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow1={} completed at t={} (expected later)",
+ LsnHigh,
+ TimeHigh,
+ LsnLow1,
+ TimeLow1));
+ CHECK_MESSAGE(TimeHigh < TimeLow2,
+ fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow2={} completed at t={} (expected later)",
+ LsnHigh,
+ TimeHigh,
+ LsnLow2,
+ TimeLow2));
+ CHECK_MESSAGE(TimeHigh < TimeLow3,
+ fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow3={} completed at t={} (expected later)",
+ LsnHigh,
+ TimeHigh,
+ LsnLow3,
+ TimeLow3));
+ }
+
+//////////////////////////////////////////////////////////////////////////
+// Remote worker synchronization tests
+//
+// These tests exercise the orchestrator discovery path where new compute
+// nodes appear over time and must receive previously registered workers
+// via SyncWorkersToRunner().
+
+ // Registers a worker on a ComputeServiceSession before the mock orchestrator advertises any
+ // compute node, then advertises the spawned zenserver and checks that a Rot13 action enqueued
+ // on the session completes with the expected output.
+ TEST_CASE("function.remote.worker_sync_on_discovery")
+ {
+ // Spawn real zenserver in compute mode
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t ServerPort = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(ServerPort != 0, Instance.GetLogOutput());
+
+ const std::string ServerUri = fmt::format("http://localhost:{}", ServerPort);
+
+ // Start mock orchestrator with empty worker list
+ MockOrchestratorFixture MockOrch;
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ // Register worker on session (stored locally, no runners yet)
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Update mock orchestrator to advertise the real server
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}}));
+
+ // Wait for scheduler to discover the runner (~5s throttle + margin)
+ Sleep(7'000);
+
+ // Submit Rot13 action via session
+ CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Action enqueue failed");
+
+ // Poll for result
+ // (every 200 ms, for up to 30 s, until GetActionResult reports OK)
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(
+ ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+
+ REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+
+ // Rot13("Hello World") = "Uryyb Jbeyq"
+ CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+ Session.Shutdown();
+ }
+
+ // Exercises discovery of a second compute node after work has already run on the first:
+ //  1. W1 is advertised from the start and a baseline Rot13 action must complete,
+ //  2. W2 is spawned and added to the mock orchestrator's worker list,
+ //  3. W2's /compute/workers listing must contain the hash of the session's worker package,
+ //  4. a second Rot13 action must complete with the expected output.
+ TEST_CASE("function.remote.late_runner_discovery")
+ {
+ // Spawn first server
+ ZenServerInstance Instance1(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance1.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port1 = Instance1.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port1 != 0, Instance1.GetLogOutput());
+
+ const std::string ServerUri1 = fmt::format("http://localhost:{}", Port1);
+
+ // Start mock orchestrator advertising W1
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}}));
+
+ // Create session and register worker
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for W1 discovery
+ Sleep(7'000);
+
+ // Baseline: submit Rot13 action and verify it completes on W1
+ {
+ CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed");
+
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ // Poll every 200 ms for up to 30 s until the session reports OK for this LSN
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance1.GetLogOutput()));
+
+ CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+ }
+
+ // Spawn second server
+ ZenServerInstance Instance2(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance2.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port2 = Instance2.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port2 != 0, Instance2.GetLogOutput());
+
+ const std::string ServerUri2 = fmt::format("http://localhost:{}", Port2);
+
+ // Update mock orchestrator to include both W1 and W2
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}}));
+
+ // Wait for W2 discovery
+ Sleep(7'000);
+
+ // Verify W2 received the worker by querying its /compute/workers endpoint directly
+ {
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+ HttpClient Client(ComputeBaseUri);
+ HttpClient::Response ListResp = Client.Get("/workers"sv);
+ REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+
+ // The listing is scanned for an entry equal to the hash of the worker package object
+ bool WorkerFound = false;
+ for (auto& Item : ListResp.AsObject()["workers"sv])
+ {
+ if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ {
+ WorkerFound = true;
+ break;
+ }
+ }
+
+ REQUIRE_MESSAGE(WorkerFound,
+ fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}",
+ Instance2.GetLogOutput()));
+ }
+
+ // Submit another action and verify it completes (could run on either W1 or W2)
+ {
+ CbObject ActionObj = BuildRot13ActionForSession("Second Test"sv, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Second action enqueue failed");
+
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ // Same 200 ms / 30 s polling scheme as the baseline action above
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Second action did not complete in time\nW1 log:\n{}\nW2 log:\n{}",
+ Instance1.GetLogOutput(),
+ Instance2.GetLogOutput()));
+
+ // Rot13("Second Test") = "Frpbaq Grfg"
+ CHECK_EQ(GetRot13Output(ResultPackage), "Frpbaq Grfg"sv);
+ }
+
+ Session.Shutdown();
+ }
+
+ // Enqueues a Rot13 action into a queue created on a local ComputeServiceSession, waits for it
+ // to complete via the spawned compute node, and then checks that the node's /compute/queues
+ // listing contains at least one entry whose "implicit" flag is false.
+ TEST_CASE("function.remote.queue_association")
+ {
+ // Spawn real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ // Register worker on session
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a local queue and submit action to it
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Action enqueue to queue failed");
+
+ // Poll for result
+ // (every 200 ms, for up to 30 s, until GetActionResult reports OK)
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(
+ ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+
+ REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+ CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+ // Verify that a non-implicit remote queue was created on the compute node
+ HttpClient Client(Instance.GetBaseUri() + "/compute");
+
+ HttpClient::Response QueuesResp = Client.Get("/queues"sv);
+ REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server");
+
+ // Any listed queue with implicit == false satisfies the check; the scan stops at the first one
+ bool RemoteQueueFound = false;
+ for (auto& Item : QueuesResp.AsObject()["queues"sv])
+ {
+ if (!Item.AsObjectView()["implicit"sv].AsBool())
+ {
+ RemoteQueueFound = true;
+ break;
+ }
+ }
+
+ CHECK_MESSAGE(RemoteQueueFound, "Expected a non-implicit remote queue on the compute node");
+
+ Session.Shutdown();
+ }
+
+ // Submits a 30 s Sleep action to a local session queue, cancels that queue via
+ // Session.CancelQueue(), and then checks both sides: the local queue status must be Cancelled
+ // and the first non-implicit queue listed by the compute node must report state "cancelled".
+ TEST_CASE("function.remote.queue_cancel_propagation")
+ {
+ // Spawn real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ // Register worker on session
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a local queue and submit a long-running Sleep action
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ Sleep(2'000);
+
+ // Cancel the local queue — this should propagate to the remote
+ Session.CancelQueue(QueueId);
+
+ // Poll for the action to complete (as cancelled)
+ // NOTE(review): unlike the other polling loops in this file, the outcome of this loop is never
+ // asserted - if GetActionResult does not report OK the loop simply runs for the full 30 s and
+ // the test continues. Confirm whether a cancelled action is expected to report OK here and, if
+ // so, consider asserting on ResultCode.
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ // Verify the local queue shows cancelled
+ auto QueueStatus = Session.GetQueueStatus(QueueId);
+ CHECK(QueueStatus.State == zen::compute::ComputeServiceSession::QueueState::Cancelled);
+
+ // Verify the remote queue on the compute node is also cancelled
+ HttpClient Client(Instance.GetBaseUri() + "/compute");
+
+ HttpClient::Response QueuesResp = Client.Get("/queues"sv);
+ REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server");
+
+ // Only the first non-implicit queue in the listing is inspected; if there is none the flag
+ // stays false and the CHECK below fails.
+ bool RemoteQueueCancelled = false;
+ for (auto& Item : QueuesResp.AsObject()["queues"sv])
+ {
+ if (!Item.AsObjectView()["implicit"sv].AsBool())
+ {
+ RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled";
+ break;
+ }
+ }
+
+ CHECK_MESSAGE(RemoteQueueCancelled, "Expected the remote queue to be cancelled");
+
+ Session.Shutdown();
+ }
+
+ // Drives the abandon flow purely over HTTP against a spawned compute server:
+ //  1. a 30 s Sleep job is submitted to a new queue,
+ //  2. GET /ready must answer 200 before and 503 after POST /abandon,
+ //  3. the job's LSN must appear in the queue's /completed list,
+ //  4. the queue status must report abandoned_count 1, completed_count 0, active_count 0,
+ //  5. a further job submission must not answer 200.
+ TEST_CASE("function.abandon_running_http")
+ {
+ // Spawn a real zenserver to execute a long-running action, then abandon via HTTP endpoint
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue and submit a long-running Sleep job
+ HttpClient::Response CreateResp = Client.Post("/queues"sv);
+ REQUIRE_MESSAGE(CreateResp, "Queue creation failed");
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id");
+
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode)));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
+
+ // Wait for the process to start running
+ // NOTE(review): fixed 1 s delay; the running state itself is not verified
+ Sleep(1'000);
+
+ // Verify the ready endpoint returns OK before abandon
+ {
+ HttpClient::Response ReadyResp = Client.Get("/ready"sv);
+ CHECK(ReadyResp.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Trigger abandon via the HTTP endpoint
+ HttpClient::Response AbandonResp = Client.Post("/abandon"sv);
+ REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK,
+ fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText()));
+
+ // Ready endpoint should now return 503
+ {
+ HttpClient::Response ReadyResp = Client.Get("/ready"sv);
+ CHECK(ReadyResp.StatusCode == HttpResponseCode::ServiceUnavailable);
+ }
+
+ // The abandoned action should appear in the completed endpoint once the process exits
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list after abandon\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify the queue reflects one abandoned action
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after abandon");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["abandoned_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+
+ // Submitting new work should be rejected
+ // NOTE(review): this only rules out a 200 response; any other status, including other 2xx
+ // codes, satisfies the check - confirm which status a successful submission returns.
+ HttpClient::Response RejectedResp = Client.Post(JobUrl, BuildRot13ActionPackage("rejected"sv));
+ CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state");
+ }
+
+ // Session-only test (no server is spawned and no orchestrator endpoint is configured): three
+ // actions are enqueued, the session is moved to the Abandoned state, and the test then expects
+ // every action to report OK from GetActionResult, the queue to show 0 active / 3 abandoned,
+ // further enqueues to be rejected, and the Abandoned -> Sunset transition to succeed.
+ TEST_CASE("function.session.abandon_pending")
+ {
+ // Create a session with no runners so actions stay pending
+ InMemoryChunkResolver Resolver;
+ // NOTE(review): SessionBaseDir is not referenced again in this test (no
+ // SetOrchestratorBasePath call) - confirm whether it is still needed.
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Enqueue several actions — they will stay pending because there are no runners
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
+
+ // The same action object is enqueued three times
+ CbObject ActionObj = BuildRot13ActionForSession("abandon-test"sv, Resolver);
+
+ auto Enqueue1 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ auto Enqueue2 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ auto Enqueue3 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(Enqueue1, "Failed to enqueue action 1");
+ REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2");
+ REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3");
+
+ // Transition to Abandoned — should mark all pending actions as Abandoned
+ bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+ CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK(!Session.IsHealthy());
+
+ // Give the scheduler thread time to process the state changes
+ Sleep(2'000);
+
+ // All three actions should now be in the results map as abandoned
+ for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn})
+ {
+ CbPackage Result;
+ HttpResponseCode Code = Session.GetActionResult(Lsn, Result);
+ CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code)));
+ }
+
+ // Queue should show 0 active, 3 abandoned
+ auto Status = Session.GetQueueStatus(QueueResult.QueueId);
+ CHECK_EQ(Status.ActiveCount, 0);
+ CHECK_EQ(Status.AbandonedCount, 3);
+
+ // New actions should be rejected
+ auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state");
+
+ // Abandoned → Sunset should be valid
+ CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset));
+
+ Session.Shutdown();
+ }
+
+ // Submits a 30 s Sleep action through a session whose mock orchestrator advertises a spawned
+ // compute server, moves the session to the Abandoned state two seconds later, and expects the
+ // action to report OK from GetActionResult within 30 s with the queue counting it as
+ // abandoned (ActiveCount 0, AbandonedCount 1, CompletedCount 0).
+ TEST_CASE("function.session.abandon_running")
+ {
+ // Spawn a real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a queue and submit a long-running Sleep action
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ // NOTE(review): fixed 2 s delay; the running state itself is not verified
+ Sleep(2'000);
+
+ // Transition to Abandoned — should abandon the running action
+ bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+ CHECK(!Session.IsHealthy());
+
+ // Poll for the action to complete (as abandoned)
+ // (every 200 ms, for up to 30 s, until GetActionResult reports OK)
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput()));
+
+ // Verify the queue shows abandoned, not completed
+ auto QueueStatus = Session.GetQueueStatus(QueueId);
+ CHECK_EQ(QueueStatus.ActiveCount, 0);
+ CHECK_EQ(QueueStatus.AbandonedCount, 1);
+ CHECK_EQ(QueueStatus.CompletedCount, 0);
+
+ Session.Shutdown();
+ }
+
+TEST_CASE("function.remote.abandon_propagation")
+{
+    // Scenario: a session hands a long Sleep action to a remote compute node, then the
+    // session is moved to Abandoned. The action must be reported as abandoned on the
+    // local queue, the session must report unhealthy, and the remote node must still
+    // answer its readiness probe.
+    using SessionState = zen::compute::ComputeServiceSession::SessionState;
+
+    // Remote side: a real zenserver process running in compute-server mode
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+    // Mock orchestrator whose worker list contains only the server spawned above
+    MockOrchestratorFixture Orchestrator;
+    Orchestrator.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+    // Local side: a session pointed at the mock orchestrator, then moved to Ready
+    InMemoryChunkResolver   ChunkResolver;
+    ScopedTemporaryDirectory OrchBaseDir;
+    zen::compute::ComputeServiceSession Session(ChunkResolver);
+    Session.SetOrchestratorEndpoint(Orchestrator.GetEndpoint());
+    Session.SetOrchestratorBasePath(OrchBaseDir.Path());
+    Session.RequestStateTransition(SessionState::Ready);
+
+    // The worker package has to be registered on the session before work is queued
+    CbPackage WorkerPkg = BuildWorkerPackage(TestEnv, ChunkResolver);
+    Session.RegisterWorker(WorkerPkg);
+
+    // Fixed grace period for the scheduler to discover the remote runner
+    Sleep(7'000);
+
+    // Local queue that will carry the long-running Sleep action
+    auto QueueResult = Session.CreateQueue();
+    REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+    const int LocalQueueId = QueueResult.QueueId;
+
+    CbObject SleepAction = BuildSleepActionForSession("data"sv, 30'000, ChunkResolver);
+
+    auto EnqueueRes = Session.EnqueueActionToQueue(LocalQueueId, SleepAction, 0);
+    REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+    // Fixed grace period for the action to be picked up and start running remotely
+    Sleep(2'000);
+
+    // Abandon the session; the in-flight action is expected to be abandoned with it
+    bool Transitioned = Session.RequestStateTransition(SessionState::Abandoned);
+    CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+
+    // Poll every 200 ms, for at most 30 s, until a result is available for the action
+    CbPackage        ActionOutput;
+    HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+
+    for (Stopwatch PollTimer; PollTimer.GetElapsedTimeMs() < 30'000; Sleep(200))
+    {
+        ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ActionOutput);
+        if (ResultCode == HttpResponseCode::OK)
+        {
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+                    fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput()));
+
+    // The local queue must account for the action as abandoned, with nothing left active
+    auto QueueStatus = Session.GetQueueStatus(LocalQueueId);
+    CHECK_EQ(QueueStatus.ActiveCount, 0);
+    CHECK_EQ(QueueStatus.AbandonedCount, 1);
+
+    // An abandoned session must no longer report itself as healthy
+    CHECK(!Session.IsHealthy());
+
+    // Only the parent session was abandoned, so the remote node should still be ready
+    HttpClient           ComputeNodeClient(Instance.GetBaseUri() + "/compute");
+    HttpClient::Response ReadyResp = ComputeNodeClient.Get("/ready"sv);
+    CHECK_MESSAGE(ReadyResp.StatusCode == HttpResponseCode::OK, "Remote compute node should still be healthy");
+
+    Session.Shutdown();
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests::compute
+
+#endif