diff options
Diffstat (limited to 'src/zenserver-test/compute-tests.cpp')
| -rw-r--r-- | src/zenserver-test/compute-tests.cpp | 1700 |
1 files changed, 1700 insertions, 0 deletions
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp new file mode 100644 index 000000000..c90ac5d8b --- /dev/null +++ b/src/zenserver-test/compute-tests.cpp @@ -0,0 +1,1700 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/zencore.h> + +#if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES + +# include <zenbase/zenbase.h> +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/filesystem.h> +# include <zencore/guid.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/thread.h> +# include <zencore/timer.h> +# include <zenhttp/httpclient.h> +# include <zenhttp/httpserver.h> +# include <zencompute/computeservice.h> +# include <zenstore/zenstore.h> +# include <zenutil/zenserverprocess.h> + +# include "zenserver-test.h" + +# include <thread> + +namespace zen::tests::compute { + +using namespace std::literals; + +// BuildSystemVersion and function version GUIDs matching zentest-appstub +static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969"; +static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313"; +static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888"; + +// In-memory implementation of ChunkResolver for test use. +// Stores compressed data keyed by decompressed content hash. +class InMemoryChunkResolver : public ChunkResolver +{ +public: + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override + { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + return It->second; + } + return {}; + } + + void AddChunk(const IoHash& DecompressedId, IoBuffer Data) { m_Chunks[DecompressedId] = std::move(Data); } + +private: + std::unordered_map<IoHash, IoBuffer> m_Chunks; +}; + +// Read, compress, and register zentest-appstub as a worker. +// Returns the WorkerId (hash of the worker package object). +static IoHash +RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env) +{ + std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL); + + FileContents AppStubData = zen::ReadFile(AppStubPath); + REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message())); + + IoBuffer AppStubBuffer = AppStubData.Flatten(); + + CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash(); + const uint64_t AppStubRawSize = AppStubBuffer.Size(); + + CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash); + + CbObjectWriter WorkerWriter; + WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion); + WorkerWriter << "path"sv + << "zentest-appstub"sv; + + WorkerWriter.BeginArray("executables"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "zentest-appstub"sv; + WorkerWriter.AddAttachment("hash"sv, AppStubAttachment); + WorkerWriter << "size"sv << AppStubRawSize; + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + WorkerWriter.BeginArray("functions"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Rot13"sv; + WorkerWriter << "version"sv << Guid::FromString(kRot13Version); + WorkerWriter.EndObject(); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Sleep"sv; + WorkerWriter << "version"sv << Guid::FromString(kSleepVersion); + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerWriter.Save()); + WorkerPackage.AddAttachment(AppStubAttachment); + + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + + const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); + HttpClient::Response RegisterResp = Client.Post(WorkerUrl, std::move(WorkerPackage)); + REQUIRE_MESSAGE(RegisterResp, + fmt::format("Worker registration failed: status={}, body={}", int(RegisterResp.StatusCode), RegisterResp.ToText())); + + return WorkerId; +} + +// Build a Rot13 action CbPackage for the given input string. +static CbPackage +BuildRot13ActionPackage(std::string_view Input) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Rot13"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + + CbPackage ActionPackage; + ActionPackage.SetObject(ActionWriter.Save()); + ActionPackage.AddAttachment(InputAttachment); + + return ActionPackage; +} + +// Build a Sleep action CbPackage. The worker sleeps for SleepTimeMs before returning its input. +static CbPackage +BuildSleepActionPackage(std::string_view Input, uint64_t SleepTimeMs) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Sleep"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + ActionWriter.BeginObject("Constants"sv); + ActionWriter << "SleepTimeMs"sv << SleepTimeMs; + ActionWriter.EndObject(); + + CbPackage ActionPackage; + ActionPackage.SetObject(ActionWriter.Save()); + ActionPackage.AddAttachment(InputAttachment); + + return ActionPackage; +} + +// Build a Sleep action CbObject and populate the chunk resolver with the input attachment. +static CbObject +BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemoryChunkResolver& Resolver) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Sleep"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + ActionWriter.BeginObject("Constants"sv); + ActionWriter << "SleepTimeMs"sv << SleepTimeMs; + ActionWriter.EndObject(); + + return ActionWriter.Save(); +} + +static HttpClient::Response +PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000) +{ + HttpClient::Response Resp; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + Resp = Client.Get(ResultUrl); + + if (Resp.StatusCode == HttpResponseCode::OK) + { + break; + } + + Sleep(100); + } + + return Resp; +} + +static bool +PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int Lsn, uint64_t TimeoutMs = 30'000) +{ + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + HttpClient::Response Resp = Client.Get(CompletedUrl); + + if (Resp) + { + for (auto& Item : Resp.AsObject()["completed"sv]) + { + if (Item.AsInt32() == Lsn) + { + return true; + } + } + } + + Sleep(100); + } + + return false; +} + +static std::string +GetRot13Output(const CbPackage& ResultPackage) +{ + CbObject ResultObj = ResultPackage.GetObject(); + + IoHash OutputHash; + CbFieldView ValuesField = ResultObj["Values"sv]; + + if (CbFieldViewIterator It = begin(ValuesField); It.HasValue()) + { + OutputHash = (*It).AsObjectView()["RawHash"sv].AsHash(); + } + + REQUIRE_MESSAGE(OutputHash != IoHash::Zero, "Expected non-zero output hash in result Values array"); + + const CbAttachment* OutputAttachment = ResultPackage.FindAttachment(OutputHash); + REQUIRE_MESSAGE(OutputAttachment != nullptr, "Output attachment not found in result package"); + + CompressedBuffer OutputCompressed = OutputAttachment->AsCompressedBinary(); + SharedBuffer OutputData = OutputCompressed.Decompress(); + + return std::string(static_cast<const char*>(OutputData.GetData()), OutputData.GetSize()); +} + +// Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response. +class MockOrchestratorService : public HttpService +{ +public: + MockOrchestratorService() + { + // Initialize with empty worker list + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + Cbo.EndArray(); + m_WorkerList = Cbo.Save(); + } + + const char* BaseUri() const override { return "/orch/"; } + + void HandleRequest(HttpServerRequest& Request) override + { + if (Request.RequestVerb() == HttpVerb::kGet && Request.RelativeUri() == "agents"sv) + { + RwLock::SharedLockScope Lock(m_Lock); + Request.WriteResponse(HttpResponseCode::OK, m_WorkerList); + return; + } + Request.WriteResponse(HttpResponseCode::NotFound); + } + + void SetWorkerList(CbObject WorkerList) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + m_WorkerList = std::move(WorkerList); + } + +private: + RwLock m_Lock; + CbObject m_WorkerList; +}; + +// Manages in-process ASIO HTTP server lifecycle for mock orchestrator. +struct MockOrchestratorFixture +{ + MockOrchestratorService Service; + ScopedTemporaryDirectory TmpDir; + Ref<HttpServer> Server; + std::thread ServerThread; + uint16_t Port = 0; + + MockOrchestratorFixture() + { + HttpServerConfig Config; + Config.ServerClass = "asio"; + Config.ForceLoopback = true; + Server = CreateHttpServer(Config); + Server->RegisterService(Service); + Port = static_cast<uint16_t>(Server->Initialize(TestEnv.GetNewPortNumber(), TmpDir.Path())); + ZEN_ASSERT(Port != 0); + ServerThread = std::thread([this]() { Server->Run(false); }); + } + + ~MockOrchestratorFixture() + { + Server->RequestExit(); + if (ServerThread.joinable()) + { + ServerThread.join(); + } + Server->Close(); + } + + std::string GetEndpoint() const { return fmt::format("http://localhost:{}", Port); } +}; + +// Build the CbObject response for /orch/agents matching the format UpdateCoordinatorState expects. +static CbObject +BuildAgentListResponse(std::initializer_list<std::pair<std::string_view, std::string_view>> Workers) +{ + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + for (const auto& [Id, Uri] : Workers) + { + Cbo.BeginObject(); + Cbo << "id"sv << Id; + Cbo << "uri"sv << Uri; + Cbo << "hostname"sv + << "localhost"sv; + Cbo << "reachable"sv << true; + Cbo << "dt"sv << uint64_t(0); + Cbo.EndObject(); + } + Cbo.EndArray(); + return Cbo.Save(); +} + +// Build the worker CbPackage for zentest-appstub AND populate the chunk resolver. +// This is the same logic as RegisterWorker() but returns the package instead of POSTing it. +static CbPackage +BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver) +{ + std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL); + + FileContents AppStubData = zen::ReadFile(AppStubPath); + REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message())); + + IoBuffer AppStubBuffer = AppStubData.Flatten(); + + CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash(); + const uint64_t AppStubRawSize = AppStubBuffer.Size(); + + // Store compressed data in chunk resolver for when the remote runner needs it + Resolver.AddChunk(AppStubRawHash, AppStubCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash); + + CbObjectWriter WorkerWriter; + WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion); + WorkerWriter << "path"sv + << "zentest-appstub"sv; + + WorkerWriter.BeginArray("executables"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "zentest-appstub"sv; + WorkerWriter.AddAttachment("hash"sv, AppStubAttachment); + WorkerWriter << "size"sv << AppStubRawSize; + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + WorkerWriter.BeginArray("functions"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Rot13"sv; + WorkerWriter << "version"sv << Guid::FromString(kRot13Version); + WorkerWriter.EndObject(); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Sleep"sv; + WorkerWriter << "version"sv << Guid::FromString(kSleepVersion); + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerWriter.Save()); + WorkerPackage.AddAttachment(AppStubAttachment); + + return WorkerPackage; +} + +// Build a Rot13 action CbObject (not CbPackage) and populate the chunk resolver with the input attachment. +static CbObject +BuildRot13ActionForSession(std::string_view Input, InMemoryChunkResolver& Resolver) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + // Store compressed data in chunk resolver + Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Rot13"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + + return ActionWriter.Save(); +} + +TEST_SUITE_BEGIN("server.function"); + +TEST_CASE("function.rot13") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Submit action via legacy /jobs/{worker} endpoint + const std::string JobUrl = fmt::format("/jobs/{}", WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); + + // Poll for result via legacy /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/jobs/{}", Lsn); + HttpClient::Response ResultResp = PollForResult(Client, ResultUrl); + REQUIRE_MESSAGE( + ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Action failed (empty result package)\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); +} + +TEST_CASE("function.workers") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + // Before registration, GET /workers should return an empty list + HttpClient::Response EmptyListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(EmptyListResp, "Failed to list workers before registration"); + CHECK_EQ(EmptyListResp.AsObject()["workers"sv].AsArrayView().Num(), 0); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // GET /workers — the registered worker should appear in the listing + HttpClient::Response ListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration"); + + bool WorkerFound = false; + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerId) + { + WorkerFound = true; + break; + } + } + + REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString())); + + // GET /workers/{worker} — descriptor should match what was registered + const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); + HttpClient::Response DescResp = Client.Get(WorkerUrl); + REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode))); + + CbObject Desc = DescResp.AsObject(); + CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion)); + CHECK_EQ(Desc["path"sv].AsString(), "zentest-appstub"sv); + + bool Rot13Found = false; + bool SleepFound = false; + for (auto& Item : Desc["functions"sv]) + { + std::string_view Name = Item.AsObjectView()["name"sv].AsString(); + if (Name == "Rot13"sv) + { + CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kRot13Version)); + Rot13Found = true; + } + else if (Name == "Sleep"sv) + { + CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kSleepVersion)); + SleepFound = true; + } + } + + CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor"); + CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor"); + + // GET /workers/{unknown} — should return 404 + const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString()); + HttpClient::Response NotFoundResp = Client.Get(UnknownUrl); + CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound); +} + +TEST_CASE("function.queues.lifecycle") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Verify the queue appears in the listing + HttpClient::Response ListResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list queues"); + + bool QueueFound = false; + for (auto& Item : ListResp.AsObject()["queues"sv]) + { + if (Item.AsObjectView()["queue_id"sv].AsInt32() == QueueId) + { + QueueFound = true; + break; + } + } + + REQUIRE_MESSAGE(QueueFound, fmt::format("Queue {} not found in queue listing", QueueId)); + + // Submit action via queue-scoped endpoint + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission"); + + // Poll for completion via queue-scoped /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Retrieve result via queue-scoped /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + REQUIRE_MESSAGE( + ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + // Verify queue status reflects completion + const std::string StatusUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response StatusResp = Client.Get(StatusUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 0); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "active"); +} + +TEST_CASE("function.queues.cancel") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Submit a job + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + // Cancel the queue + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // Verify queue status shows cancelled + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); +} + +TEST_CASE("function.queues.remote") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a remote queue — response includes both an integer queue_id and an OID queue_token + HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); + REQUIRE_MESSAGE(CreateResp, + fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + CbObject CreateObj = CreateResp.AsObject(); + const std::string QueueToken = std::string(CreateObj["queue_token"sv].AsString()); + REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); + + // All subsequent requests use the opaque token in place of the integer queue id + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Remote queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from remote queue job submission"); + + // Poll for completion via the token-addressed /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken); + REQUIRE_MESSAGE( + PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in remote queue completed list within timeout\nServer log:\n{}", Lsn, Instance.GetLogOutput())); + + // Retrieve result via the token-addressed /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + REQUIRE_MESSAGE(ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}", + int(ResultResp.StatusCode), + Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); +} + +TEST_CASE("function.queues.cancel_running") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Submit a Sleep job long enough that it will still be running when we cancel + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); + + // Wait for the worker process to start executing before cancelling + Sleep(1'000); + + // Cancel the queue, which should interrupt the running Sleep job + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // The cancelled job should appear in the /completed endpoint once the process exits + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list after cancel\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Verify the queue reflects one cancelled action + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); + CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); +} + +TEST_CASE("function.queues.remote_cancel") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a remote queue to obtain an OID token for token-addressed cancellation + HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); + REQUIRE_MESSAGE(CreateResp, + fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + const std::string QueueToken = std::string(CreateResp.AsObject()["queue_token"sv].AsString()); + REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); + + // Submit a long-running Sleep job via the token-addressed endpoint + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); + + // Wait for the worker process to start executing before cancelling + Sleep(1'000); + + // Cancel the queue via its OID token + const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Remote queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // The cancelled job should appear in the token-addressed /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken); + REQUIRE_MESSAGE( + PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in remote queue completed list after cancel\nServer log:\n{}", Lsn, Instance.GetLogOutput())); + + // Verify the queue status reflects the cancellation + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get remote queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); + CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); +} + +TEST_CASE("function.queues.drain") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + + // Submit a long-running job so we can verify it completes even after drain + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response Submit1 = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 2'000)); + REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", int(Submit1.StatusCode))); + const int Lsn1 = Submit1.AsObject()["lsn"sv].AsInt32(); + + // Drain the queue + const std::string DrainUrl = fmt::format("/queues/{}/drain", QueueId); + HttpClient::Response DrainResp = Client.Post(DrainUrl); + REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText())); + CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining"); + + // Second submission should be rejected with 424 + HttpClient::Response Submit2 = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); + CHECK_EQ(Submit2.StatusCode, HttpResponseCode::FailedDependency); + CHECK_EQ(std::string(Submit2.AsObject()["error"sv].AsString()), "queue is draining"); + + // First job should still complete + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn1), + fmt::format("LSN {} did not complete after drain\nServer log:\n{}", Lsn1, Instance.GetLogOutput())); + + // Queue status should show draining + complete + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "draining"); + CHECK(QueueStatus["is_complete"sv].AsBool()); +} + +TEST_CASE("function.priority") +{ + // Spawn server with max-actions=1 to guarantee serialized action execution, + // which lets us deterministically verify that higher-priority pending jobs + // are scheduled before lower-priority ones. + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1"); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue for all test jobs + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); + + // Submit a blocker Sleep job to occupy the single execution slot. + // Once the blocker is running, the scheduler must choose among the pending + // jobs by priority when the slot becomes free. + const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); + HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000)); + REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode))); + + // Submit 3 low-priority Rot13 jobs + const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); + + HttpClient::Response LowResp1 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low1"sv)); + REQUIRE_MESSAGE(LowResp1, "Low-priority job 1 submission failed"); + const int LsnLow1 = LowResp1.AsObject()["lsn"sv].AsInt32(); + + HttpClient::Response LowResp2 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low2"sv)); + REQUIRE_MESSAGE(LowResp2, "Low-priority job 2 submission failed"); + const int LsnLow2 = LowResp2.AsObject()["lsn"sv].AsInt32(); + + HttpClient::Response LowResp3 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low3"sv)); + REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed"); + const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32(); + + // Submit 1 high-priority Rot13 job — should execute before the low-priority ones + const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString()); + HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv)); + REQUIRE_MESSAGE(HighResp, "High-priority job submission failed"); + const int LsnHigh = HighResp.AsObject()["lsn"sv].AsInt32(); + + // Wait for all 4 priority-test jobs to appear in the queue's completed list. + // This avoids any snapshot-timing race: by the time we compare timestamps, all + // jobs have already finished and their history entries are stable. + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + + { + bool AllCompleted = false; + Stopwatch WaitTimer; + + while (!AllCompleted && WaitTimer.GetElapsedTimeMs() < 30'000) + { + HttpClient::Response Resp = Client.Get(CompletedUrl); + + if (Resp) + { + bool FoundHigh = false; + bool FoundLow1 = false; + bool FoundLow2 = false; + bool FoundLow3 = false; + + CbObject RespObj = Resp.AsObject(); + + for (auto& Item : RespObj["completed"sv]) + { + const int Lsn = Item.AsInt32(); + if (Lsn == LsnHigh) + { + FoundHigh = true; + } + else if (Lsn == LsnLow1) + { + FoundLow1 = true; + } + else if (Lsn == LsnLow2) + { + FoundLow2 = true; + } + else if (Lsn == LsnLow3) + { + FoundLow3 = true; + } + } + + AllCompleted = FoundHigh && FoundLow1 && FoundLow2 && FoundLow3; + } + + if (!AllCompleted) + { + Sleep(100); + } + } + + REQUIRE_MESSAGE( + AllCompleted, + fmt::format( + "Not all priority test jobs completed within timeout (lsnHigh={} lsnLow1={} lsnLow2={} lsnLow3={})\nServer log:\n{}", + LsnHigh, + LsnLow1, + LsnLow2, + LsnLow3, + Instance.GetLogOutput())); + } + + // Query the queue-scoped history to obtain the time_Completed timestamp for each + // job. The history endpoint records when each RunnerAction::State transition + // occurred, so time_Completed is the wall-clock tick at which the action finished. + // Using the queue-scoped endpoint avoids exposing history from other queues. + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + + CbObject HistoryObj = HistoryResp.AsObject(); + + auto GetCompletedTimestamp = [&](int Lsn) -> uint64_t { + for (auto& Item : HistoryObj["history"sv]) + { + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + return Item.AsObjectView()["time_Completed"sv].AsUInt64(); + } + } + return 0; + }; + + const uint64_t TimeHigh = GetCompletedTimestamp(LsnHigh); + const uint64_t TimeLow1 = GetCompletedTimestamp(LsnLow1); + const uint64_t TimeLow2 = GetCompletedTimestamp(LsnLow2); + const uint64_t TimeLow3 = GetCompletedTimestamp(LsnLow3); + + REQUIRE_MESSAGE(TimeHigh != 0, fmt::format("lsnHigh={} not found in action history", LsnHigh)); + REQUIRE_MESSAGE(TimeLow1 != 0, fmt::format("lsnLow1={} not found in action history", LsnLow1)); + REQUIRE_MESSAGE(TimeLow2 != 0, fmt::format("lsnLow2={} not found in action history", LsnLow2)); + REQUIRE_MESSAGE(TimeLow3 != 0, fmt::format("lsnLow3={} not found in action history", LsnLow3)); + + // The high-priority job must have completed strictly before every low-priority job + CHECK_MESSAGE(TimeHigh < TimeLow1, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow1={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow1, + TimeLow1)); + CHECK_MESSAGE(TimeHigh < TimeLow2, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow2={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow2, + TimeLow2)); + CHECK_MESSAGE(TimeHigh < TimeLow3, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow3={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow3, + TimeLow3)); +} + +////////////////////////////////////////////////////////////////////////// +// Remote worker synchronization tests +// +// These tests exercise the orchestrator discovery path where new compute +// nodes appear over time and must receive previously registered workers +// via SyncWorkersToRunner(). + +TEST_CASE("function.remote.worker_sync_on_discovery") +{ + // Spawn real zenserver in compute mode + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t ServerPort = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(ServerPort != 0, Instance.GetLogOutput()); + + const std::string ServerUri = fmt::format("http://localhost:{}", ServerPort); + + // Start mock orchestrator with empty worker list + MockOrchestratorFixture MockOrch; + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session (stored locally, no runners yet) + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Update mock orchestrator to advertise the real server + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}})); + + // Wait for scheduler to discover the runner (~5s throttle + margin) + Sleep(7'000); + + // Submit Rot13 action via session + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Action enqueue failed"); + + // Poll for result + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE( + ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); + + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.late_runner_discovery") +{ + // Spawn first server + ZenServerInstance Instance1(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance1.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port1 = Instance1.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port1 != 0, Instance1.GetLogOutput()); + + const std::string ServerUri1 = fmt::format("http://localhost:{}", Port1); + + // Start mock orchestrator advertising W1 + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}})); + + // Create session and register worker + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for W1 discovery + Sleep(7'000); + + // Baseline: submit Rot13 action and verify it completes on W1 + { + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed"); + + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance1.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + } + + // Spawn second server + ZenServerInstance Instance2(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance2.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port2 = Instance2.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port2 != 0, Instance2.GetLogOutput()); + + const std::string ServerUri2 = fmt::format("http://localhost:{}", Port2); + + // Update mock orchestrator to include both W1 and W2 + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}})); + + // Wait for W2 discovery + Sleep(7'000); + + // Verify W2 received the worker by querying its /compute/workers endpoint directly + { + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); + HttpClient Client(ComputeBaseUri); + HttpClient::Response ListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); + + bool WorkerFound = false; + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerPackage.GetObjectHash()) + { + WorkerFound = true; + break; + } + } + + REQUIRE_MESSAGE(WorkerFound, + fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}", + Instance2.GetLogOutput())); + } + + // Submit another action and verify it completes (could run on either W1 or W2) + { + CbObject ActionObj = BuildRot13ActionForSession("Second Test"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Second action enqueue failed"); + + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Second action did not complete in time\nW1 log:\n{}\nW2 log:\n{}", + Instance1.GetLogOutput(), + Instance2.GetLogOutput())); + + // Rot13("Second Test") = "Frpbaq Grfg" + CHECK_EQ(GetRot13Output(ResultPackage), "Frpbaq Grfg"sv); + } + + Session.Shutdown(); +} + +TEST_CASE("function.remote.queue_association") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit action to it + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Action enqueue to queue failed"); + + // Poll for result + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE( + ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); + + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + // Verify that a non-implicit remote queue was created on the compute node + HttpClient Client(Instance.GetBaseUri() + "/compute"); + + HttpClient::Response QueuesResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); + + bool RemoteQueueFound = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueFound = true; + break; + } + } + + CHECK_MESSAGE(RemoteQueueFound, "Expected a non-implicit remote queue on the compute node"); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.queue_cancel_propagation") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Cancel the local queue — this should propagate to the remote + Session.CancelQueue(QueueId); + + // Poll for the action to complete (as cancelled) + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + // Verify the local queue shows cancelled + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK(QueueStatus.State == zen::compute::ComputeServiceSession::QueueState::Cancelled); + + // Verify the remote queue on the compute node is also cancelled + HttpClient Client(Instance.GetBaseUri() + "/compute"); + + HttpClient::Response QueuesResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); + + bool RemoteQueueCancelled = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled"; + break; + } + } + + CHECK_MESSAGE(RemoteQueueCancelled, "Expected the remote queue to be cancelled"); + + Session.Shutdown(); +} + +TEST_CASE("function.abandon_running_http") +{ + // Spawn a real zenserver to execute a long-running action, then abandon via HTTP endpoint + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue and submit a long-running Sleep job + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); + + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode))); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); + + // Wait for the process to start running + Sleep(1'000); + + // Verify the ready endpoint returns OK before abandon + { + HttpClient::Response ReadyResp = Client.Get("/ready"sv); + CHECK(ReadyResp.StatusCode == HttpResponseCode::OK); + } + + // Trigger abandon via the HTTP endpoint + HttpClient::Response AbandonResp = Client.Post("/abandon"sv); + REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK, + fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText())); + + // Ready endpoint should now return 503 + { + HttpClient::Response ReadyResp = Client.Get("/ready"sv); + CHECK(ReadyResp.StatusCode == HttpResponseCode::ServiceUnavailable); + } + + // The abandoned action should appear in the completed endpoint once the process exits + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list after abandon\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Verify the queue reflects one abandoned action + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after abandon"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["abandoned_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + + // Submitting new work should be rejected + HttpClient::Response RejectedResp = Client.Post(JobUrl, BuildRot13ActionPackage("rejected"sv)); + CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state"); +} + +TEST_CASE("function.session.abandon_pending") +{ + // Create a session with no runners so actions stay pending + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Enqueue several actions — they will stay pending because there are no runners + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); + + CbObject ActionObj = BuildRot13ActionForSession("abandon-test"sv, Resolver); + + auto Enqueue1 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + auto Enqueue2 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + auto Enqueue3 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + REQUIRE_MESSAGE(Enqueue1, "Failed to enqueue action 1"); + REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2"); + REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3"); + + // Transition to Abandoned — should mark all pending actions as Abandoned + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK(!Session.IsHealthy()); + + // Give the scheduler thread time to process the state changes + Sleep(2'000); + + // All three actions should now be in the results map as abandoned + for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn}) + { + CbPackage Result; + HttpResponseCode Code = Session.GetActionResult(Lsn, Result); + CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code))); + } + + // Queue should show 0 active, 3 abandoned + auto Status = Session.GetQueueStatus(QueueResult.QueueId); + CHECK_EQ(Status.ActiveCount, 0); + CHECK_EQ(Status.AbandonedCount, 3); + + // New actions should be rejected + auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state"); + + // Abandoned → Sunset should be valid + CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset)); + + Session.Shutdown(); +} + +TEST_CASE("function.session.abandon_running") +{ + // Spawn a real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Transition to Abandoned — should abandon the running action + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + CHECK(!Session.IsHealthy()); + + // Poll for the action to complete (as abandoned) + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); + + // Verify the queue shows abandoned, not completed + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK_EQ(QueueStatus.ActiveCount, 0); + CHECK_EQ(QueueStatus.AbandonedCount, 1); + CHECK_EQ(QueueStatus.CompletedCount, 0); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.abandon_propagation") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Transition to Abandoned — should abandon the running action and propagate + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + + // Poll for the action to complete + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); + + // Verify the local queue shows abandoned + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK_EQ(QueueStatus.ActiveCount, 0); + CHECK_EQ(QueueStatus.AbandonedCount, 1); + + // Session should not be healthy + CHECK(!Session.IsHealthy()); + + // The remote compute node should still be healthy (only the parent abandoned) + HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); + HttpClient::Response ReadyResp = RemoteClient.Get("/ready"sv); + CHECK_MESSAGE(ReadyResp.StatusCode == HttpResponseCode::OK, "Remote compute node should still be healthy"); + + Session.Shutdown(); +} + +TEST_SUITE_END(); + +} // namespace zen::tests::compute + +#endif |