From b0a3de5fec8f4da8f9513b02bc2326aa6a0e7bd5 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Fri, 13 Feb 2026 13:47:51 +0100 Subject: logging config move to zenutil (#754) made logging config options from zenserver available in zen CLI --- src/zenserver-test/zenserver-test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 9a42bb73d..4120dec1a 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -17,7 +17,7 @@ # include # include # include -# include +# include # include # include -- cgit v1.2.3 From 149a5c2faa8d59290b8b44717e504532e906aae2 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 18 Feb 2026 11:28:03 +0100 Subject: structured compute basics (#714) this change adds the `zencompute` component, which can be used to distribute work dispatched from UE using the DDB (Derived Data Build) APIs via zenserver this change also adds a distinct zenserver compute mode (`zenserver compute`) which is intended to be used for leaf compute nodes to exercise the compute functionality without directly involving UE, a `zen exec` subcommand is also added, which can be used to feed replays through the system all new functionality is considered *experimental* and disabled by default at this time, behind the `zencompute` option in xmake config --- src/zenserver-test/function-tests.cpp | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 src/zenserver-test/function-tests.cpp (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp new file mode 100644 index 000000000..559387fa2 --- /dev/null +++ b/src/zenserver-test/function-tests.cpp @@ -0,0 +1,34 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#if ZEN_WITH_TESTS + +# include +# include +# include +# include +# include + +# include "zenserver-test.h" + +namespace zen::tests { + +using namespace std::literals; + +TEST_CASE("function.run") +{ + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + Instance.SpawnServer(13337); + + ZEN_INFO("Waiting..."); + + Instance.WaitUntilReady(); +} + +} // namespace zen::tests + +#endif -- cgit v1.2.3 From c32b6042dee8444f4e214f227005a657ec87531e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 27 Feb 2026 21:22:00 +0100 Subject: add multirange requests to blob store (#795) * add multirange requests to blob store --- src/zenserver-test/buildstore-tests.cpp | 200 +++++++++++++++++++++++++++++++- 1 file changed, 199 insertions(+), 1 deletion(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp index 02b308485..ef48b2362 100644 --- a/src/zenserver-test/buildstore-tests.cpp +++ b/src/zenserver-test/buildstore-tests.cpp @@ -36,7 +36,8 @@ TEST_CASE("buildstore.blobs") std::string_view Bucket = "bkt"sv; Oid BuildId = Oid::NewOid(); - std::vector CompressedBlobsHashes; + std::vector CompressedBlobsHashes; + std::vector CompressedBlobsSizes; { ZenServerInstance Instance(TestEnv); @@ -51,6 +52,7 @@ TEST_CASE("buildstore.blobs") IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); @@ -107,6 +109,7 @@ TEST_CASE("buildstore.blobs") IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize()); IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); @@ -141,6 +144,201 @@ TEST_CASE("buildstore.blobs") CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } } + + { + // Single-range Get + + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + { + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); + + std::vector> Ranges = {{BlobSize / 16 * 1, BlobSize / 2}}; + + uint64_t RangeSizeSum = Ranges.front().second; + + HttpClient::KeyValueMap Headers; + + Headers.Entries.insert( + {"Range", fmt::format("bytes={}-{}", Ranges.front().first, Ranges.front().first + Ranges.front().second - 1)}); + + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), Headers); + REQUIRE(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK_EQ(RangeSizeSum, Payload.GetSize()); + + HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + REQUIRE(FullBlobResult); + MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Ranges.front().first, Ranges.front().second); + MemoryView RangeView = Payload.GetView(); + CHECK(ActualRange.EqualBytes(RangeView)); + } + } + + { + // Single-range Post + + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + { + uint64_t RangeSizeSum = 0; + + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); + + std::vector> Ranges = {{BlobSize / 16 * 1, BlobSize / 2}}; + + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + { + for (const std::pair& Range : Ranges) + { + Writer.BeginObject(); + { + Writer.AddInteger("offset"sv, Range.first); + Writer.AddInteger("length"sv, Range.second); + RangeSizeSum += Range.second; + } + Writer.EndObject(); + } + } + Writer.EndArray(); // ranges + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kCbPackage)); + REQUIRE(Result); + IoBuffer Payload = Result.ResponsePayload; + REQUIRE(Payload.GetContentType() == ZenContentType::kCbPackage); + + CbPackage ResponsePackage = ParsePackageMessage(Payload); + CbObjectView ResponseObject = ResponsePackage.GetObject(); + + CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView(); + CHECK_EQ(RangeArray.Num(), Ranges.size()); + size_t RangeOffset = 0; + for (CbFieldView View : RangeArray) + { + CbObjectView Range = View.AsObjectView(); + CHECK_EQ(Range["offset"sv].AsUInt64(), Ranges[RangeOffset].first); + CHECK_EQ(Range["length"sv].AsUInt64(), Ranges[RangeOffset].second); + RangeOffset++; + } + + const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash); + REQUIRE(DataAttachment); + SharedBuffer PayloadRanges = DataAttachment->AsBinary(); + CHECK_EQ(RangeSizeSum, PayloadRanges.GetSize()); + + HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + REQUIRE(FullBlobResult); + + uint64_t Offset = 0; + for (const std::pair& Range : Ranges) + { + MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Range.first, Range.second); + MemoryView RangeView = PayloadRanges.GetView().Mid(Offset, Range.second); + CHECK(ActualRange.EqualBytes(RangeView)); + Offset += Range.second; + } + } + } + + { + // Multi-range + + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + { + uint64_t RangeSizeSum = 0; + + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); + + std::vector> Ranges = { + {BlobSize / 16 * 1, BlobSize / 20}, + {BlobSize / 16 * 3, BlobSize / 32}, + {BlobSize / 16 * 5, BlobSize / 16}, + {BlobSize - BlobSize / 16, BlobSize / 16 - 1}, + }; + + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + { + for (const std::pair& Range : Ranges) + { + Writer.BeginObject(); + { + Writer.AddInteger("offset"sv, Range.first); + Writer.AddInteger("length"sv, Range.second); + RangeSizeSum += Range.second; + } + Writer.EndObject(); + } + } + Writer.EndArray(); // ranges + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kCbPackage)); + REQUIRE(Result); + IoBuffer Payload = Result.ResponsePayload; + REQUIRE(Payload.GetContentType() == ZenContentType::kCbPackage); + + CbPackage ResponsePackage = ParsePackageMessage(Payload); + CbObjectView ResponseObject = ResponsePackage.GetObject(); + + CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView(); + CHECK_EQ(RangeArray.Num(), Ranges.size()); + size_t RangeOffset = 0; + for (CbFieldView View : RangeArray) + { + CbObjectView Range = View.AsObjectView(); + CHECK_EQ(Range["offset"sv].AsUInt64(), Ranges[RangeOffset].first); + CHECK_EQ(Range["length"sv].AsUInt64(), Ranges[RangeOffset].second); + RangeOffset++; + } + + const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash); + REQUIRE(DataAttachment); + SharedBuffer PayloadRanges = DataAttachment->AsBinary(); + CHECK_EQ(RangeSizeSum, PayloadRanges.GetSize()); + + HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + REQUIRE(FullBlobResult); + + uint64_t Offset = 0; + for (const std::pair& Range : Ranges) + { + MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Range.first, Range.second); + MemoryView RangeView = PayloadRanges.GetView().Mid(Offset, Range.second); + CHECK(ActualRange.EqualBytes(RangeView)); + Offset += Range.second; + } + } + } } namespace { -- cgit v1.2.3 From c7e0efb9c12f4607d4bc6a844a3e5bd3272bd839 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Sat, 28 Feb 2026 15:36:13 +0100 Subject: test running / reporting improvements (#797) **CI/CD improvements (validate.yml):** - Add test reporter (`ue-foundation/test-reporter@v2`) for all three platforms, rendering JUnit test results directly in PR check runs - Add "Trust workspace" step on Windows to fix git safe.directory ownership issue with self-hosted runners - Clean stale report files before each test run to prevent false failures from leftover XML - Broaden `paths-ignore` to skip builds for non-code changes (`*.md`, `LICENSE`, `.gitignore`, `docs/**`) **Test improvements:** - Convert `CHECK` to `REQUIRE` in several test suites (projectstore, integration, http) for fail-fast behavior - Mark some tests with `doctest::skip()` for selective execution - Skip httpclient transport tests pending investigation - Add `--noskip` option to `xmake test` task - Add `--repeat=` option to `xmake test` task, to run tests repeatedly N times or until there is a failure **xmake test output improvements:** - Add totals row to test summary table - Right-justify numeric columns in summary table --- src/zenserver-test/buildstore-tests.cpp | 16 +++++++-------- src/zenserver-test/cache-tests.cpp | 10 ++------- src/zenserver-test/hub-tests.cpp | 2 +- src/zenserver-test/projectstore-tests.cpp | 34 +++++++++++++++---------------- src/zenserver-test/workspace-tests.cpp | 4 ++-- 5 files changed, 30 insertions(+), 36 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp index ef48b2362..7cd31db06 100644 --- a/src/zenserver-test/buildstore-tests.cpp +++ b/src/zenserver-test/buildstore-tests.cpp @@ -389,7 +389,7 @@ TEST_CASE("buildstore.metadata") HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); - CHECK(Result); + REQUIRE(Result); std::vector ResultMetadatas; @@ -570,7 +570,7 @@ TEST_CASE("buildstore.cache") { std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); + REQUIRE(Exists.size() == BlobHashes.size()); for (size_t I = 0; I < BlobCount; I++) { CHECK(Exists[I].HasBody); @@ -609,7 +609,7 @@ TEST_CASE("buildstore.cache") { std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); + REQUIRE(Exists.size() == BlobHashes.size()); for (size_t I = 0; I < BlobCount; I++) { CHECK(Exists[I].HasBody); @@ -617,7 +617,7 @@ TEST_CASE("buildstore.cache") } std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); + REQUIRE_EQ(BlobCount, FetchedMetadatas.size()); for (size_t I = 0; I < BlobCount; I++) { @@ -638,7 +638,7 @@ TEST_CASE("buildstore.cache") { std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); + REQUIRE(Exists.size() == BlobHashes.size()); for (size_t I = 0; I < BlobCount * 2; I++) { CHECK(Exists[I].HasBody); @@ -649,7 +649,7 @@ TEST_CASE("buildstore.cache") CHECK_EQ(BlobCount, MetaDatas.size()); std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); + REQUIRE_EQ(BlobCount, FetchedMetadatas.size()); for (size_t I = 0; I < BlobCount; I++) { @@ -672,7 +672,7 @@ TEST_CASE("buildstore.cache") CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, GetTinyWorkerPool(EWorkloadType::Background))); std::vector Exists = Cache->BlobsExists(BuildId, BlobHashes); - CHECK(Exists.size() == BlobHashes.size()); + REQUIRE(Exists.size() == BlobHashes.size()); for (size_t I = 0; I < BlobCount * 2; I++) { CHECK(Exists[I].HasBody); @@ -691,7 +691,7 @@ TEST_CASE("buildstore.cache") CHECK_EQ(BlobCount, MetaDatas.size()); std::vector FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); - CHECK_EQ(BlobCount, FetchedMetadatas.size()); + REQUIRE_EQ(BlobCount, FetchedMetadatas.size()); for (size_t I = 0; I < BlobCount; I++) { diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp index 0272d3797..745a89253 100644 --- a/src/zenserver-test/cache-tests.cpp +++ b/src/zenserver-test/cache-tests.cpp @@ -145,7 +145,7 @@ TEST_CASE("zcache.cbpackage") for (const zen::CbAttachment& LhsAttachment : LhsAttachments) { const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash()); - CHECK(RhsAttachment); + REQUIRE(RhsAttachment); zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress(); CHECK(!LhsBuffer.IsNull()); @@ -1373,14 +1373,8 @@ TEST_CASE("zcache.rpc") } } -TEST_CASE("zcache.failing.upstream") +TEST_CASE("zcache.failing.upstream" * doctest::skip()) { - // This is an exploratory test that takes a long time to run, so lets skip it by default - if (true) - { - return; - } - using namespace std::literals; using namespace utils; diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index 42a5dcae4..bd85a5020 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -232,7 +232,7 @@ TEST_CASE("hub.lifecycle.children") TEST_SUITE_END(); -TEST_CASE("hub.consul.lifecycle") +TEST_CASE("hub.consul.lifecycle" * doctest::skip()) { zen::consul::ConsulProcess ConsulProc; ConsulProc.SpawnConsulAgent(); diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp index 735aef159..487832405 100644 --- a/src/zenserver-test/projectstore-tests.cpp +++ b/src/zenserver-test/projectstore-tests.cpp @@ -71,7 +71,7 @@ TEST_CASE("project.basic") { auto Response = Http.Get("/prj/test"sv); - CHECK(Response.StatusCode == HttpResponseCode::OK); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); CbObject ResponseObject = Response.AsObject(); @@ -92,7 +92,7 @@ TEST_CASE("project.basic") { auto Response = Http.Get(""sv); - CHECK(Response.StatusCode == HttpResponseCode::OK); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); CbObject ResponseObject = Response.AsObject(); @@ -213,7 +213,7 @@ TEST_CASE("project.basic") auto Response = Http.Get(ChunkGetUri); REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); IoBuffer Data = Response.ResponsePayload; IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); @@ -235,13 +235,13 @@ TEST_CASE("project.basic") auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); IoBuffer Data = Response.ResponsePayload; IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); - CHECK(Compressed); + REQUIRE(Compressed); IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); CHECK(RawSize == ReferenceData.GetSize()); @@ -436,13 +436,13 @@ TEST_CASE("project.remote") HttpClient Http{UrlBase}; HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload); - CHECK(Response); + REQUIRE(Response); }; auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) { HttpClient Http{UrlBase}; HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{}); - CHECK(Response); + REQUIRE(Response); }; auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) { @@ -453,7 +453,7 @@ TEST_CASE("project.remote") HttpClient Http{UrlBase}; HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body); - CHECK(Response); + REQUIRE(Response); }; MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0"); @@ -504,7 +504,7 @@ TEST_CASE("project.remote") HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Response); + REQUIRE(Response); CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size()); for (auto A : ResponsePackage.GetAttachments()) @@ -519,7 +519,7 @@ TEST_CASE("project.remote") HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()}; HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog)); - CHECK(Response); + REQUIRE(Response); IoBuffer Payload(Response.ResponsePayload); CbObject OplogResonse = LoadCompactBinaryObject(Payload); @@ -541,7 +541,7 @@ TEST_CASE("project.remote") auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) { REQUIRE(Response); const uint64_t JobId = ParseInt(Response.AsText()).value_or(0); - CHECK(JobId != 0); + REQUIRE(JobId != 0); HttpClient Http{Server.GetBaseUri()}; @@ -549,10 +549,10 @@ TEST_CASE("project.remote") { HttpClient::Response StatusResponse = Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}}); - CHECK(StatusResponse); + REQUIRE(StatusResponse); CbObject ResponseObject = StatusResponse.AsObject(); std::string_view Status = ResponseObject["Status"sv].AsString(); - CHECK(Status != "Aborted"sv); + REQUIRE(Status != "Aborted"sv); if (Status == "Complete"sv) { return; @@ -887,16 +887,16 @@ TEST_CASE("project.rpcappendop") Project.AddString("project"sv, ""sv); Project.AddString("projectfile"sv, ""sv); HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save()); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); }; auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); }; auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) { HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName)); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); return Response.AsObject(); }; @@ -910,7 +910,7 @@ TEST_CASE("project.rpcappendop") } Request.EndArray(); // "ops" HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save()); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); + REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("")); CbObjectView ResponsePayload = Response.AsPackage().GetObject(); CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView(); diff --git a/src/zenserver-test/workspace-tests.cpp b/src/zenserver-test/workspace-tests.cpp index 7595d790a..aedadf0c3 100644 --- a/src/zenserver-test/workspace-tests.cpp +++ b/src/zenserver-test/workspace-tests.cpp @@ -514,9 +514,9 @@ TEST_CASE("workspaces.share") } IoBuffer BatchResponse = Client.Post(fmt::format("/ws/{}/{}/batch", WorkspaceId, ShareId), BuildChunkBatchRequest(BatchEntries)).ResponsePayload; - CHECK(BatchResponse); + REQUIRE(BatchResponse); std::vector BatchResult = ParseChunkBatchResponse(BatchResponse); - CHECK(BatchResult.size() == Files.size()); + REQUIRE(BatchResult.size() == Files.size()); for (const RequestChunkEntry& Request : BatchEntries) { IoBuffer Result = BatchResult[Request.CorrelationId]; -- cgit v1.2.3 From 4d01aaee0a45f4c9f96e8a4925eff696be98de8d Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Sun, 1 Mar 2026 12:40:20 +0100 Subject: added `--verbose` option to zenserver-test and `xmake test` (#798) * when `--verbose` is specified to zenserver-test, all child process output (typically, zenserver instances) is piped through to stdout. you can also pass `--verbose` to `xmake test` to accomplish the same thing. * this PR also consolidates all test runner `main` function logic (such as from zencore-test, zenhttp-test etc) into central implementation in zencore for consistency and ease of maintenance * also added extended utf8-tests including a fix to `Utf8ToWide()` --- src/zenserver-test/zenserver-test.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 4120dec1a..c7ce633d3 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -4,7 +4,6 @@ #if ZEN_WITH_TESTS -# define ZEN_TEST_WITH_RUNNER 1 # include "zenserver-test.h" # include @@ -97,6 +96,7 @@ main(int argc, char** argv) // somehow in the future std::string ServerClass; + bool Verbose = false; for (int i = 1; i < argc; ++i) { @@ -107,10 +107,19 @@ main(int argc, char** argv) ServerClass = argv[++i]; } } + else if (argv[i] == "--verbose"sv) + { + Verbose = true; + } } zen::tests::TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass); + if (Verbose) + { + zen::tests::TestEnv.SetPassthroughOutput(true); + } + ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); zen::testing::TestRunner Runner; -- cgit v1.2.3 From d604351cb5dc3032a7cb8c84d6ad5f1480325e5c Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 2 Mar 2026 09:37:14 +0100 Subject: Add test suites (#799) Makes all test cases part of a test suite. Test suites are named after the module and the name of the file containing the implementation of the test. * This allows for better and more predictable filtering of which test cases to run which should also be able to reduce the time CI spends in tests since it can filter on the tests for that particular module. Also improves `xmake test` behaviour: * instead of an explicit list of projects just enumerate the test projects which are available based on build system state * also introduces logic to avoid running `xmake config` unnecessarily which would invalidate the existing build and do lots of unnecessary work since dependencies were invalidated by the updated config * also invokes build only for the chosen test targets As a bonus, also adds `xmake sln --open` which allows opening IDE after generation of solution/xmake project is done. --- src/zenserver-test/buildstore-tests.cpp | 4 ++++ src/zenserver-test/cache-tests.cpp | 4 ++++ src/zenserver-test/cacherequests.cpp | 4 ++++ src/zenserver-test/function-tests.cpp | 4 ++++ src/zenserver-test/hub-tests.cpp | 6 +++--- src/zenserver-test/projectstore-tests.cpp | 4 ++++ src/zenserver-test/workspace-tests.cpp | 4 ++++ src/zenserver-test/zenserver-test.cpp | 5 +++++ 8 files changed, 32 insertions(+), 3 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp index 7cd31db06..cf9b10896 100644 --- a/src/zenserver-test/buildstore-tests.cpp +++ b/src/zenserver-test/buildstore-tests.cpp @@ -27,6 +27,8 @@ namespace zen::tests { using namespace std::literals; +TEST_SUITE_BEGIN("server.buildstore"); + TEST_CASE("buildstore.blobs") { std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); @@ -700,5 +702,7 @@ TEST_CASE("buildstore.cache") } } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp index 745a89253..334dd04ab 100644 --- a/src/zenserver-test/cache-tests.cpp +++ b/src/zenserver-test/cache-tests.cpp @@ -23,6 +23,8 @@ namespace zen::tests { +TEST_SUITE_BEGIN("server.cache"); + TEST_CASE("zcache.basic") { using namespace std::literals; @@ -2663,6 +2665,8 @@ TEST_CASE("zcache.batchoperations") } } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/cacherequests.cpp b/src/zenserver-test/cacherequests.cpp index 46339aebb..f5302a359 100644 --- a/src/zenserver-test/cacherequests.cpp +++ b/src/zenserver-test/cacherequests.cpp @@ -1037,6 +1037,8 @@ namespace zen { namespace cacherequests { static CompressedBuffer MakeCompressedBuffer(size_t Size) { return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Size))); }; + TEST_SUITE_BEGIN("server.cacherequests"); + TEST_CASE("cacherequests.put.cache.records") { PutCacheRecordsRequest EmptyRequest; @@ -1458,5 +1460,7 @@ namespace zen { namespace cacherequests { "!default!", Invalid)); } + + TEST_SUITE_END(); #endif }} // namespace zen::cacherequests diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp index 559387fa2..82848c6ad 100644 --- a/src/zenserver-test/function-tests.cpp +++ b/src/zenserver-test/function-tests.cpp @@ -16,6 +16,8 @@ namespace zen::tests { using namespace std::literals; +TEST_SUITE_BEGIN("server.function"); + TEST_CASE("function.run") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); @@ -29,6 +31,8 @@ TEST_CASE("function.run") Instance.WaitUntilReady(); } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index bd85a5020..11531e30f 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -24,7 +24,7 @@ namespace zen::tests::hub { using namespace std::literals; -TEST_SUITE_BEGIN("hub.lifecycle"); +TEST_SUITE_BEGIN("server.hub"); TEST_CASE("hub.lifecycle.basic") { @@ -230,8 +230,6 @@ TEST_CASE("hub.lifecycle.children") } } -TEST_SUITE_END(); - TEST_CASE("hub.consul.lifecycle" * doctest::skip()) { zen::consul::ConsulProcess ConsulProc; @@ -248,5 +246,7 @@ TEST_CASE("hub.consul.lifecycle" * doctest::skip()) ConsulProc.StopConsulAgent(); } +TEST_SUITE_END(); + } // namespace zen::tests::hub #endif diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp index 487832405..c73910aaa 100644 --- a/src/zenserver-test/projectstore-tests.cpp +++ b/src/zenserver-test/projectstore-tests.cpp @@ -27,6 +27,8 @@ namespace zen::tests { using namespace std::literals; +TEST_SUITE_BEGIN("server.projectstore"); + TEST_CASE("project.basic") { using namespace std::literals; @@ -1053,6 +1055,8 @@ TEST_CASE("project.rpcappendop") } } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/workspace-tests.cpp b/src/zenserver-test/workspace-tests.cpp index aedadf0c3..655f28872 100644 --- a/src/zenserver-test/workspace-tests.cpp +++ b/src/zenserver-test/workspace-tests.cpp @@ -73,6 +73,8 @@ GenerateFolderContent2(const std::filesystem::path& RootPath) return Result; } +TEST_SUITE_BEGIN("server.workspace"); + TEST_CASE("workspaces.create") { using namespace std::literals; @@ -537,5 +539,7 @@ TEST_CASE("workspaces.share") CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index c7ce633d3..bd36d731f 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -123,6 +123,7 @@ main(int argc, char** argv) ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); zen::testing::TestRunner Runner; + Runner.SetDefaultSuiteFilter("server.*"); Runner.ApplyCommandLine(argc, argv); return Runner.Run(); @@ -130,6 +131,8 @@ main(int argc, char** argv) namespace zen::tests { +TEST_SUITE_BEGIN("server.zenserver"); + TEST_CASE("default.single") { std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); @@ -336,6 +339,8 @@ TEST_CASE("http.package") CHECK_EQ(ResponsePackage, TestPackage); } +TEST_SUITE_END(); + # if 0 TEST_CASE("lifetime.owner") { -- cgit v1.2.3 From 1558b202663d9d18f87b384110891b190ad24ea2 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 3 Mar 2026 13:17:38 +0100 Subject: fix objectstore uri path parsing (#801) * add objectstore tests * in http router, for last matcher, test if it matches the remaining part of the uri --- src/zenserver-test/objectstore-tests.cpp | 74 ++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 src/zenserver-test/objectstore-tests.cpp (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/objectstore-tests.cpp b/src/zenserver-test/objectstore-tests.cpp new file mode 100644 index 000000000..f3db5fdf6 --- /dev/null +++ b/src/zenserver-test/objectstore-tests.cpp @@ -0,0 +1,74 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#if ZEN_WITH_TESTS +# include "zenserver-test.h" +# include +# include +# include +# include + +ZEN_THIRD_PARTY_INCLUDES_START +# include +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::tests { + +using namespace std::literals; + +TEST_SUITE_BEGIN("server.objectstore"); + +TEST_CASE("objectstore.blobs") +{ + std::string_view Bucket = "bkt"sv; + + std::vector CompressedBlobsHashes; + std::vector BlobsSizes; + std::vector CompressedBlobsSizes; + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--objectstore-enabled")); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/obj/"); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + BlobsSizes.push_back(Blob.GetSize()); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + std::string ObjectPath = fmt::format("{}/{}.utoc", + CompressedBlobsHashes.back().ToHexString().substr(0, 2), + CompressedBlobsHashes.back().ToHexString()); + + HttpClient::Response Result = Client.Put(fmt::format("bucket/{}/{}.utoc", Bucket, ObjectPath), Payload); + CHECK(Result); + } + + for (size_t I = 0; I < 5; I++) + { + std::string ObjectPath = + fmt::format("{}/{}.utoc", CompressedBlobsHashes[I].ToHexString().substr(0, 2), CompressedBlobsHashes[I].ToHexString()); + HttpClient::Response Result = Client.Get(fmt::format("bucket/{}/{}.utoc", Bucket, ObjectPath)); + CHECK(Result); + CHECK_EQ(Result.ResponsePayload.GetSize(), CompressedBlobsSizes[I]); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Result.ResponsePayload)), RawHash, RawSize); + CHECK(Compressed); + CHECK_EQ(RawHash, CompressedBlobsHashes[I]); + CHECK_EQ(RawSize, BlobsSizes[I]); + } + } +} + +TEST_SUITE_END(); + +} // namespace zen::tests +#endif -- cgit v1.2.3 From 0763d09a81e5a1d3df11763a7ec75e7860c9510a Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 4 Mar 2026 14:13:46 +0100 Subject: compute orchestration (#763) - Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default --- src/zenserver-test/compute-tests.cpp | 1700 +++++++++++++++++++++++++++++++++ src/zenserver-test/function-tests.cpp | 38 - src/zenserver-test/logging-tests.cpp | 257 +++++ src/zenserver-test/nomad-tests.cpp | 126 +++ src/zenserver-test/xmake.lua | 7 +- 5 files changed, 2089 insertions(+), 39 deletions(-) create mode 100644 src/zenserver-test/compute-tests.cpp delete mode 100644 src/zenserver-test/function-tests.cpp create mode 100644 src/zenserver-test/logging-tests.cpp create mode 100644 src/zenserver-test/nomad-tests.cpp (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp new file mode 100644 index 000000000..c90ac5d8b --- /dev/null +++ b/src/zenserver-test/compute-tests.cpp @@ -0,0 +1,1700 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + +# include "zenserver-test.h" + +# include + +namespace zen::tests::compute { + +using namespace std::literals; + +// BuildSystemVersion and function version GUIDs matching zentest-appstub +static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969"; +static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313"; +static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888"; + +// In-memory implementation of ChunkResolver for test use. +// Stores compressed data keyed by decompressed content hash. +class InMemoryChunkResolver : public ChunkResolver +{ +public: + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override + { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + return It->second; + } + return {}; + } + + void AddChunk(const IoHash& DecompressedId, IoBuffer Data) { m_Chunks[DecompressedId] = std::move(Data); } + +private: + std::unordered_map m_Chunks; +}; + +// Read, compress, and register zentest-appstub as a worker. +// Returns the WorkerId (hash of the worker package object). +static IoHash +RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env) +{ + std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL); + + FileContents AppStubData = zen::ReadFile(AppStubPath); + REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message())); + + IoBuffer AppStubBuffer = AppStubData.Flatten(); + + CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash(); + const uint64_t AppStubRawSize = AppStubBuffer.Size(); + + CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash); + + CbObjectWriter WorkerWriter; + WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion); + WorkerWriter << "path"sv + << "zentest-appstub"sv; + + WorkerWriter.BeginArray("executables"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "zentest-appstub"sv; + WorkerWriter.AddAttachment("hash"sv, AppStubAttachment); + WorkerWriter << "size"sv << AppStubRawSize; + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + WorkerWriter.BeginArray("functions"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Rot13"sv; + WorkerWriter << "version"sv << Guid::FromString(kRot13Version); + WorkerWriter.EndObject(); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Sleep"sv; + WorkerWriter << "version"sv << Guid::FromString(kSleepVersion); + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerWriter.Save()); + WorkerPackage.AddAttachment(AppStubAttachment); + + const IoHash WorkerId = WorkerPackage.GetObjectHash(); + + const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); + HttpClient::Response RegisterResp = Client.Post(WorkerUrl, std::move(WorkerPackage)); + REQUIRE_MESSAGE(RegisterResp, + fmt::format("Worker registration failed: status={}, body={}", int(RegisterResp.StatusCode), RegisterResp.ToText())); + + return WorkerId; +} + +// Build a Rot13 action CbPackage for the given input string. +static CbPackage +BuildRot13ActionPackage(std::string_view Input) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Rot13"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + + CbPackage ActionPackage; + ActionPackage.SetObject(ActionWriter.Save()); + ActionPackage.AddAttachment(InputAttachment); + + return ActionPackage; +} + +// Build a Sleep action CbPackage. The worker sleeps for SleepTimeMs before returning its input. +static CbPackage +BuildSleepActionPackage(std::string_view Input, uint64_t SleepTimeMs) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Sleep"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + ActionWriter.BeginObject("Constants"sv); + ActionWriter << "SleepTimeMs"sv << SleepTimeMs; + ActionWriter.EndObject(); + + CbPackage ActionPackage; + ActionPackage.SetObject(ActionWriter.Save()); + ActionPackage.AddAttachment(InputAttachment); + + return ActionPackage; +} + +// Build a Sleep action CbObject and populate the chunk resolver with the input attachment. +static CbObject +BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemoryChunkResolver& Resolver) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Sleep"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + ActionWriter.BeginObject("Constants"sv); + ActionWriter << "SleepTimeMs"sv << SleepTimeMs; + ActionWriter.EndObject(); + + return ActionWriter.Save(); +} + +static HttpClient::Response +PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000) +{ + HttpClient::Response Resp; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + Resp = Client.Get(ResultUrl); + + if (Resp.StatusCode == HttpResponseCode::OK) + { + break; + } + + Sleep(100); + } + + return Resp; +} + +static bool +PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int Lsn, uint64_t TimeoutMs = 30'000) +{ + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + HttpClient::Response Resp = Client.Get(CompletedUrl); + + if (Resp) + { + for (auto& Item : Resp.AsObject()["completed"sv]) + { + if (Item.AsInt32() == Lsn) + { + return true; + } + } + } + + Sleep(100); + } + + return false; +} + +static std::string +GetRot13Output(const CbPackage& ResultPackage) +{ + CbObject ResultObj = ResultPackage.GetObject(); + + IoHash OutputHash; + CbFieldView ValuesField = ResultObj["Values"sv]; + + if (CbFieldViewIterator It = begin(ValuesField); It.HasValue()) + { + OutputHash = (*It).AsObjectView()["RawHash"sv].AsHash(); + } + + REQUIRE_MESSAGE(OutputHash != IoHash::Zero, "Expected non-zero output hash in result Values array"); + + const CbAttachment* OutputAttachment = ResultPackage.FindAttachment(OutputHash); + REQUIRE_MESSAGE(OutputAttachment != nullptr, "Output attachment not found in result package"); + + CompressedBuffer OutputCompressed = OutputAttachment->AsCompressedBinary(); + SharedBuffer OutputData = OutputCompressed.Decompress(); + + return std::string(static_cast(OutputData.GetData()), OutputData.GetSize()); +} + +// Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response. +class MockOrchestratorService : public HttpService +{ +public: + MockOrchestratorService() + { + // Initialize with empty worker list + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + Cbo.EndArray(); + m_WorkerList = Cbo.Save(); + } + + const char* BaseUri() const override { return "/orch/"; } + + void HandleRequest(HttpServerRequest& Request) override + { + if (Request.RequestVerb() == HttpVerb::kGet && Request.RelativeUri() == "agents"sv) + { + RwLock::SharedLockScope Lock(m_Lock); + Request.WriteResponse(HttpResponseCode::OK, m_WorkerList); + return; + } + Request.WriteResponse(HttpResponseCode::NotFound); + } + + void SetWorkerList(CbObject WorkerList) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + m_WorkerList = std::move(WorkerList); + } + +private: + RwLock m_Lock; + CbObject m_WorkerList; +}; + +// Manages in-process ASIO HTTP server lifecycle for mock orchestrator. +struct MockOrchestratorFixture +{ + MockOrchestratorService Service; + ScopedTemporaryDirectory TmpDir; + Ref Server; + std::thread ServerThread; + uint16_t Port = 0; + + MockOrchestratorFixture() + { + HttpServerConfig Config; + Config.ServerClass = "asio"; + Config.ForceLoopback = true; + Server = CreateHttpServer(Config); + Server->RegisterService(Service); + Port = static_cast(Server->Initialize(TestEnv.GetNewPortNumber(), TmpDir.Path())); + ZEN_ASSERT(Port != 0); + ServerThread = std::thread([this]() { Server->Run(false); }); + } + + ~MockOrchestratorFixture() + { + Server->RequestExit(); + if (ServerThread.joinable()) + { + ServerThread.join(); + } + Server->Close(); + } + + std::string GetEndpoint() const { return fmt::format("http://localhost:{}", Port); } +}; + +// Build the CbObject response for /orch/agents matching the format UpdateCoordinatorState expects. +static CbObject +BuildAgentListResponse(std::initializer_list> Workers) +{ + CbObjectWriter Cbo; + Cbo.BeginArray("workers"sv); + for (const auto& [Id, Uri] : Workers) + { + Cbo.BeginObject(); + Cbo << "id"sv << Id; + Cbo << "uri"sv << Uri; + Cbo << "hostname"sv + << "localhost"sv; + Cbo << "reachable"sv << true; + Cbo << "dt"sv << uint64_t(0); + Cbo.EndObject(); + } + Cbo.EndArray(); + return Cbo.Save(); +} + +// Build the worker CbPackage for zentest-appstub AND populate the chunk resolver. +// This is the same logic as RegisterWorker() but returns the package instead of POSTing it. +static CbPackage +BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver) +{ + std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL); + + FileContents AppStubData = zen::ReadFile(AppStubPath); + REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message())); + + IoBuffer AppStubBuffer = AppStubData.Flatten(); + + CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash(); + const uint64_t AppStubRawSize = AppStubBuffer.Size(); + + // Store compressed data in chunk resolver for when the remote runner needs it + Resolver.AddChunk(AppStubRawHash, AppStubCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash); + + CbObjectWriter WorkerWriter; + WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion); + WorkerWriter << "path"sv + << "zentest-appstub"sv; + + WorkerWriter.BeginArray("executables"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "zentest-appstub"sv; + WorkerWriter.AddAttachment("hash"sv, AppStubAttachment); + WorkerWriter << "size"sv << AppStubRawSize; + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + WorkerWriter.BeginArray("functions"sv); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Rot13"sv; + WorkerWriter << "version"sv << Guid::FromString(kRot13Version); + WorkerWriter.EndObject(); + WorkerWriter.BeginObject(); + WorkerWriter << "name"sv + << "Sleep"sv; + WorkerWriter << "version"sv << Guid::FromString(kSleepVersion); + WorkerWriter.EndObject(); + WorkerWriter.EndArray(); + + CbPackage WorkerPackage; + WorkerPackage.SetObject(WorkerWriter.Save()); + WorkerPackage.AddAttachment(AppStubAttachment); + + return WorkerPackage; +} + +// Build a Rot13 action CbObject (not CbPackage) and populate the chunk resolver with the input attachment. +static CbObject +BuildRot13ActionForSession(std::string_view Input, InMemoryChunkResolver& Resolver) +{ + CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()), + OodleCompressor::Selkie, + OodleCompressionLevel::HyperFast4); + + const IoHash InputRawHash = InputCompressed.DecodeRawHash(); + const uint64_t InputRawSize = Input.size(); + + // Store compressed data in chunk resolver + Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer()); + + CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash); + + CbObjectWriter ActionWriter; + ActionWriter << "Function"sv + << "Rot13"sv; + ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version); + ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion); + ActionWriter.BeginObject("Inputs"sv); + ActionWriter.BeginObject("Source"sv); + ActionWriter.AddAttachment("RawHash"sv, InputAttachment); + ActionWriter << "RawSize"sv << InputRawSize; + ActionWriter.EndObject(); + ActionWriter.EndObject(); + + return ActionWriter.Save(); +} + +TEST_SUITE_BEGIN("server.function"); + +TEST_CASE("function.rot13") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Submit action via legacy /jobs/{worker} endpoint + const std::string JobUrl = fmt::format("/jobs/{}", WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); + + // Poll for result via legacy /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/jobs/{}", Lsn); + HttpClient::Response ResultResp = PollForResult(Client, ResultUrl); + REQUIRE_MESSAGE( + ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Action failed (empty result package)\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); +} + +TEST_CASE("function.workers") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + // Before registration, GET /workers should return an empty list + HttpClient::Response EmptyListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(EmptyListResp, "Failed to list workers before registration"); + CHECK_EQ(EmptyListResp.AsObject()["workers"sv].AsArrayView().Num(), 0); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // GET /workers — the registered worker should appear in the listing + HttpClient::Response ListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration"); + + bool WorkerFound = false; + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerId) + { + WorkerFound = true; + break; + } + } + + REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString())); + + // GET /workers/{worker} — descriptor should match what was registered + const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString()); + HttpClient::Response DescResp = Client.Get(WorkerUrl); + REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode))); + + CbObject Desc = DescResp.AsObject(); + CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion)); + CHECK_EQ(Desc["path"sv].AsString(), "zentest-appstub"sv); + + bool Rot13Found = false; + bool SleepFound = false; + for (auto& Item : Desc["functions"sv]) + { + std::string_view Name = Item.AsObjectView()["name"sv].AsString(); + if (Name == "Rot13"sv) + { + CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kRot13Version)); + Rot13Found = true; + } + else if (Name == "Sleep"sv) + { + CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kSleepVersion)); + SleepFound = true; + } + } + + CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor"); + CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor"); + + // GET /workers/{unknown} — should return 404 + const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString()); + HttpClient::Response NotFoundResp = Client.Get(UnknownUrl); + CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound); +} + +TEST_CASE("function.queues.lifecycle") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Verify the queue appears in the listing + HttpClient::Response ListResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list queues"); + + bool QueueFound = false; + for (auto& Item : ListResp.AsObject()["queues"sv]) + { + if (Item.AsObjectView()["queue_id"sv].AsInt32() == QueueId) + { + QueueFound = true; + break; + } + } + + REQUIRE_MESSAGE(QueueFound, fmt::format("Queue {} not found in queue listing", QueueId)); + + // Submit action via queue-scoped endpoint + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission"); + + // Poll for completion via queue-scoped /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Retrieve result via queue-scoped /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + REQUIRE_MESSAGE( + ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + // Verify queue status reflects completion + const std::string StatusUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response StatusResp = Client.Get(StatusUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 0); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "active"); +} + +TEST_CASE("function.queues.cancel") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Submit a job + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + // Cancel the queue + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // Verify queue status shows cancelled + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); +} + +TEST_CASE("function.queues.remote") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a remote queue — response includes both an integer queue_id and an OID queue_token + HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); + REQUIRE_MESSAGE(CreateResp, + fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + CbObject CreateObj = CreateResp.AsObject(); + const std::string QueueToken = std::string(CreateObj["queue_token"sv].AsString()); + REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); + + // All subsequent requests use the opaque token in place of the integer queue id + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Remote queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from remote queue job submission"); + + // Poll for completion via the token-addressed /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken); + REQUIRE_MESSAGE( + PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in remote queue completed list within timeout\nServer log:\n{}", Lsn, Instance.GetLogOutput())); + + // Retrieve result via the token-addressed /jobs/{lsn} endpoint + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + REQUIRE_MESSAGE(ResultResp.StatusCode == HttpResponseCode::OK, + fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}", + int(ResultResp.StatusCode), + Instance.GetLogOutput())); + + // Verify result: Rot13("Hello World") == "Uryyb Jbeyq" + CbPackage ResultPackage = ResultResp.AsPackage(); + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); +} + +TEST_CASE("function.queues.cancel_running") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation"); + + // Submit a Sleep job long enough that it will still be running when we cancel + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); + + // Wait for the worker process to start executing before cancelling + Sleep(1'000); + + // Cancel the queue, which should interrupt the running Sleep job + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // The cancelled job should appear in the /completed endpoint once the process exits + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list after cancel\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Verify the queue reflects one cancelled action + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); + CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); +} + +TEST_CASE("function.queues.remote_cancel") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a remote queue to obtain an OID token for token-addressed cancellation + HttpClient::Response CreateResp = Client.Post("/queues/remote"sv); + REQUIRE_MESSAGE(CreateResp, + fmt::format("Remote queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText())); + + const std::string QueueToken = std::string(CreateResp.AsObject()["queue_token"sv].AsString()); + REQUIRE_MESSAGE(!QueueToken.empty(), "Expected non-empty queue_token from remote queue creation"); + + // Submit a long-running Sleep job via the token-addressed endpoint + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueToken, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Sleep job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText())); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); + + // Wait for the worker process to start executing before cancelling + Sleep(1'000); + + // Cancel the queue via its OID token + const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); + HttpClient::Response CancelResp = Client.Delete(QueueUrl); + REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, + fmt::format("Remote queue cancellation failed: status={}, body={}", int(CancelResp.StatusCode), CancelResp.ToText())); + + // The cancelled job should appear in the token-addressed /completed endpoint + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueToken); + REQUIRE_MESSAGE( + PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in remote queue completed list after cancel\nServer log:\n{}", Lsn, Instance.GetLogOutput())); + + // Verify the queue status reflects the cancellation + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get remote queue status after cancel"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "cancelled"); + CHECK_EQ(QueueStatus["cancelled_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); +} + +TEST_CASE("function.queues.drain") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + + // Submit a long-running job so we can verify it completes even after drain + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response Submit1 = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 2'000)); + REQUIRE_MESSAGE(Submit1, fmt::format("First job submission failed: status={}", int(Submit1.StatusCode))); + const int Lsn1 = Submit1.AsObject()["lsn"sv].AsInt32(); + + // Drain the queue + const std::string DrainUrl = fmt::format("/queues/{}/drain", QueueId); + HttpClient::Response DrainResp = Client.Post(DrainUrl); + REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText())); + CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining"); + + // Second submission should be rejected with 424 + HttpClient::Response Submit2 = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); + CHECK_EQ(Submit2.StatusCode, HttpResponseCode::FailedDependency); + CHECK_EQ(std::string(Submit2.AsObject()["error"sv].AsString()), "queue is draining"); + + // First job should still complete + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn1), + fmt::format("LSN {} did not complete after drain\nServer log:\n{}", Lsn1, Instance.GetLogOutput())); + + // Queue status should show draining + complete + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "draining"); + CHECK(QueueStatus["is_complete"sv].AsBool()); +} + +TEST_CASE("function.priority") +{ + // Spawn server with max-actions=1 to guarantee serialized action execution, + // which lets us deterministically verify that higher-priority pending jobs + // are scheduled before lower-priority ones. + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1"); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue for all test jobs + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); + + // Submit a blocker Sleep job to occupy the single execution slot. + // Once the blocker is running, the scheduler must choose among the pending + // jobs by priority when the slot becomes free. + const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); + HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000)); + REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode))); + + // Submit 3 low-priority Rot13 jobs + const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString()); + + HttpClient::Response LowResp1 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low1"sv)); + REQUIRE_MESSAGE(LowResp1, "Low-priority job 1 submission failed"); + const int LsnLow1 = LowResp1.AsObject()["lsn"sv].AsInt32(); + + HttpClient::Response LowResp2 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low2"sv)); + REQUIRE_MESSAGE(LowResp2, "Low-priority job 2 submission failed"); + const int LsnLow2 = LowResp2.AsObject()["lsn"sv].AsInt32(); + + HttpClient::Response LowResp3 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low3"sv)); + REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed"); + const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32(); + + // Submit 1 high-priority Rot13 job — should execute before the low-priority ones + const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString()); + HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv)); + REQUIRE_MESSAGE(HighResp, "High-priority job submission failed"); + const int LsnHigh = HighResp.AsObject()["lsn"sv].AsInt32(); + + // Wait for all 4 priority-test jobs to appear in the queue's completed list. + // This avoids any snapshot-timing race: by the time we compare timestamps, all + // jobs have already finished and their history entries are stable. + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + + { + bool AllCompleted = false; + Stopwatch WaitTimer; + + while (!AllCompleted && WaitTimer.GetElapsedTimeMs() < 30'000) + { + HttpClient::Response Resp = Client.Get(CompletedUrl); + + if (Resp) + { + bool FoundHigh = false; + bool FoundLow1 = false; + bool FoundLow2 = false; + bool FoundLow3 = false; + + CbObject RespObj = Resp.AsObject(); + + for (auto& Item : RespObj["completed"sv]) + { + const int Lsn = Item.AsInt32(); + if (Lsn == LsnHigh) + { + FoundHigh = true; + } + else if (Lsn == LsnLow1) + { + FoundLow1 = true; + } + else if (Lsn == LsnLow2) + { + FoundLow2 = true; + } + else if (Lsn == LsnLow3) + { + FoundLow3 = true; + } + } + + AllCompleted = FoundHigh && FoundLow1 && FoundLow2 && FoundLow3; + } + + if (!AllCompleted) + { + Sleep(100); + } + } + + REQUIRE_MESSAGE( + AllCompleted, + fmt::format( + "Not all priority test jobs completed within timeout (lsnHigh={} lsnLow1={} lsnLow2={} lsnLow3={})\nServer log:\n{}", + LsnHigh, + LsnLow1, + LsnLow2, + LsnLow3, + Instance.GetLogOutput())); + } + + // Query the queue-scoped history to obtain the time_Completed timestamp for each + // job. The history endpoint records when each RunnerAction::State transition + // occurred, so time_Completed is the wall-clock tick at which the action finished. + // Using the queue-scoped endpoint avoids exposing history from other queues. + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + + CbObject HistoryObj = HistoryResp.AsObject(); + + auto GetCompletedTimestamp = [&](int Lsn) -> uint64_t { + for (auto& Item : HistoryObj["history"sv]) + { + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + return Item.AsObjectView()["time_Completed"sv].AsUInt64(); + } + } + return 0; + }; + + const uint64_t TimeHigh = GetCompletedTimestamp(LsnHigh); + const uint64_t TimeLow1 = GetCompletedTimestamp(LsnLow1); + const uint64_t TimeLow2 = GetCompletedTimestamp(LsnLow2); + const uint64_t TimeLow3 = GetCompletedTimestamp(LsnLow3); + + REQUIRE_MESSAGE(TimeHigh != 0, fmt::format("lsnHigh={} not found in action history", LsnHigh)); + REQUIRE_MESSAGE(TimeLow1 != 0, fmt::format("lsnLow1={} not found in action history", LsnLow1)); + REQUIRE_MESSAGE(TimeLow2 != 0, fmt::format("lsnLow2={} not found in action history", LsnLow2)); + REQUIRE_MESSAGE(TimeLow3 != 0, fmt::format("lsnLow3={} not found in action history", LsnLow3)); + + // The high-priority job must have completed strictly before every low-priority job + CHECK_MESSAGE(TimeHigh < TimeLow1, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow1={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow1, + TimeLow1)); + CHECK_MESSAGE(TimeHigh < TimeLow2, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow2={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow2, + TimeLow2)); + CHECK_MESSAGE(TimeHigh < TimeLow3, + fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow3={} completed at t={} (expected later)", + LsnHigh, + TimeHigh, + LsnLow3, + TimeLow3)); +} + +////////////////////////////////////////////////////////////////////////// +// Remote worker synchronization tests +// +// These tests exercise the orchestrator discovery path where new compute +// nodes appear over time and must receive previously registered workers +// via SyncWorkersToRunner(). + +TEST_CASE("function.remote.worker_sync_on_discovery") +{ + // Spawn real zenserver in compute mode + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t ServerPort = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(ServerPort != 0, Instance.GetLogOutput()); + + const std::string ServerUri = fmt::format("http://localhost:{}", ServerPort); + + // Start mock orchestrator with empty worker list + MockOrchestratorFixture MockOrch; + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session (stored locally, no runners yet) + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Update mock orchestrator to advertise the real server + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri}})); + + // Wait for scheduler to discover the runner (~5s throttle + margin) + Sleep(7'000); + + // Submit Rot13 action via session + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Action enqueue failed"); + + // Poll for result + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE( + ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); + + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.late_runner_discovery") +{ + // Spawn first server + ZenServerInstance Instance1(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance1.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port1 = Instance1.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port1 != 0, Instance1.GetLogOutput()); + + const std::string ServerUri1 = fmt::format("http://localhost:{}", Port1); + + // Start mock orchestrator advertising W1 + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}})); + + // Create session and register worker + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for W1 discovery + Sleep(7'000); + + // Baseline: submit Rot13 action and verify it completes on W1 + { + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed"); + + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance1.GetLogOutput())); + + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + } + + // Spawn second server + ZenServerInstance Instance2(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance2.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port2 = Instance2.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port2 != 0, Instance2.GetLogOutput()); + + const std::string ServerUri2 = fmt::format("http://localhost:{}", Port2); + + // Update mock orchestrator to include both W1 and W2 + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}})); + + // Wait for W2 discovery + Sleep(7'000); + + // Verify W2 received the worker by querying its /compute/workers endpoint directly + { + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); + HttpClient Client(ComputeBaseUri); + HttpClient::Response ListResp = Client.Get("/workers"sv); + REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); + + bool WorkerFound = false; + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerPackage.GetObjectHash()) + { + WorkerFound = true; + break; + } + } + + REQUIRE_MESSAGE(WorkerFound, + fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}", + Instance2.GetLogOutput())); + } + + // Submit another action and verify it completes (could run on either W1 or W2) + { + CbObject ActionObj = BuildRot13ActionForSession("Second Test"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Second action enqueue failed"); + + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Second action did not complete in time\nW1 log:\n{}\nW2 log:\n{}", + Instance1.GetLogOutput(), + Instance2.GetLogOutput())); + + // Rot13("Second Test") = "Frpbaq Grfg" + CHECK_EQ(GetRot13Output(ResultPackage), "Frpbaq Grfg"sv); + } + + Session.Shutdown(); +} + +TEST_CASE("function.remote.queue_association") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit action to it + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Action enqueue to queue failed"); + + // Poll for result + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE( + ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput())); + + REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput())); + CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv); + + // Verify that a non-implicit remote queue was created on the compute node + HttpClient Client(Instance.GetBaseUri() + "/compute"); + + HttpClient::Response QueuesResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); + + bool RemoteQueueFound = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueFound = true; + break; + } + } + + CHECK_MESSAGE(RemoteQueueFound, "Expected a non-implicit remote queue on the compute node"); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.queue_cancel_propagation") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Cancel the local queue — this should propagate to the remote + Session.CancelQueue(QueueId); + + // Poll for the action to complete (as cancelled) + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + // Verify the local queue shows cancelled + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK(QueueStatus.State == zen::compute::ComputeServiceSession::QueueState::Cancelled); + + // Verify the remote queue on the compute node is also cancelled + HttpClient Client(Instance.GetBaseUri() + "/compute"); + + HttpClient::Response QueuesResp = Client.Get("/queues"sv); + REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server"); + + bool RemoteQueueCancelled = false; + for (auto& Item : QueuesResp.AsObject()["queues"sv]) + { + if (!Item.AsObjectView()["implicit"sv].AsBool()) + { + RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled"; + break; + } + } + + CHECK_MESSAGE(RemoteQueueCancelled, "Expected the remote queue to be cancelled"); + + Session.Shutdown(); +} + +TEST_CASE("function.abandon_running_http") +{ + // Spawn a real zenserver to execute a long-running action, then abandon via HTTP endpoint + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); + HttpClient Client(ComputeBaseUri); + + const IoHash WorkerId = RegisterWorker(Client, TestEnv); + + // Create a queue and submit a long-running Sleep job + HttpClient::Response CreateResp = Client.Post("/queues"sv); + REQUIRE_MESSAGE(CreateResp, "Queue creation failed"); + + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id"); + + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode))); + + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); + + // Wait for the process to start running + Sleep(1'000); + + // Verify the ready endpoint returns OK before abandon + { + HttpClient::Response ReadyResp = Client.Get("/ready"sv); + CHECK(ReadyResp.StatusCode == HttpResponseCode::OK); + } + + // Trigger abandon via the HTTP endpoint + HttpClient::Response AbandonResp = Client.Post("/abandon"sv); + REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK, + fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText())); + + // Ready endpoint should now return 503 + { + HttpClient::Response ReadyResp = Client.Get("/ready"sv); + CHECK(ReadyResp.StatusCode == HttpResponseCode::ServiceUnavailable); + } + + // The abandoned action should appear in the completed endpoint once the process exits + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list after abandon\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); + + // Verify the queue reflects one abandoned action + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after abandon"); + + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["abandoned_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + + // Submitting new work should be rejected + HttpClient::Response RejectedResp = Client.Post(JobUrl, BuildRot13ActionPackage("rejected"sv)); + CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state"); +} + +TEST_CASE("function.session.abandon_pending") +{ + // Create a session with no runners so actions stay pending + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Enqueue several actions — they will stay pending because there are no runners + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); + + CbObject ActionObj = BuildRot13ActionForSession("abandon-test"sv, Resolver); + + auto Enqueue1 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + auto Enqueue2 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + auto Enqueue3 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + REQUIRE_MESSAGE(Enqueue1, "Failed to enqueue action 1"); + REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2"); + REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3"); + + // Transition to Abandoned — should mark all pending actions as Abandoned + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK(!Session.IsHealthy()); + + // Give the scheduler thread time to process the state changes + Sleep(2'000); + + // All three actions should now be in the results map as abandoned + for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn}) + { + CbPackage Result; + HttpResponseCode Code = Session.GetActionResult(Lsn, Result); + CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code))); + } + + // Queue should show 0 active, 3 abandoned + auto Status = Session.GetQueueStatus(QueueResult.QueueId); + CHECK_EQ(Status.ActiveCount, 0); + CHECK_EQ(Status.AbandonedCount, 3); + + // New actions should be rejected + auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0); + CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state"); + + // Abandoned → Sunset should be valid + CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset)); + + Session.Shutdown(); +} + +TEST_CASE("function.session.abandon_running") +{ + // Spawn a real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Transition to Abandoned — should abandon the running action + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + CHECK(!Session.IsHealthy()); + + // Poll for the action to complete (as abandoned) + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); + + // Verify the queue shows abandoned, not completed + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK_EQ(QueueStatus.ActiveCount, 0); + CHECK_EQ(QueueStatus.AbandonedCount, 1); + CHECK_EQ(QueueStatus.CompletedCount, 0); + + Session.Shutdown(); +} + +TEST_CASE("function.remote.abandon_propagation") +{ + // Spawn real zenserver as a remote compute node + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput()); + + // Start mock orchestrator advertising the server + MockOrchestratorFixture MockOrch; + MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}})); + + // Create session infrastructure + InMemoryChunkResolver Resolver; + ScopedTemporaryDirectory SessionBaseDir; + zen::compute::ComputeServiceSession Session(Resolver); + Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint()); + Session.SetOrchestratorBasePath(SessionBaseDir.Path()); + Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready); + + // Register worker on session + CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver); + Session.RegisterWorker(WorkerPackage); + + // Wait for scheduler to discover the runner + Sleep(7'000); + + // Create a local queue and submit a long-running Sleep action + auto QueueResult = Session.CreateQueue(); + REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue"); + const int QueueId = QueueResult.QueueId; + + CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver); + + auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0); + REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); + + // Wait for the action to start running on the remote + Sleep(2'000); + + // Transition to Abandoned — should abandon the running action and propagate + bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned); + CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned"); + + // Poll for the action to complete + CbPackage ResultPackage; + HttpResponseCode ResultCode = HttpResponseCode::Accepted; + Stopwatch Timer; + + while (Timer.GetElapsedTimeMs() < 30'000) + { + ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage); + if (ResultCode == HttpResponseCode::OK) + { + break; + } + Sleep(200); + } + + REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK, + fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput())); + + // Verify the local queue shows abandoned + auto QueueStatus = Session.GetQueueStatus(QueueId); + CHECK_EQ(QueueStatus.ActiveCount, 0); + CHECK_EQ(QueueStatus.AbandonedCount, 1); + + // Session should not be healthy + CHECK(!Session.IsHealthy()); + + // The remote compute node should still be healthy (only the parent abandoned) + HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); + HttpClient::Response ReadyResp = RemoteClient.Get("/ready"sv); + CHECK_MESSAGE(ReadyResp.StatusCode == HttpResponseCode::OK, "Remote compute node should still be healthy"); + + Session.Shutdown(); +} + +TEST_SUITE_END(); + +} // namespace zen::tests::compute + +#endif diff --git a/src/zenserver-test/function-tests.cpp b/src/zenserver-test/function-tests.cpp deleted file mode 100644 index 82848c6ad..000000000 --- a/src/zenserver-test/function-tests.cpp +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include - -#if ZEN_WITH_TESTS - -# include -# include -# include -# include -# include - -# include "zenserver-test.h" - -namespace zen::tests { - -using namespace std::literals; - -TEST_SUITE_BEGIN("server.function"); - -TEST_CASE("function.run") -{ - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance(TestEnv); - Instance.SetDataDir(TestDir); - Instance.SpawnServer(13337); - - ZEN_INFO("Waiting..."); - - Instance.WaitUntilReady(); -} - -TEST_SUITE_END(); - -} // namespace zen::tests - -#endif diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp new file mode 100644 index 000000000..fe39e14c0 --- /dev/null +++ b/src/zenserver-test/logging-tests.cpp @@ -0,0 +1,257 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#if ZEN_WITH_TESTS + +# include "zenserver-test.h" + +# include +# include +# include +# include + +namespace zen::tests { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +static bool +LogContains(const std::string& Log, std::string_view Needle) +{ + return Log.find(Needle) != std::string::npos; +} + +static std::string +ReadFileToString(const std::filesystem::path& Path) +{ + FileContents Contents = ReadFile(Path); + if (Contents.ErrorCode) + { + return {}; + } + + IoBuffer Content = Contents.Flatten(); + if (!Content) + { + return {}; + } + + return std::string(static_cast(Content.Data()), Content.Size()); +} + +////////////////////////////////////////////////////////////////////////// + +// Verify that a log file is created at the default location (DataDir/logs/zenserver.log) +// even without --abslog. The file must contain "server session id" (logged at INFO +// to all registered loggers during init) and "log starting at" (emitted once a file +// sink is first opened). +TEST_CASE("logging.file.default") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + const std::filesystem::path DefaultLogFile = TestDir / "logs" / "zenserver.log"; + CHECK_MESSAGE(std::filesystem::exists(DefaultLogFile), "Default log file was not created"); + const std::string FileLog = ReadFileToString(DefaultLogFile); + CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); + CHECK_MESSAGE(LogContains(FileLog, "log starting at"), FileLog); +} + +// --quiet sets the console sink level to WARN. The formatted "[info] ..." +// entry written by the default logger's console sink must therefore not appear +// in captured stdout. (The "console" named logger — used by ZEN_CONSOLE_* +// macros — may still emit plain-text messages without a level marker, so we +// check for the absence of the full_formatter "[info]" prefix rather than the +// message text itself.) +TEST_CASE("logging.console.quiet") +{ + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--quiet"); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + const std::string Log = Instance.GetLogOutput(); + CHECK_MESSAGE(!LogContains(Log, "[info] server session id"), Log); +} + +// --noconsole removes the stdout sink entirely, so the captured console output +// must not contain any log entries from the logging system. +TEST_CASE("logging.console.disabled") +{ + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--noconsole"); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + const std::string Log = Instance.GetLogOutput(); + CHECK_MESSAGE(!LogContains(Log, "server session id"), Log); +} + +// --abslog creates a rotating log file at the specified path. +// The file must contain "server session id" (logged at INFO to all loggers +// during init) and "log starting at" (emitted once a file sink is active). +TEST_CASE("logging.file.basic") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.log"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {}", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created"); + const std::string FileLog = ReadFileToString(LogFile); + CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); + CHECK_MESSAGE(LogContains(FileLog, "log starting at"), FileLog); +} + +// --abslog with a .json extension selects the JSON formatter. +// Each log entry must be a JSON object containing at least the "message" +// and "source" fields. +TEST_CASE("logging.file.json") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.json"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {}", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created"); + const std::string FileLog = ReadFileToString(LogFile); + CHECK_MESSAGE(LogContains(FileLog, "\"message\""), FileLog); + CHECK_MESSAGE(LogContains(FileLog, "\"source\": \"zenserver\""), FileLog); + CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); +} + +// --log-id is automatically set to the server instance name in test mode. +// The JSON formatter emits this value as the "id" field, so every entry in a +// .json log file must carry a non-empty "id". +TEST_CASE("logging.log_id") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.json"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {}", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created"); + const std::string FileLog = ReadFileToString(LogFile); + // The JSON formatter writes the log-id as: "id": "", + CHECK_MESSAGE(LogContains(FileLog, "\"id\": \""), FileLog); +} + +// --log-warn raises the level threshold above INFO so that INFO messages +// are filtered. "server session id" is broadcast at INFO to all loggers: it must +// appear in the main file sink (default logger unaffected) but must NOT appear in +// http.log where the http_requests logger now has a WARN threshold. +TEST_CASE("logging.level.warn_suppresses_info") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.log"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {} --log-warn http_requests", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created"); + const std::string FileLog = ReadFileToString(LogFile); + CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); + + const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log"; + CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created"); + const std::string HttpLog = ReadFileToString(HttpLogFile); + CHECK_MESSAGE(!LogContains(HttpLog, "server session id"), HttpLog); +} + +// --log-info sets an explicit INFO threshold. The INFO "server session id" +// broadcast must still land in http.log, confirming that INFO messages are not +// filtered when the logger level is exactly INFO. +TEST_CASE("logging.level.info_allows_info") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.log"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {} --log-info http_requests", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log"; + CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created"); + const std::string HttpLog = ReadFileToString(HttpLogFile); + CHECK_MESSAGE(LogContains(HttpLog, "server session id"), HttpLog); +} + +// --log-off silences a named logger entirely. +// "server session id" is broadcast at INFO to all registered loggers via +// spdlog::apply_all during init. When the "http_requests" logger is set to +// OFF its dedicated http.log file must not contain that message. +// The main file sink (via --abslog) must be unaffected. +TEST_CASE("logging.level.off_specific_logger") +{ + const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const std::filesystem::path LogFile = TestDir / "test.log"; + + ZenServerInstance Instance(TestEnv); + Instance.SetDataDir(TestDir); + + const std::string LogArg = fmt::format("--abslog {} --log-off http_requests", LogFile.string()); + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); + CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); + + Instance.Shutdown(); + + // Main log file must still have the startup message + CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created"); + const std::string FileLog = ReadFileToString(LogFile); + CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); + + // http.log is created by the RotatingFileSink but the logger is OFF, so + // the broadcast "server session id" message must not have been written to it + const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log"; + CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created"); + const std::string HttpLog = ReadFileToString(HttpLogFile); + CHECK_MESSAGE(!LogContains(HttpLog, "server session id"), HttpLog); +} + +} // namespace zen::tests + +#endif diff --git a/src/zenserver-test/nomad-tests.cpp b/src/zenserver-test/nomad-tests.cpp new file mode 100644 index 000000000..6eb99bc3a --- /dev/null +++ b/src/zenserver-test/nomad-tests.cpp @@ -0,0 +1,126 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#if ZEN_WITH_TESTS && ZEN_WITH_NOMAD +# include "zenserver-test.h" +# include +# include +# include +# include +# include +# include +# include +# include + +# include + +namespace zen::tests::nomad_tests { + +using namespace std::literals; + +TEST_CASE("nomad.client.lifecycle" * doctest::skip()) +{ + zen::nomad::NomadProcess NomadProc; + NomadProc.SpawnNomadAgent(); + + zen::nomad::NomadTestClient Client("http://localhost:4646/"); + + // Submit a simple batch job that sleeps briefly +# if ZEN_PLATFORM_WINDOWS + auto Job = Client.SubmitJob("zen-test-job", "cmd.exe", {"/C", "timeout /t 10 /nobreak"}); +# else + auto Job = Client.SubmitJob("zen-test-job", "/bin/sleep", {"10"}); +# endif + REQUIRE(!Job.Id.empty()); + CHECK_EQ(Job.Status, "pending"); + + // Poll until the job is running (or dead) + { + Stopwatch Timer; + bool FoundRunning = false; + while (Timer.GetElapsedTimeMs() < 15000) + { + auto Status = Client.GetJobStatus("zen-test-job"); + if (Status.Status == "running") + { + FoundRunning = true; + break; + } + if (Status.Status == "dead") + { + break; + } + Sleep(500); + } + CHECK(FoundRunning); + } + + // Verify allocations exist + auto Allocs = Client.GetAllocations("zen-test-job"); + CHECK(!Allocs.empty()); + + // Stop the job + Client.StopJob("zen-test-job"); + + // Verify it reaches dead state + { + Stopwatch Timer; + bool FoundDead = false; + while (Timer.GetElapsedTimeMs() < 10000) + { + auto Status = Client.GetJobStatus("zen-test-job"); + if (Status.Status == "dead") + { + FoundDead = true; + break; + } + Sleep(500); + } + CHECK(FoundDead); + } + + NomadProc.StopNomadAgent(); +} + +TEST_CASE("nomad.provisioner.integration" * doctest::skip()) +{ + zen::nomad::NomadProcess NomadProc; + NomadProc.SpawnNomadAgent(); + + // Spawn zenserver in compute mode with Nomad provisioning enabled + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); + + Instance.SetDataDir(TestEnv.CreateNewTestDir()); + + std::filesystem::path ZenServerPath = TestEnv.ProgramBaseDir() / "zenserver" ZEN_EXE_SUFFIX_LITERAL; + + std::string NomadArgs = fmt::format( + "--nomad-enabled=true" + " --nomad-server=http://localhost:4646" + " --nomad-driver=raw_exec" + " --nomad-binary-path={}" + " --nomad-max-cores=32" + " --nomad-cores-per-job=32", + ZenServerPath.string()); + + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(NomadArgs); + REQUIRE(Port != 0); + + // Give the provisioner time to submit jobs. + // The management thread has a 5s wait between cycles, and the HTTP client has + // a 10s connect timeout, so we need to allow enough time for at least one full cycle. + Sleep(15000); + + // Verify jobs were submitted to Nomad + zen::nomad::NomadTestClient NomadClient("http://localhost:4646/"); + + auto Jobs = NomadClient.ListJobs("zenserver-worker"); + + ZEN_INFO("nomad.provisioner.integration: found {} jobs with prefix 'zenserver-worker'", Jobs.size()); + CHECK_MESSAGE(!Jobs.empty(), Instance.GetLogOutput()); + + Instance.Shutdown(); + NomadProc.StopNomadAgent(); +} + +} // namespace zen::tests::nomad_tests +#endif diff --git a/src/zenserver-test/xmake.lua b/src/zenserver-test/xmake.lua index 2a269cea1..7b208bbc7 100644 --- a/src/zenserver-test/xmake.lua +++ b/src/zenserver-test/xmake.lua @@ -6,10 +6,15 @@ target("zenserver-test") add_headerfiles("**.h") add_files("*.cpp") add_files("zenserver-test.cpp", {unity_ignored = true }) - add_deps("zencore", "zenremotestore", "zenhttp") + add_deps("zencore", "zenremotestore", "zenhttp", "zencompute", "zenstore") add_deps("zenserver", {inherit=false}) + add_deps("zentest-appstub", {inherit=false}) add_packages("http_parser") + if has_config("zennomad") then + add_deps("zennomad") + end + if is_plat("macosx") then add_ldflags("-framework CoreFoundation") add_ldflags("-framework Security") -- cgit v1.2.3 From d8940b27c8a5c070c3b48ca9e575929df8d1d888 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 5 Mar 2026 00:08:19 +0100 Subject: added TEST_SUITE_BEGIN/END around some TEST_CASEs which didn't have them (#809) * added TEST_SUITE_BEGIN/END around some TEST_CASEs which didn't have them * fixed some stats issues * ScopedSpan should Initialize * annotated classes in stats.h with some documentation comments --- src/zenserver-test/logging-tests.cpp | 4 ++++ src/zenserver-test/nomad-tests.cpp | 4 ++++ 2 files changed, 8 insertions(+) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp index fe39e14c0..f284f0371 100644 --- a/src/zenserver-test/logging-tests.cpp +++ b/src/zenserver-test/logging-tests.cpp @@ -15,6 +15,8 @@ namespace zen::tests { using namespace std::literals; +TEST_SUITE_BEGIN("server.logging"); + ////////////////////////////////////////////////////////////////////////// static bool @@ -252,6 +254,8 @@ TEST_CASE("logging.level.off_specific_logger") CHECK_MESSAGE(!LogContains(HttpLog, "server session id"), HttpLog); } +TEST_SUITE_END(); + } // namespace zen::tests #endif diff --git a/src/zenserver-test/nomad-tests.cpp b/src/zenserver-test/nomad-tests.cpp index 6eb99bc3a..f8f5a9a30 100644 --- a/src/zenserver-test/nomad-tests.cpp +++ b/src/zenserver-test/nomad-tests.cpp @@ -17,6 +17,8 @@ namespace zen::tests::nomad_tests { using namespace std::literals; +TEST_SUITE_BEGIN("server.nomad"); + TEST_CASE("nomad.client.lifecycle" * doctest::skip()) { zen::nomad::NomadProcess NomadProc; @@ -122,5 +124,7 @@ TEST_CASE("nomad.provisioner.integration" * doctest::skip()) NomadProc.StopNomadAgent(); } +TEST_SUITE_END(); + } // namespace zen::tests::nomad_tests #endif -- cgit v1.2.3 From 19a117889c2db6b817af9458c04c04f324162e75 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 9 Mar 2026 10:50:47 +0100 Subject: Eliminate spdlog dependency (#773) Removes the vendored spdlog library (~12,000 lines) and replaces it with a purpose-built logging system in zencore (~1,800 lines). The new implementation provides the same functionality with fewer abstractions, no shared_ptr overhead, and full control over the logging pipeline. ### What changed **New logging core in zencore/logging/:** - LogMessage, Formatter, Sink, Logger, Registry - core abstractions matching spdlog's model but simplified - AnsiColorStdoutSink - ANSI color console output (replaces spdlog stdout_color_sink) - MsvcSink - OutputDebugString on Windows (replaces spdlog msvc_sink) - AsyncSink - async logging via BlockingQueue worker thread (replaces spdlog async_logger) - NullSink, MessageOnlyFormatter - utility types - Thread-safe timestamp caching in formatters using RwLock **Moved to zenutil/logging/:** - FullFormatter - full log formatting with timestamp, logger name, level, source location, multiline alignment - JsonFormatter - structured JSON log output - RotatingFileSink - rotating file sink with atomic size tracking **API changes:** - Log levels are now an enum (LogLevel) instead of int, eliminating the zen::logging::level namespace - LoggerRef no longer wraps shared_ptr - it holds a raw pointer with the registry owning lifetime - Logger error handler is wired through Registry and propagated to all loggers on registration - Logger::Log() now populates ThreadId on every message **Cleanup:** - Deleted thirdparty/spdlog/ entirely (110+ files) - Deleted full_test_formatter (was ~80% duplicate of FullFormatter) - Renamed snake_case classes to PascalCase (full_formatter -> FullFormatter, json_formatter -> JsonFormatter, sentry_sink -> SentrySink) - Removed spdlog from xmake dependency graph ### Build / test impact - zencore no longer depends on spdlog - zenutil and zenvfs xmake.lua updated to drop spdlog dep - zentelemetry xmake.lua updated to drop spdlog dep - All existing tests pass, no test changes required beyond formatter class renames --- src/zenserver-test/logging-tests.cpp | 2 +- src/zenserver-test/zenserver-test.cpp | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'src/zenserver-test') diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp index f284f0371..2e530ff92 100644 --- a/src/zenserver-test/logging-tests.cpp +++ b/src/zenserver-test/logging-tests.cpp @@ -71,7 +71,7 @@ TEST_CASE("logging.file.default") // entry written by the default logger's console sink must therefore not appear // in captured stdout. (The "console" named logger — used by ZEN_CONSOLE_* // macros — may still emit plain-text messages without a level marker, so we -// check for the absence of the full_formatter "[info]" prefix rather than the +// check for the absence of the FullFormatter "[info]" prefix rather than the // message text itself.) TEST_CASE("logging.console.quiet") { diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index bd36d731f..8d5400294 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -9,6 +9,7 @@ # include # include # include +# include # include # include # include @@ -17,7 +18,7 @@ # include # include # include -# include +# include # include # include @@ -85,8 +86,9 @@ main(int argc, char** argv) zen::logging::InitializeLogging(); - zen::logging::SetLogLevel(zen::logging::level::Debug); - spdlog::set_formatter(std::make_unique("test", std::chrono::system_clock::now())); + zen::logging::SetLogLevel(zen::logging::Debug); + zen::logging::Registry::Instance().SetFormatter( + std::make_unique("test", std::chrono::system_clock::now())); std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path(); std::filesystem::path TestBaseDir = std::filesystem::current_path() / ".test"; -- cgit v1.2.3