// Copyright Epic Games, Inc. All Rights Reserved. #include #if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include "zenserver-test.h" # include namespace zen::tests::compute { using namespace std::literals; // BuildSystemVersion and function version GUIDs matching zentest-appstub static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969"; static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313"; static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888"; // In-memory implementation of ChunkResolver for test use. // Stores compressed data keyed by decompressed content hash. class InMemoryChunkResolver : public ChunkResolver { public: IoBuffer FindChunkByCid(const IoHash& DecompressedId) override { auto It = m_Chunks.find(DecompressedId); if (It != m_Chunks.end()) { return It->second; } return {}; } void AddChunk(const IoHash& DecompressedId, IoBuffer Data) { m_Chunks[DecompressedId] = std::move(Data); } private: std::unordered_map m_Chunks; }; // Read, compress, and register zentest-appstub as a worker. // Returns the WorkerId (hash of the worker package object). 
static IoHash RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env) { std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL); FileContents AppStubData = zen::ReadFile(AppStubPath); REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message())); IoBuffer AppStubBuffer = AppStubData.Flatten(); CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4); const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash(); const uint64_t AppStubRawSize = AppStubBuffer.Size(); CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash); CbObjectWriter WorkerWriter; WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion); WorkerWriter << "path"sv << "zentest-appstub"sv; WorkerWriter.BeginArray("executables"sv); WorkerWriter.BeginObject(); WorkerWriter << "name"sv << "zentest-appstub"sv; WorkerWriter.AddAttachment("hash"sv, AppStubAttachment); WorkerWriter << "size"sv << AppStubRawSize; WorkerWriter.EndObject(); WorkerWriter.EndArray(); WorkerWriter.BeginArray("functions"sv); WorkerWriter.BeginObject(); WorkerWriter << "name"sv << "Rot13"sv; WorkerWriter << "version"sv << Guid::FromString(kRot13Version); WorkerWriter.EndObject(); WorkerWriter.BeginObject(); WorkerWriter << "name"sv << "Sleep"sv; WorkerWriter << "version"sv << Guid::FromString(kSleepVersion); WorkerWriter.EndObject(); WorkerWriter.EndArray(); CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerWriter.Save()); WorkerPackage.AddAttachment(AppStubAttachment); const IoHash WorkerId = WorkerPackage.GetObjectHash(); const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); HttpClient::Response RegisterResp = Client.Post(WorkerUrl, std::move(WorkerPackage)); REQUIRE_MESSAGE(RegisterResp, 
fmt::format("Worker registration failed: status={}, body={}", int(RegisterResp.StatusCode), RegisterResp.ToText())); return WorkerId; } // Build a Rot13 action CbPackage for the given input string. static CbPackage BuildRot13ActionPackage(std::string_view Input) { CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4); const IoHash InputRawHash = InputCompressed.DecodeRawHash(); const uint64_t InputRawSize = Input.size(); CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); CbObjectWriter ActionWriter; ActionWriter << "Function"sv << "Rot13"sv; ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); ActionWriter.BeginObject("Inputs"sv); ActionWriter.BeginObject("Source"sv); ActionWriter.AddAttachment("RawHash"sv, InputAttachment); ActionWriter << "RawSize"sv << InputRawSize; ActionWriter.EndObject(); ActionWriter.EndObject(); CbPackage ActionPackage; ActionPackage.SetObject(ActionWriter.Save()); ActionPackage.AddAttachment(InputAttachment); return ActionPackage; } // Build a Sleep action CbPackage. The worker sleeps for SleepTimeMs before returning its input. 
/// Builds an action package invoking the appstub's Sleep function.
///
/// Input is compressed (Selkie/HyperFast4), attached to the package, and referenced from
/// Inputs.Source by raw hash and raw size. SleepTimeMs is passed as Constants.SleepTimeMs.
///
/// @param Input        Payload handed to the Sleep function.
/// @param SleepTimeMs  Value written to Constants.SleepTimeMs.
/// @return             Package containing the action object and the input attachment.
static CbPackage
BuildSleepActionPackage(std::string_view Input, uint64_t SleepTimeMs)
{
	const uint64_t	 SourceSize = Input.size();
	CompressedBuffer Compressed =
		CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4);
	const IoHash SourceHash = Compressed.DecodeRawHash();
	CbAttachment Source(std::move(Compressed), SourceHash);

	CbObjectWriter Action;

	// Which function, at which version, under which build system.
	Action << "Function"sv << "Sleep"sv;
	Action << "FunctionVersion"sv << Guid::FromString(kSleepVersion);
	Action << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);

	// Inputs.Source refers to the compressed input attachment.
	Action.BeginObject("Inputs"sv);
	{
		Action.BeginObject("Source"sv);
		Action.AddAttachment("RawHash"sv, Source);
		Action << "RawSize"sv << SourceSize;
		Action.EndObject();
	}
	Action.EndObject();

	// Constants.SleepTimeMs carries the requested sleep duration.
	Action.BeginObject("Constants"sv);
	Action << "SleepTimeMs"sv << SleepTimeMs;
	Action.EndObject();

	CbPackage Package;
	Package.SetObject(Action.Save());
	Package.AddAttachment(Source);
	return Package;
}

// Build a Sleep action CbObject and populate the chunk resolver with the input attachment.
static CbObject BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemoryChunkResolver& Resolver) { CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4); const IoHash InputRawHash = InputCompressed.DecodeRawHash(); const uint64_t InputRawSize = Input.size(); Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); CbObjectWriter ActionWriter; ActionWriter << "Function"sv << "Sleep"sv; ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion); ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); ActionWriter.BeginObject("Inputs"sv); ActionWriter.BeginObject("Source"sv); ActionWriter.AddAttachment("RawHash"sv, InputAttachment); ActionWriter << "RawSize"sv << InputRawSize; ActionWriter.EndObject(); ActionWriter.EndObject(); ActionWriter.BeginObject("Constants"sv); ActionWriter << "SleepTimeMs"sv << SleepTimeMs; ActionWriter.EndObject(); return ActionWriter.Save(); } static HttpClient::Response PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000) { HttpClient::Response Resp; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < TimeoutMs) { Resp = Client.Get(ResultUrl); if (Resp.StatusCode == HttpResponseCode::OK) { break; } Sleep(100); } return Resp; } static bool PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int Lsn, uint64_t TimeoutMs = 30'000) { Stopwatch Timer; while (Timer.GetElapsedTimeMs() < TimeoutMs) { HttpClient::Response Resp = Client.Get(CompletedUrl); if (Resp) { for (auto& Item : Resp.AsObject()["completed"sv]) { if (Item.AsInt32() == Lsn) { return true; } } } Sleep(100); } return false; } static std::string GetRot13Output(const CbPackage& ResultPackage) { CbObject ResultObj = ResultPackage.GetObject(); 
IoHash OutputHash; CbFieldView ValuesField = ResultObj["Values"sv]; if (CbFieldViewIterator It = begin(ValuesField); It.HasValue()) { OutputHash = (*It).AsObjectView()["RawHash"sv].AsHash(); } REQUIRE_MESSAGE(OutputHash != IoHash::Zero, "Expected non-zero output hash in result Values array"); const CbAttachment* OutputAttachment = ResultPackage.FindAttachment(OutputHash); REQUIRE_MESSAGE(OutputAttachment != nullptr, "Output attachment not found in result package"); CompressedBuffer OutputCompressed = OutputAttachment->AsCompressedBinary(); SharedBuffer OutputData = OutputCompressed.Decompress(); return std::string(static_cast(OutputData.GetData()), OutputData.GetSize()); } // Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response. class MockOrchestratorService : public HttpService { public: MockOrchestratorService() { // Initialize with empty worker list CbObjectWriter Cbo; Cbo.BeginArray("workers"sv); Cbo.EndArray(); m_WorkerList = Cbo.Save(); } const char* BaseUri() const override { return "/orch/"; } void HandleRequest(HttpServerRequest& Request) override { if (Request.RequestVerb() == HttpVerb::kGet && Request.RelativeUri() == "agents"sv) { RwLock::SharedLockScope Lock(m_Lock); Request.WriteResponse(HttpResponseCode::OK, m_WorkerList); return; } Request.WriteResponse(HttpResponseCode::NotFound); } void SetWorkerList(CbObject WorkerList) { RwLock::ExclusiveLockScope Lock(m_Lock); m_WorkerList = std::move(WorkerList); } private: RwLock m_Lock; CbObject m_WorkerList; }; // Manages in-process ASIO HTTP server lifecycle for mock orchestrator. 
struct MockOrchestratorFixture { MockOrchestratorService Service; ScopedTemporaryDirectory TmpDir; Ref Server; std::thread ServerThread; uint16_t Port = 0; MockOrchestratorFixture() { HttpServerConfig Config; Config.ServerClass = "asio"; Config.ForceLoopback = true; Server = CreateHttpServer(Config); Server->RegisterService(Service); Port = static_cast(Server->Initialize(TestEnv.GetNewPortNumber(), TmpDir.Path())); ZEN_ASSERT(Port != 0); ServerThread = std::thread([this]() { Server->Run(false); }); } ~MockOrchestratorFixture() { Server->RequestExit(); if (ServerThread.joinable()) { ServerThread.join(); } Server->Close(); } std::string GetEndpoint() const { return fmt::format("http://localhost:{}", Port); } }; // Build the CbObject response for /orch/agents matching the format UpdateCoordinatorState expects. static CbObject BuildAgentListResponse(std::initializer_list> Workers) { CbObjectWriter Cbo; Cbo.BeginArray("workers"sv); for (const auto& [Id, Uri] : Workers) { Cbo.BeginObject(); Cbo << "id"sv << Id; Cbo << "uri"sv << Uri; Cbo << "hostname"sv << "localhost"sv; Cbo << "reachable"sv << true; Cbo << "dt"sv << uint64_t(0); Cbo.EndObject(); } Cbo.EndArray(); return Cbo.Save(); } // Build the worker CbPackage for zentest-appstub AND populate the chunk resolver. // This is the same logic as RegisterWorker() but returns the package instead of POSTing it. 
/// Builds the zentest-appstub worker package without registering it.
///
/// This produces the same descriptor as the one RegisterWorker() POSTs: the compressed
/// executable plus the Rot13 and Sleep function entries. In addition, the compressed
/// executable bytes are stored in Resolver under their raw hash. A read failure aborts the
/// test via REQUIRE_MESSAGE.
///
/// @param Env       Test environment used to locate the appstub binary.
/// @param Resolver  Receives the compressed executable, keyed by its raw hash.
/// @return          The worker package (descriptor object + executable attachment).
static CbPackage
BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver)
{
	const std::filesystem::path ExePath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL);

	FileContents ExeFile = zen::ReadFile(ExePath);
	REQUIRE_MESSAGE(!ExeFile.ErrorCode, fmt::format("Failed to read '{}': {}", ExePath.string(), ExeFile.ErrorCode.message()));

	IoBuffer	   ExeBytes = ExeFile.Flatten();
	const uint64_t ExeSize	= ExeBytes.Size();

	CompressedBuffer ExeCompressed =
		CompressedBuffer::Compress(SharedBuffer::MakeView(ExeBytes.GetData(), ExeBytes.Size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4);
	const IoHash ExeHash = ExeCompressed.DecodeRawHash();

	// Hand the compressed bytes to the resolver (for a remote runner that needs them)
	// before ExeCompressed is moved into the attachment.
	Resolver.AddChunk(ExeHash, ExeCompressed.GetCompressed().Flatten().AsIoBuffer());
	CbAttachment ExeAttachment(std::move(ExeCompressed), ExeHash);

	CbObjectWriter Worker;
	Worker << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion);
	Worker << "path"sv << "zentest-appstub"sv;

	Worker.BeginArray("executables"sv);
	Worker.BeginObject();
	Worker << "name"sv << "zentest-appstub"sv;
	Worker.AddAttachment("hash"sv, ExeAttachment);
	Worker << "size"sv << ExeSize;
	Worker.EndObject();
	Worker.EndArray();

	// Emits one { name, version } entry in the "functions" array.
	auto AddFunction = [&Worker](std::string_view Name, std::string_view Version) {
		Worker.BeginObject();
		Worker << "name"sv << Name;
		Worker << "version"sv << Guid::FromString(Version);
		Worker.EndObject();
	};

	Worker.BeginArray("functions"sv);
	AddFunction("Rot13"sv, kRot13Version);
	AddFunction("Sleep"sv, kSleepVersion);
	Worker.EndArray();

	CbPackage Package;
	Package.SetObject(Worker.Save());
	Package.AddAttachment(ExeAttachment);
	return Package;
}

// Build a Rot13 action CbObject (not
CbPackage) and populate the chunk resolver with the input attachment. static CbObject BuildRot13ActionForSession(std::string_view Input, InMemoryChunkResolver& Resolver) { CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), OodleCompressor::Selkie, OodleCompressionLevel::HyperFast4); const IoHash InputRawHash = InputCompressed.DecodeRawHash(); const uint64_t InputRawSize = Input.size(); // Store compressed data in chunk resolver Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); CbObjectWriter ActionWriter; ActionWriter << "Function"sv << "Rot13"sv; ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); ActionWriter.BeginObject("Inputs"sv); ActionWriter.BeginObject("Source"sv); ActionWriter.AddAttachment("RawHash"sv, InputAttachment); ActionWriter << "RawSize"sv << InputRawSize; ActionWriter.EndObject(); ActionWriter.EndObject(); return ActionWriter.Save(); } TEST_SUITE_BEGIN("server.function"); TEST_CASE("function.rot13") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Submit action via legacy /jobs/{worker} endpoint const std::string JobUrl = fmt::format("/jobs/{}", WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); 
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); // Poll for result via legacy /jobs/{lsn} endpoint const std::string ResultUrl = fmt::format("/jobs/{}", Lsn); HttpClient::Response ResultResp = PollForResult(Client, ResultUrl); REQUIRE_MESSAGE( ResultResp.StatusCode == HttpResponseCode::OK, fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" CbPackage ResultPackage = ResultResp.AsPackage(); REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Action failed (empty result package)\nServer log:\n{}", Instance.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); } TEST_CASE("function.workers") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); // Before registration, GET /workers should return an empty list HttpClient::Response EmptyListResp = Client.Get("/workers"sv); REQUIRE_MESSAGE(EmptyListResp, "Failed to list workers before registration"); CHECK_EQ(EmptyListResp.AsObject()["workers"sv].AsArrayView().Num(), 0); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // GET /workers — the registered worker should appear in the listing HttpClient::Response ListResp = Client.Get("/workers"sv); REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration"); bool WorkerFound = false; for (auto& Item : ListResp.AsObject()["workers"sv]) { if (Item.AsHash() == WorkerId) { WorkerFound = true; break; } } REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString())); // GET 
/workers/{worker} — descriptor should match what was registered const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); HttpClient::Response DescResp = Client.Get(WorkerUrl); REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode))); CbObject Desc = DescResp.AsObject(); CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion)); CHECK_EQ(Desc["path"sv].AsString(), "zentest-appstub"sv); bool Rot13Found = false; bool SleepFound = false; for (auto& Item : Desc["functions"sv]) { std::string_view Name = Item.AsObjectView()["name"sv].AsString(); if (Name == "Rot13"sv) { CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kRot13Version)); Rot13Found = true; } else if (Name == "Sleep"sv) { CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kSleepVersion)); SleepFound = true; } } CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor"); CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor"); // GET /workers/{unknown} — should return 404 const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString()); HttpClient::Response NotFoundResp = Client.Get(UnknownUrl); CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound); } TEST_CASE("function.queues.lifecycle") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), 
CreateResp.ToText())); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); // Verify the queue appears in the listing HttpClient::Response ListResp = Client.Get("/queues"sv); REQUIRE_MESSAGE(ListResp, "Failed to list queues"); bool QueueFound = false; for (auto& Item : ListResp.AsObject()["queues"sv]) { if (Item.AsObjectView()["queue_id"sv].AsInt32() == QueueId) { QueueFound = true; break; } } REQUIRE_MESSAGE(QueueFound, fmt::format("Queue {} not found in queue listing", QueueId)); // Submit action via queue-scoped endpoint const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission"); // Poll for completion via queue-scoped /completed endpoint const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", Lsn, QueueId, Instance.GetLogOutput())); // Retrieve result via queue-scoped /jobs/{lsn} endpoint const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); HttpClient::Response ResultResp = Client.Get(ResultUrl); REQUIRE_MESSAGE( ResultResp.StatusCode == HttpResponseCode::OK, fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" CbPackage ResultPackage = ResultResp.AsPackage(); REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result 
package\nServer log:\n{}", Instance.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); // Verify queue status reflects completion const std::string StatusUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response StatusResp = Client.Get(StatusUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); CbObject QueueStatus = StatusResp.AsObject(); CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 0); CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "active"); } TEST_CASE("function.queues.cancel") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); // Submit a job const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); // Cancel the queue const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Queue cancellation failed: status={}, body={}", 
int(CancelResp.StatusCode), CancelResp.ToText())); // Verify queue status shows cancelled HttpClient::Response StatusResp = Client.Get(QueueUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); CbObject QueueStatus = StatusResp.AsObject(); CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); } TEST_CASE("function.queues.remote") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a remote queue — response includes both an integer queue_id and an OID queue_token HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); REQUIRE_MESSAGE(CreateResp, fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); CbObject CreateObj = CreateResp.AsObject(); const std::string QueueToken = std::string(CreateObj["queue_token"sv].AsString()); REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); // All subsequent requests use the opaque token in place of the integer queue id const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Remote queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from remote queue job submission"); // Poll for completion via the token-addressed /completed endpoint const std::string CompletedUrl = 
fmt::format("/queues/{}/completed", QueueToken); REQUIRE_MESSAGE( PollForLsnInCompleted(Client, CompletedUrl, Lsn), fmt::format("LSN {} did not appear in remote queue completed list within timeout\nServer log:\n{}", Lsn, Instance.GetLogOutput())); // Retrieve result via the token-addressed /jobs/{lsn} endpoint const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, Lsn); HttpClient::Response ResultResp = Client.Get(ResultUrl); REQUIRE_MESSAGE(ResultResp.StatusCode == HttpResponseCode::OK, fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" CbPackage ResultPackage = ResultResp.AsPackage(); REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); } TEST_CASE("function.queues.cancel_running") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); // Submit a Sleep job long enough that it will still be running when we cancel const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); REQUIRE_MESSAGE(SubmitResp, 
fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling Sleep(1'000); // Cancel the queue, which should interrupt the running Sleep job const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); // The cancelled job should appear in the /completed endpoint once the process exits const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), fmt::format("LSN {} did not appear in queue {} completed list after cancel\nServer log:\n{}", Lsn, QueueId, Instance.GetLogOutput())); // Verify the queue reflects one cancelled action HttpClient::Response StatusResp = Client.Get(QueueUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); CbObject QueueStatus = StatusResp.AsObject(); CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); } TEST_CASE("function.queues.remote_cancel") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a remote queue to obtain an OID token 
for token-addressed cancellation HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); REQUIRE_MESSAGE(CreateResp, fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); const std::string QueueToken = std::string(CreateResp.AsObject()["queue_token"sv].AsString()); REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); // Submit a long-running Sleep job via the token-addressed endpoint const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling Sleep(1'000); // Cancel the queue via its OID token const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Remote queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); // The cancelled job should appear in the token-addressed /completed endpoint const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken); REQUIRE_MESSAGE( PollForLsnInCompleted(Client, CompletedUrl, Lsn), fmt::format("LSN {} did not appear in remote queue completed list after cancel\nServer log:\n{}", Lsn, Instance.GetLogOutput())); // Verify the queue status reflects the cancellation HttpClient::Response StatusResp = Client.Get(QueueUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get remote queue status after cancel"); CbObject QueueStatus = 
StatusResp.AsObject(); CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); } TEST_CASE("function.queues.drain") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); const std::string QueueUrl = fmt::format("/queues/{}", QueueId); // Submit a long-running job so we can verify it completes even after drain const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); HttpClient::Response Submit1 = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 2'000)); REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", int(Submit1.StatusCode))); const int Lsn1 = Submit1.AsObject()["lsn"sv].AsInt32(); // Drain the queue const std::string DrainUrl = fmt::format("/queues/{}/drain", QueueId); HttpClient::Response DrainResp = Client.Post(DrainUrl); REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText())); CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining"); // Second submission should be rejected with 424 HttpClient::Response Submit2 = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); CHECK_EQ(Submit2.StatusCode, HttpResponseCode::FailedDependency); CHECK_EQ(std::string(Submit2.AsObject()["error"sv].AsString()), "queue is 
draining"); // First job should still complete const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn1), fmt::format("LSN {} did not complete after drain\nServer log:\n{}", Lsn1, Instance.GetLogOutput())); // Queue status should show draining + complete HttpClient::Response StatusResp = Client.Get(QueueUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); CbObject QueueStatus = StatusResp.AsObject(); CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "draining"); CHECK(QueueStatus["is_complete"sv].AsBool()); } TEST_CASE("function.priority") { // Spawn server with max-actions=1 to guarantee serialized action execution, // which lets us deterministically verify that higher-priority pending jobs // are scheduled before lower-priority ones. ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1"); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue for all test jobs HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); // Submit a blocker Sleep job to occupy the single execution slot. // Once the blocker is running, the scheduler must choose among the pending // jobs by priority when the slot becomes free. 
const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000)); REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode))); // Submit 3 low-priority Rot13 jobs const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); HttpClient::Response LowResp1 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low1"sv)); REQUIRE_MESSAGE(LowResp1, "Low-priority job 1 submission failed"); const int LsnLow1 = LowResp1.AsObject()["lsn"sv].AsInt32(); HttpClient::Response LowResp2 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low2"sv)); REQUIRE_MESSAGE(LowResp2, "Low-priority job 2 submission failed"); const int LsnLow2 = LowResp2.AsObject()["lsn"sv].AsInt32(); HttpClient::Response LowResp3 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low3"sv)); REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed"); const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32(); // Submit 1 high-priority Rot13 job — should execute before the low-priority ones const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString()); HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv)); REQUIRE_MESSAGE(HighResp, "High-priority job submission failed"); const int LsnHigh = HighResp.AsObject()["lsn"sv].AsInt32(); // Wait for all 4 priority-test jobs to appear in the queue's completed list. // This avoids any snapshot-timing race: by the time we compare timestamps, all // jobs have already finished and their history entries are stable. 
const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); { bool AllCompleted = false; Stopwatch WaitTimer; while (!AllCompleted && WaitTimer.GetElapsedTimeMs() < 30'000) { HttpClient::Response Resp = Client.Get(CompletedUrl); if (Resp) { bool FoundHigh = false; bool FoundLow1 = false; bool FoundLow2 = false; bool FoundLow3 = false; CbObject RespObj = Resp.AsObject(); for (auto& Item : RespObj["completed"sv]) { const int Lsn = Item.AsInt32(); if (Lsn == LsnHigh) { FoundHigh = true; } else if (Lsn == LsnLow1) { FoundLow1 = true; } else if (Lsn == LsnLow2) { FoundLow2 = true; } else if (Lsn == LsnLow3) { FoundLow3 = true; } } AllCompleted = FoundHigh && FoundLow1 && FoundLow2 && FoundLow3; } if (!AllCompleted) { Sleep(100); } } REQUIRE_MESSAGE( AllCompleted, fmt::format( "Not all priority test jobs completed within timeout (lsnHigh={} lsnLow1={} lsnLow2={} lsnLow3={})\nServer log:\n{}", LsnHigh, LsnLow1, LsnLow2, LsnLow3, Instance.GetLogOutput())); } // Query the queue-scoped history to obtain the time_Completed timestamp for each // job. The history endpoint records when each RunnerAction::State transition // occurred, so time_Completed is the wall-clock tick at which the action finished. // Using the queue-scoped endpoint avoids exposing history from other queues. 
const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); HttpClient::Response HistoryResp = Client.Get(HistoryUrl); REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); CbObject HistoryObj = HistoryResp.AsObject(); auto GetCompletedTimestamp = [&](int Lsn) -> uint64_t { for (auto& Item : HistoryObj["history"sv]) { if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) { return Item.AsObjectView()["time_Completed"sv].AsUInt64(); } } return 0; }; const uint64_t TimeHigh = GetCompletedTimestamp(LsnHigh); const uint64_t TimeLow1 = GetCompletedTimestamp(LsnLow1); const uint64_t TimeLow2 = GetCompletedTimestamp(LsnLow2); const uint64_t TimeLow3 = GetCompletedTimestamp(LsnLow3); REQUIRE_MESSAGE(TimeHigh != 0, fmt::format("lsnHigh={} not found in action history", LsnHigh)); REQUIRE_MESSAGE(TimeLow1 != 0, fmt::format("lsnLow1={} not found in action history", LsnLow1)); REQUIRE_MESSAGE(TimeLow2 != 0, fmt::format("lsnLow2={} not found in action history", LsnLow2)); REQUIRE_MESSAGE(TimeLow3 != 0, fmt::format("lsnLow3={} not found in action history", LsnLow3)); // The high-priority job must have completed strictly before every low-priority job CHECK_MESSAGE(TimeHigh < TimeLow1, fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow1={} completed at t={} (expected later)", LsnHigh, TimeHigh, LsnLow1, TimeLow1)); CHECK_MESSAGE(TimeHigh < TimeLow2, fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow2={} completed at t={} (expected later)", LsnHigh, TimeHigh, LsnLow2, TimeLow2)); CHECK_MESSAGE(TimeHigh < TimeLow3, fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow3={} completed at t={} (expected later)", LsnHigh, TimeHigh, LsnLow3, TimeLow3)); } ////////////////////////////////////////////////////////////////////////// // Remote worker synchronization tests // // These tests exercise the orchestrator discovery path where new compute // nodes appear over time and must 
receive previously registered workers // via SyncWorkersToRunner(). TEST_CASE("function.remote.worker_sync_on_discovery") { // Spawn real zenserver in compute mode ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t ServerPort = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(ServerPort != 0, Instance.GetLogOutput()); const std::string ServerUri = fmt::format("http://localhost:{}", ServerPort); // Start mock orchestrator with empty worker list MockOrchestratorFixture MockOrch; // Create session infrastructure InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); // Register worker on session (stored locally, no runners yet) CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Update mock orchestrator to advertise the real server MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}})); // Wait for scheduler to discover the runner (~5s throttle + margin) Sleep(7'000); // Submit Rot13 action via session CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Action enqueue failed"); // Poll for result CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE( ResultCode == HttpResponseCode::OK, fmt::format("Action did not complete in 
time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); Session.Shutdown(); } TEST_CASE("function.remote.late_runner_discovery") { // Spawn first server ZenServerInstance Instance1(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance1.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port1 = Instance1.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port1 != 0, Instance1.GetLogOutput()); const std::string ServerUri1 = fmt::format("http://localhost:{}", Port1); // Start mock orchestrator advertising W1 MockOrchestratorFixture MockOrch; MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}})); // Create session and register worker InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for W1 discovery Sleep(7'000); // Baseline: submit Rot13 action and verify it completes on W1 { CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed"); CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, 
fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance1.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); } // Spawn second server ZenServerInstance Instance2(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance2.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port2 = Instance2.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port2 != 0, Instance2.GetLogOutput()); const std::string ServerUri2 = fmt::format("http://localhost:{}", Port2); // Update mock orchestrator to include both W1 and W2 MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}})); // Wait for W2 discovery Sleep(7'000); // Verify W2 received the worker by querying its /compute/workers endpoint directly { const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); HttpClient Client(ComputeBaseUri); HttpClient::Response ListResp = Client.Get("/workers"sv); REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); bool WorkerFound = false; for (auto& Item : ListResp.AsObject()["workers"sv]) { if (Item.AsHash() == WorkerPackage.GetObjectHash()) { WorkerFound = true; break; } } REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}", Instance2.GetLogOutput())); } // Submit another action and verify it completes (could run on either W1 or W2) { CbObject ActionObj = BuildRot13ActionForSession("Second Test"sv, Resolver); zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Second action enqueue failed"); CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE(ResultCode == 
HttpResponseCode::OK, fmt::format("Second action did not complete in time\nW1 log:\n{}\nW2 log:\n{}", Instance1.GetLogOutput(), Instance2.GetLogOutput())); // Rot13("Second Test") = "Frpbaq Grfg" CHECK_EQ(GetRot13Output(ResultPackage), "Frpbaq Grfg"sv); } Session.Shutdown(); } TEST_CASE("function.remote.queue_association") { // Spawn real zenserver as a remote compute node ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); // Start mock orchestrator advertising the server MockOrchestratorFixture MockOrch; MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); // Create session infrastructure InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner Sleep(7'000); // Create a local queue and submit action to it auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); const int QueueId = QueueResult.QueueId; CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Action enqueue to queue failed"); // Poll for result CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = 
Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE( ResultCode == HttpResponseCode::OK, fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); // Verify that a non-implicit remote queue was created on the compute node HttpClient Client(Instance.GetBaseUri() + "/compute"); HttpClient::Response QueuesResp = Client.Get("/queues"sv); REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); bool RemoteQueueFound = false; for (auto& Item : QueuesResp.AsObject()["queues"sv]) { if (!Item.AsObjectView()["implicit"sv].AsBool()) { RemoteQueueFound = true; break; } } CHECK_MESSAGE(RemoteQueueFound, "Expected a non-implicit remote queue on the compute node"); Session.Shutdown(); } TEST_CASE("function.remote.queue_cancel_propagation") { // Spawn real zenserver as a remote compute node ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); // Start mock orchestrator advertising the server MockOrchestratorFixture MockOrch; MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); // Create session infrastructure InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); 
Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner Sleep(7'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); const int QueueId = QueueResult.QueueId; CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote Sleep(2'000); // Cancel the local queue — this should propagate to the remote Session.CancelQueue(QueueId); // Poll for the action to complete (as cancelled) CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } // Verify the local queue shows cancelled auto QueueStatus = Session.GetQueueStatus(QueueId); CHECK(QueueStatus.State == zen::compute::ComputeServiceSession::QueueState::Cancelled); // Verify the remote queue on the compute node is also cancelled HttpClient Client(Instance.GetBaseUri() + "/compute"); HttpClient::Response QueuesResp = Client.Get("/queues"sv); REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); bool RemoteQueueCancelled = false; for (auto& Item : QueuesResp.AsObject()["queues"sv]) { if (!Item.AsObjectView()["implicit"sv].AsBool()) { RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled"; break; } } CHECK_MESSAGE(RemoteQueueCancelled, "Expected the remote queue to be cancelled"); Session.Shutdown(); } TEST_CASE("function.abandon_running_http") { // Spawn a real zenserver to execute a long-running action, then abandon via HTTP endpoint 
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); HttpClient Client(ComputeBaseUri); const IoHash WorkerId = RegisterWorker(Client, TestEnv); // Create a queue and submit a long-running Sleep job HttpClient::Response CreateResp = Client.Post("/queues"sv); REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode))); const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); // Wait for the process to start running Sleep(1'000); // Verify the ready endpoint returns OK before abandon { HttpClient::Response ReadyResp = Client.Get("/ready"sv); CHECK(ReadyResp.StatusCode == HttpResponseCode::OK); } // Trigger abandon via the HTTP endpoint HttpClient::Response AbandonResp = Client.Post("/abandon"sv); REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK, fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText())); // Ready endpoint should now return 503 { HttpClient::Response ReadyResp = Client.Get("/ready"sv); CHECK(ReadyResp.StatusCode == HttpResponseCode::ServiceUnavailable); } // The abandoned action should appear in the completed endpoint once the process exits const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); 
REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), fmt::format("LSN {} did not appear in queue {} completed list after abandon\nServer log:\n{}", Lsn, QueueId, Instance.GetLogOutput())); // Verify the queue reflects one abandoned action const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response StatusResp = Client.Get(QueueUrl); REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after abandon"); CbObject QueueStatus = StatusResp.AsObject(); CHECK_EQ(QueueStatus["abandoned_count"sv].AsInt32(), 1); CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); // Submitting new work should be rejected HttpClient::Response RejectedResp = Client.Post(JobUrl, BuildRot13ActionPackage("rejected"sv)); CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state"); } TEST_CASE("function.session.abandon_pending") { // Create a session with no runners so actions stay pending InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Enqueue several actions — they will stay pending because there are no runners auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); CbObject ActionObj = BuildRot13ActionForSession("abandon-test"sv, Resolver); auto Enqueue1 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); auto Enqueue2 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); auto Enqueue3 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); REQUIRE_MESSAGE(Enqueue1, "Failed to enqueue action 1"); REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2"); 
REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3"); // Transition to Abandoned — should mark all pending actions as Abandoned bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned); CHECK(!Session.IsHealthy()); // Give the scheduler thread time to process the state changes Sleep(2'000); // All three actions should now be in the results map as abandoned for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn}) { CbPackage Result; HttpResponseCode Code = Session.GetActionResult(Lsn, Result); CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code))); } // Queue should show 0 active, 3 abandoned auto Status = Session.GetQueueStatus(QueueResult.QueueId); CHECK_EQ(Status.ActiveCount, 0); CHECK_EQ(Status.AbandonedCount, 3); // New actions should be rejected auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state"); // Abandoned → Sunset should be valid CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset)); Session.Shutdown(); } TEST_CASE("function.session.abandon_running") { // Spawn a real zenserver as a remote compute node ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); // Start mock orchestrator advertising the server MockOrchestratorFixture MockOrch; MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); // Create session infrastructure InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; 
zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner Sleep(7'000); // Create a queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); const int QueueId = QueueResult.QueueId; CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote Sleep(2'000); // Transition to Abandoned — should abandon the running action bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); CHECK(!Session.IsHealthy()); // Poll for the action to complete (as abandoned) CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); // Verify the queue shows abandoned, not completed auto QueueStatus = Session.GetQueueStatus(QueueId); CHECK_EQ(QueueStatus.ActiveCount, 0); CHECK_EQ(QueueStatus.AbandonedCount, 1); CHECK_EQ(QueueStatus.CompletedCount, 0); Session.Shutdown(); } TEST_CASE("function.remote.abandon_propagation") { // Spawn real zenserver as a remote 
compute node ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); // Start mock orchestrator advertising the server MockOrchestratorFixture MockOrch; MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); // Create session infrastructure InMemoryChunkResolver Resolver; ScopedTemporaryDirectory SessionBaseDir; zen::compute::ComputeServiceSession Session(Resolver); Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); Session.SetOrchestratorBasePath(SessionBaseDir.Path()); Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); // Register worker on session CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); Session.RegisterWorker(WorkerPackage); // Wait for scheduler to discover the runner Sleep(7'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); const int QueueId = QueueResult.QueueId; CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote Sleep(2'000); // Transition to Abandoned — should abandon the running action and propagate bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); // Poll for the action to complete CbPackage ResultPackage; HttpResponseCode ResultCode = HttpResponseCode::Accepted; Stopwatch Timer; while (Timer.GetElapsedTimeMs() < 30'000) { ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); if (ResultCode == 
HttpResponseCode::OK) { break; } Sleep(200); } REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); // Verify the local queue shows abandoned auto QueueStatus = Session.GetQueueStatus(QueueId); CHECK_EQ(QueueStatus.ActiveCount, 0); CHECK_EQ(QueueStatus.AbandonedCount, 1); // Session should not be healthy CHECK(!Session.IsHealthy()); // The remote compute node should still be healthy (only the parent abandoned) HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); HttpClient::Response ReadyResp = RemoteClient.Get("/ready"sv); CHECK_MESSAGE(ReadyResp.StatusCode == HttpResponseCode::OK, "Remote compute node should still be healthy"); Session.Shutdown(); } TEST_SUITE_END(); } // namespace zen::tests::compute #endif