aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
committerLiam Mitchell <[email protected]>2026-03-09 19:06:36 -0700
commitd1abc50ee9d4fb72efc646e17decafea741caa34 (patch)
treee4288e00f2f7ca0391b83d986efcb69d3ba66a83 /src/zenserver-test
parentAllow requests with invalid content-types unless specified in command line or... (diff)
parentupdated chunk–block analyser (#818) (diff)
downloadzen-d1abc50ee9d4fb72efc646e17decafea741caa34.tar.xz
zen-d1abc50ee9d4fb72efc646e17decafea741caa34.zip
Merge branch 'main' into lm/restrict-content-type
Diffstat (limited to 'src/zenserver-test')
-rw-r--r--src/zenserver-test/buildstore-tests.cpp220
-rw-r--r--src/zenserver-test/cache-tests.cpp14
-rw-r--r--src/zenserver-test/cacherequests.cpp4
-rw-r--r--src/zenserver-test/compute-tests.cpp1700
-rw-r--r--src/zenserver-test/hub-tests.cpp8
-rw-r--r--src/zenserver-test/logging-tests.cpp261
-rw-r--r--src/zenserver-test/nomad-tests.cpp130
-rw-r--r--src/zenserver-test/objectstore-tests.cpp74
-rw-r--r--src/zenserver-test/projectstore-tests.cpp38
-rw-r--r--src/zenserver-test/workspace-tests.cpp8
-rw-r--r--src/zenserver-test/xmake.lua7
-rw-r--r--src/zenserver-test/zenserver-test.cpp26
12 files changed, 2444 insertions, 46 deletions
diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp
index 02b308485..cf9b10896 100644
--- a/src/zenserver-test/buildstore-tests.cpp
+++ b/src/zenserver-test/buildstore-tests.cpp
@@ -27,6 +27,8 @@ namespace zen::tests {
using namespace std::literals;
+TEST_SUITE_BEGIN("server.buildstore");
+
TEST_CASE("buildstore.blobs")
{
std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();
@@ -36,7 +38,8 @@ TEST_CASE("buildstore.blobs")
std::string_view Bucket = "bkt"sv;
Oid BuildId = Oid::NewOid();
- std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<uint64_t> CompressedBlobsSizes;
{
ZenServerInstance Instance(TestEnv);
@@ -51,6 +54,7 @@ TEST_CASE("buildstore.blobs")
IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize());
IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
Payload.SetContentType(ZenContentType::kCompressedBinary);
@@ -107,6 +111,7 @@ TEST_CASE("buildstore.blobs")
IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7);
CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize());
IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
Payload.SetContentType(ZenContentType::kCompressedBinary);
@@ -141,6 +146,201 @@ TEST_CASE("buildstore.blobs")
CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
}
}
+
+ {
+ // Single-range Get
+
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ {
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges = {{BlobSize / 16 * 1, BlobSize / 2}};
+
+ uint64_t RangeSizeSum = Ranges.front().second;
+
+ HttpClient::KeyValueMap Headers;
+
+ Headers.Entries.insert(
+ {"Range", fmt::format("bytes={}-{}", Ranges.front().first, Ranges.front().first + Ranges.front().second - 1)});
+
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), Headers);
+ REQUIRE(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ CHECK_EQ(RangeSizeSum, Payload.GetSize());
+
+ HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ REQUIRE(FullBlobResult);
+ MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Ranges.front().first, Ranges.front().second);
+ MemoryView RangeView = Payload.GetView();
+ CHECK(ActualRange.EqualBytes(RangeView));
+ }
+ }
+
+ {
+ // Single-range Post
+
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ {
+ uint64_t RangeSizeSum = 0;
+
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges = {{BlobSize / 16 * 1, BlobSize / 2}};
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ {
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("offset"sv, Range.first);
+ Writer.AddInteger("length"sv, Range.second);
+ RangeSizeSum += Range.second;
+ }
+ Writer.EndObject();
+ }
+ }
+ Writer.EndArray(); // ranges
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ REQUIRE(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ REQUIRE(Payload.GetContentType() == ZenContentType::kCbPackage);
+
+ CbPackage ResponsePackage = ParsePackageMessage(Payload);
+ CbObjectView ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView();
+ CHECK_EQ(RangeArray.Num(), Ranges.size());
+ size_t RangeOffset = 0;
+ for (CbFieldView View : RangeArray)
+ {
+ CbObjectView Range = View.AsObjectView();
+ CHECK_EQ(Range["offset"sv].AsUInt64(), Ranges[RangeOffset].first);
+ CHECK_EQ(Range["length"sv].AsUInt64(), Ranges[RangeOffset].second);
+ RangeOffset++;
+ }
+
+ const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash);
+ REQUIRE(DataAttachment);
+ SharedBuffer PayloadRanges = DataAttachment->AsBinary();
+ CHECK_EQ(RangeSizeSum, PayloadRanges.GetSize());
+
+ HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ REQUIRE(FullBlobResult);
+
+ uint64_t Offset = 0;
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Range.first, Range.second);
+ MemoryView RangeView = PayloadRanges.GetView().Mid(Offset, Range.second);
+ CHECK(ActualRange.EqualBytes(RangeView));
+ Offset += Range.second;
+ }
+ }
+ }
+
+ {
+ // Multi-range
+
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ {
+ uint64_t RangeSizeSum = 0;
+
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges = {
+ {BlobSize / 16 * 1, BlobSize / 20},
+ {BlobSize / 16 * 3, BlobSize / 32},
+ {BlobSize / 16 * 5, BlobSize / 16},
+ {BlobSize - BlobSize / 16, BlobSize / 16 - 1},
+ };
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ {
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("offset"sv, Range.first);
+ Writer.AddInteger("length"sv, Range.second);
+ RangeSizeSum += Range.second;
+ }
+ Writer.EndObject();
+ }
+ }
+ Writer.EndArray(); // ranges
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ REQUIRE(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ REQUIRE(Payload.GetContentType() == ZenContentType::kCbPackage);
+
+ CbPackage ResponsePackage = ParsePackageMessage(Payload);
+ CbObjectView ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView();
+ CHECK_EQ(RangeArray.Num(), Ranges.size());
+ size_t RangeOffset = 0;
+ for (CbFieldView View : RangeArray)
+ {
+ CbObjectView Range = View.AsObjectView();
+ CHECK_EQ(Range["offset"sv].AsUInt64(), Ranges[RangeOffset].first);
+ CHECK_EQ(Range["length"sv].AsUInt64(), Ranges[RangeOffset].second);
+ RangeOffset++;
+ }
+
+ const CbAttachment* DataAttachment = ResponsePackage.FindAttachment(RawHash);
+ REQUIRE(DataAttachment);
+ SharedBuffer PayloadRanges = DataAttachment->AsBinary();
+ CHECK_EQ(RangeSizeSum, PayloadRanges.GetSize());
+
+ HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ REQUIRE(FullBlobResult);
+
+ uint64_t Offset = 0;
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ MemoryView ActualRange = FullBlobResult.ResponsePayload.GetView().Mid(Range.first, Range.second);
+ MemoryView RangeView = PayloadRanges.GetView().Mid(Offset, Range.second);
+ CHECK(ActualRange.EqualBytes(RangeView));
+ Offset += Range.second;
+ }
+ }
+ }
}
namespace {
@@ -191,7 +391,7 @@ TEST_CASE("buildstore.metadata")
HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId),
Payload,
HttpClient::Accept(ZenContentType::kCbObject));
- CHECK(Result);
+ REQUIRE(Result);
std::vector<CbObject> ResultMetadatas;
@@ -372,7 +572,7 @@ TEST_CASE("buildstore.cache")
{
std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
- CHECK(Exists.size() == BlobHashes.size());
+ REQUIRE(Exists.size() == BlobHashes.size());
for (size_t I = 0; I < BlobCount; I++)
{
CHECK(Exists[I].HasBody);
@@ -411,7 +611,7 @@ TEST_CASE("buildstore.cache")
{
std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
- CHECK(Exists.size() == BlobHashes.size());
+ REQUIRE(Exists.size() == BlobHashes.size());
for (size_t I = 0; I < BlobCount; I++)
{
CHECK(Exists[I].HasBody);
@@ -419,7 +619,7 @@ TEST_CASE("buildstore.cache")
}
std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
- CHECK_EQ(BlobCount, FetchedMetadatas.size());
+ REQUIRE_EQ(BlobCount, FetchedMetadatas.size());
for (size_t I = 0; I < BlobCount; I++)
{
@@ -440,7 +640,7 @@ TEST_CASE("buildstore.cache")
{
std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
- CHECK(Exists.size() == BlobHashes.size());
+ REQUIRE(Exists.size() == BlobHashes.size());
for (size_t I = 0; I < BlobCount * 2; I++)
{
CHECK(Exists[I].HasBody);
@@ -451,7 +651,7 @@ TEST_CASE("buildstore.cache")
CHECK_EQ(BlobCount, MetaDatas.size());
std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
- CHECK_EQ(BlobCount, FetchedMetadatas.size());
+ REQUIRE_EQ(BlobCount, FetchedMetadatas.size());
for (size_t I = 0; I < BlobCount; I++)
{
@@ -474,7 +674,7 @@ TEST_CASE("buildstore.cache")
CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, GetTinyWorkerPool(EWorkloadType::Background)));
std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
- CHECK(Exists.size() == BlobHashes.size());
+ REQUIRE(Exists.size() == BlobHashes.size());
for (size_t I = 0; I < BlobCount * 2; I++)
{
CHECK(Exists[I].HasBody);
@@ -493,7 +693,7 @@ TEST_CASE("buildstore.cache")
CHECK_EQ(BlobCount, MetaDatas.size());
std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
- CHECK_EQ(BlobCount, FetchedMetadatas.size());
+ REQUIRE_EQ(BlobCount, FetchedMetadatas.size());
for (size_t I = 0; I < BlobCount; I++)
{
@@ -502,5 +702,7 @@ TEST_CASE("buildstore.cache")
}
}
+TEST_SUITE_END();
+
} // namespace zen::tests
#endif
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
index 0272d3797..334dd04ab 100644
--- a/src/zenserver-test/cache-tests.cpp
+++ b/src/zenserver-test/cache-tests.cpp
@@ -23,6 +23,8 @@
namespace zen::tests {
+TEST_SUITE_BEGIN("server.cache");
+
TEST_CASE("zcache.basic")
{
using namespace std::literals;
@@ -145,7 +147,7 @@ TEST_CASE("zcache.cbpackage")
for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
{
const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
- CHECK(RhsAttachment);
+ REQUIRE(RhsAttachment);
zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
CHECK(!LhsBuffer.IsNull());
@@ -1373,14 +1375,8 @@ TEST_CASE("zcache.rpc")
}
}
-TEST_CASE("zcache.failing.upstream")
+TEST_CASE("zcache.failing.upstream" * doctest::skip())
{
- // This is an exploratory test that takes a long time to run, so lets skip it by default
- if (true)
- {
- return;
- }
-
using namespace std::literals;
using namespace utils;
@@ -2669,6 +2665,8 @@ TEST_CASE("zcache.batchoperations")
}
}
+TEST_SUITE_END();
+
} // namespace zen::tests
#endif
diff --git a/src/zenserver-test/cacherequests.cpp b/src/zenserver-test/cacherequests.cpp
index 46339aebb..f5302a359 100644
--- a/src/zenserver-test/cacherequests.cpp
+++ b/src/zenserver-test/cacherequests.cpp
@@ -1037,6 +1037,8 @@ namespace zen { namespace cacherequests {
static CompressedBuffer MakeCompressedBuffer(size_t Size) { return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Size))); };
+ TEST_SUITE_BEGIN("server.cacherequests");
+
TEST_CASE("cacherequests.put.cache.records")
{
PutCacheRecordsRequest EmptyRequest;
@@ -1458,5 +1460,7 @@ namespace zen { namespace cacherequests {
"!default!",
Invalid));
}
+
+ TEST_SUITE_END();
#endif
}} // namespace zen::cacherequests
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
new file mode 100644
index 000000000..c90ac5d8b
--- /dev/null
+++ b/src/zenserver-test/compute-tests.cpp
@@ -0,0 +1,1700 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS && ZEN_WITH_COMPUTE_SERVICES
+
+# include <zenbase/zenbase.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/filesystem.h>
+# include <zencore/guid.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+# include <zenhttp/httpclient.h>
+# include <zenhttp/httpserver.h>
+# include <zencompute/computeservice.h>
+# include <zenstore/zenstore.h>
+# include <zenutil/zenserverprocess.h>
+
+# include "zenserver-test.h"
+
+# include <thread>
+
+namespace zen::tests::compute {
+
+using namespace std::literals;
+
+// BuildSystemVersion and function version GUIDs matching zentest-appstub
+static constexpr std::string_view kBuildSystemVersion = "17fe280d-ccd8-4be8-a9d1-89c944a70969";
+static constexpr std::string_view kRot13Version = "13131313-1313-1313-1313-131313131313";
+static constexpr std::string_view kSleepVersion = "88888888-8888-8888-8888-888888888888";
+
+// In-memory implementation of ChunkResolver for test use.
+// Stores compressed data keyed by decompressed content hash.
+class InMemoryChunkResolver : public ChunkResolver
+{
+public:
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override
+ {
+ auto It = m_Chunks.find(DecompressedId);
+ if (It != m_Chunks.end())
+ {
+ return It->second;
+ }
+ return {};
+ }
+
+ void AddChunk(const IoHash& DecompressedId, IoBuffer Data) { m_Chunks[DecompressedId] = std::move(Data); }
+
+private:
+ std::unordered_map<IoHash, IoBuffer> m_Chunks;
+};
+
+// Read, compress, and register zentest-appstub as a worker.
+// Returns the WorkerId (hash of the worker package object).
+static IoHash
+RegisterWorker(HttpClient& Client, ZenServerEnvironment& Env)
+{
+ std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL);
+
+ FileContents AppStubData = zen::ReadFile(AppStubPath);
+ REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message()));
+
+ IoBuffer AppStubBuffer = AppStubData.Flatten();
+
+ CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash();
+ const uint64_t AppStubRawSize = AppStubBuffer.Size();
+
+ CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash);
+
+ CbObjectWriter WorkerWriter;
+ WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion);
+ WorkerWriter << "path"sv
+ << "zentest-appstub"sv;
+
+ WorkerWriter.BeginArray("executables"sv);
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "zentest-appstub"sv;
+ WorkerWriter.AddAttachment("hash"sv, AppStubAttachment);
+ WorkerWriter << "size"sv << AppStubRawSize;
+ WorkerWriter.EndObject();
+ WorkerWriter.EndArray();
+
+ WorkerWriter.BeginArray("functions"sv);
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Rot13"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kRot13Version);
+ WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Sleep"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kSleepVersion);
+ WorkerWriter.EndObject();
+ WorkerWriter.EndArray();
+
+ CbPackage WorkerPackage;
+ WorkerPackage.SetObject(WorkerWriter.Save());
+ WorkerPackage.AddAttachment(AppStubAttachment);
+
+ const IoHash WorkerId = WorkerPackage.GetObjectHash();
+
+ const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
+ HttpClient::Response RegisterResp = Client.Post(WorkerUrl, std::move(WorkerPackage));
+ REQUIRE_MESSAGE(RegisterResp,
+ fmt::format("Worker registration failed: status={}, body={}", int(RegisterResp.StatusCode), RegisterResp.ToText()));
+
+ return WorkerId;
+}
+
+// Build a Rot13 action CbPackage for the given input string.
+static CbPackage
+BuildRot13ActionPackage(std::string_view Input)
+{
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Input.size();
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Rot13"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionWriter.Save());
+ ActionPackage.AddAttachment(InputAttachment);
+
+ return ActionPackage;
+}
+
+// Build a Sleep action CbPackage. The worker sleeps for SleepTimeMs before returning its input.
+static CbPackage
+BuildSleepActionPackage(std::string_view Input, uint64_t SleepTimeMs)
+{
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Input.size();
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Sleep"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+ ActionWriter.BeginObject("Constants"sv);
+ ActionWriter << "SleepTimeMs"sv << SleepTimeMs;
+ ActionWriter.EndObject();
+
+ CbPackage ActionPackage;
+ ActionPackage.SetObject(ActionWriter.Save());
+ ActionPackage.AddAttachment(InputAttachment);
+
+ return ActionPackage;
+}
+
+// Build a Sleep action CbObject and populate the chunk resolver with the input attachment.
+static CbObject
+BuildSleepActionForSession(std::string_view Input, uint64_t SleepTimeMs, InMemoryChunkResolver& Resolver)
+{
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Input.size();
+
+ Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Sleep"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kSleepVersion);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+ ActionWriter.BeginObject("Constants"sv);
+ ActionWriter << "SleepTimeMs"sv << SleepTimeMs;
+ ActionWriter.EndObject();
+
+ return ActionWriter.Save();
+}
+
+static HttpClient::Response
+PollForResult(HttpClient& Client, const std::string& ResultUrl, uint64_t TimeoutMs = 30'000)
+{
+ HttpClient::Response Resp;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ Resp = Client.Get(ResultUrl);
+
+ if (Resp.StatusCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+
+ Sleep(100);
+ }
+
+ return Resp;
+}
+
+static bool
+PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int Lsn, uint64_t TimeoutMs = 30'000)
+{
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ HttpClient::Response Resp = Client.Get(CompletedUrl);
+
+ if (Resp)
+ {
+ for (auto& Item : Resp.AsObject()["completed"sv])
+ {
+ if (Item.AsInt32() == Lsn)
+ {
+ return true;
+ }
+ }
+ }
+
+ Sleep(100);
+ }
+
+ return false;
+}
+
+static std::string
+GetRot13Output(const CbPackage& ResultPackage)
+{
+ CbObject ResultObj = ResultPackage.GetObject();
+
+ IoHash OutputHash;
+ CbFieldView ValuesField = ResultObj["Values"sv];
+
+ if (CbFieldViewIterator It = begin(ValuesField); It.HasValue())
+ {
+ OutputHash = (*It).AsObjectView()["RawHash"sv].AsHash();
+ }
+
+ REQUIRE_MESSAGE(OutputHash != IoHash::Zero, "Expected non-zero output hash in result Values array");
+
+ const CbAttachment* OutputAttachment = ResultPackage.FindAttachment(OutputHash);
+ REQUIRE_MESSAGE(OutputAttachment != nullptr, "Output attachment not found in result package");
+
+ CompressedBuffer OutputCompressed = OutputAttachment->AsCompressedBinary();
+ SharedBuffer OutputData = OutputCompressed.Decompress();
+
+ return std::string(static_cast<const char*>(OutputData.GetData()), OutputData.GetSize());
+}
+
+// Mock orchestrator HTTP service that serves GET /orch/agents with a controllable response.
+class MockOrchestratorService : public HttpService
+{
+public:
+ MockOrchestratorService()
+ {
+ // Initialize with empty worker list
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers"sv);
+ Cbo.EndArray();
+ m_WorkerList = Cbo.Save();
+ }
+
+ const char* BaseUri() const override { return "/orch/"; }
+
+ void HandleRequest(HttpServerRequest& Request) override
+ {
+ if (Request.RequestVerb() == HttpVerb::kGet && Request.RelativeUri() == "agents"sv)
+ {
+ RwLock::SharedLockScope Lock(m_Lock);
+ Request.WriteResponse(HttpResponseCode::OK, m_WorkerList);
+ return;
+ }
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ void SetWorkerList(CbObject WorkerList)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ m_WorkerList = std::move(WorkerList);
+ }
+
+private:
+ RwLock m_Lock;
+ CbObject m_WorkerList;
+};
+
+// Manages in-process ASIO HTTP server lifecycle for mock orchestrator.
+struct MockOrchestratorFixture
+{
+ MockOrchestratorService Service;
+ ScopedTemporaryDirectory TmpDir;
+ Ref<HttpServer> Server;
+ std::thread ServerThread;
+ uint16_t Port = 0;
+
+ MockOrchestratorFixture()
+ {
+ HttpServerConfig Config;
+ Config.ServerClass = "asio";
+ Config.ForceLoopback = true;
+ Server = CreateHttpServer(Config);
+ Server->RegisterService(Service);
+ Port = static_cast<uint16_t>(Server->Initialize(TestEnv.GetNewPortNumber(), TmpDir.Path()));
+ ZEN_ASSERT(Port != 0);
+ ServerThread = std::thread([this]() { Server->Run(false); });
+ }
+
+ ~MockOrchestratorFixture()
+ {
+ Server->RequestExit();
+ if (ServerThread.joinable())
+ {
+ ServerThread.join();
+ }
+ Server->Close();
+ }
+
+ std::string GetEndpoint() const { return fmt::format("http://localhost:{}", Port); }
+};
+
+// Build the CbObject response for /orch/agents matching the format UpdateCoordinatorState expects.
+static CbObject
+BuildAgentListResponse(std::initializer_list<std::pair<std::string_view, std::string_view>> Workers)
+{
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers"sv);
+ for (const auto& [Id, Uri] : Workers)
+ {
+ Cbo.BeginObject();
+ Cbo << "id"sv << Id;
+ Cbo << "uri"sv << Uri;
+ Cbo << "hostname"sv
+ << "localhost"sv;
+ Cbo << "reachable"sv << true;
+ Cbo << "dt"sv << uint64_t(0);
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ return Cbo.Save();
+}
+
+// Build the worker CbPackage for zentest-appstub AND populate the chunk resolver.
+// This is the same logic as RegisterWorker() but returns the package instead of POSTing it.
+static CbPackage
+BuildWorkerPackage(ZenServerEnvironment& Env, InMemoryChunkResolver& Resolver)
+{
+ std::filesystem::path AppStubPath = Env.ProgramBaseDir() / ("zentest-appstub" ZEN_EXE_SUFFIX_LITERAL);
+
+ FileContents AppStubData = zen::ReadFile(AppStubPath);
+ REQUIRE_MESSAGE(!AppStubData.ErrorCode, fmt::format("Failed to read '{}': {}", AppStubPath.string(), AppStubData.ErrorCode.message()));
+
+ IoBuffer AppStubBuffer = AppStubData.Flatten();
+
+ CompressedBuffer AppStubCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(AppStubBuffer.GetData(), AppStubBuffer.Size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash AppStubRawHash = AppStubCompressed.DecodeRawHash();
+ const uint64_t AppStubRawSize = AppStubBuffer.Size();
+
+ // Store compressed data in chunk resolver for when the remote runner needs it
+ Resolver.AddChunk(AppStubRawHash, AppStubCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+ CbAttachment AppStubAttachment(std::move(AppStubCompressed), AppStubRawHash);
+
+ CbObjectWriter WorkerWriter;
+ WorkerWriter << "buildsystem_version"sv << Guid::FromString(kBuildSystemVersion);
+ WorkerWriter << "path"sv
+ << "zentest-appstub"sv;
+
+ WorkerWriter.BeginArray("executables"sv);
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "zentest-appstub"sv;
+ WorkerWriter.AddAttachment("hash"sv, AppStubAttachment);
+ WorkerWriter << "size"sv << AppStubRawSize;
+ WorkerWriter.EndObject();
+ WorkerWriter.EndArray();
+
+ WorkerWriter.BeginArray("functions"sv);
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Rot13"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kRot13Version);
+ WorkerWriter.EndObject();
+ WorkerWriter.BeginObject();
+ WorkerWriter << "name"sv
+ << "Sleep"sv;
+ WorkerWriter << "version"sv << Guid::FromString(kSleepVersion);
+ WorkerWriter.EndObject();
+ WorkerWriter.EndArray();
+
+ CbPackage WorkerPackage;
+ WorkerPackage.SetObject(WorkerWriter.Save());
+ WorkerPackage.AddAttachment(AppStubAttachment);
+
+ return WorkerPackage;
+}
+
+// Build a Rot13 action CbObject (not CbPackage) and populate the chunk resolver with the input attachment.
+static CbObject
+BuildRot13ActionForSession(std::string_view Input, InMemoryChunkResolver& Resolver)
+{
+ CompressedBuffer InputCompressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Input.data(), Input.size()),
+ OodleCompressor::Selkie,
+ OodleCompressionLevel::HyperFast4);
+
+ const IoHash InputRawHash = InputCompressed.DecodeRawHash();
+ const uint64_t InputRawSize = Input.size();
+
+ // Store compressed data in chunk resolver
+ Resolver.AddChunk(InputRawHash, InputCompressed.GetCompressed().Flatten().AsIoBuffer());
+
+ CbAttachment InputAttachment(std::move(InputCompressed), InputRawHash);
+
+ CbObjectWriter ActionWriter;
+ ActionWriter << "Function"sv
+ << "Rot13"sv;
+ ActionWriter << "FunctionVersion"sv << Guid::FromString(kRot13Version);
+ ActionWriter << "BuildSystemVersion"sv << Guid::FromString(kBuildSystemVersion);
+ ActionWriter.BeginObject("Inputs"sv);
+ ActionWriter.BeginObject("Source"sv);
+ ActionWriter.AddAttachment("RawHash"sv, InputAttachment);
+ ActionWriter << "RawSize"sv << InputRawSize;
+ ActionWriter.EndObject();
+ ActionWriter.EndObject();
+
+ return ActionWriter.Save();
+}
+
+TEST_SUITE_BEGIN("server.function");
+
+TEST_CASE("function.rot13")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Submit action via legacy /jobs/{worker} endpoint
+ const std::string JobUrl = fmt::format("/jobs/{}", WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
+
+ // Poll for result via legacy /jobs/{lsn} endpoint
+ const std::string ResultUrl = fmt::format("/jobs/{}", Lsn);
+ HttpClient::Response ResultResp = PollForResult(Client, ResultUrl);
+ REQUIRE_MESSAGE(
+ ResultResp.StatusCode == HttpResponseCode::OK,
+ fmt::format("Job did not complete in time. Last status: {}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput()));
+
+ // Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
+ CbPackage ResultPackage = ResultResp.AsPackage();
+ REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Action failed (empty result package)\nServer log:\n{}", Instance.GetLogOutput()));
+
+ CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+}
+
+TEST_CASE("function.workers")
+{
+    // Exercises the worker registry endpoints: empty listing before
+    // registration, listing + descriptor lookup after registration, and 404
+    // for an unknown worker id.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+    const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+    HttpClient Client(ComputeBaseUri);
+
+    // Before registration, GET /workers should return an empty list
+    HttpClient::Response EmptyListResp = Client.Get("/workers"sv);
+    REQUIRE_MESSAGE(EmptyListResp, "Failed to list workers before registration");
+    CHECK_EQ(EmptyListResp.AsObject()["workers"sv].AsArrayView().Num(), 0);
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // GET /workers — the registered worker should appear in the listing.
+    // Bind the response object to a local before ranging over one of its
+    // fields: ranging over a field of a temporary CbObject risks a dangling
+    // view, since temporaries created in the range-for initializer are
+    // destroyed before the loop body runs (pre-C++23). This matches the
+    // named-object pattern used elsewhere in this file.
+    HttpClient::Response ListResp = Client.Get("/workers"sv);
+    REQUIRE_MESSAGE(ListResp, "Failed to list workers after registration");
+
+    CbObject ListObj = ListResp.AsObject();
+
+    bool WorkerFound = false;
+    for (auto& Item : ListObj["workers"sv])
+    {
+        if (Item.AsHash() == WorkerId)
+        {
+            WorkerFound = true;
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(WorkerFound, fmt::format("Worker {} not found in worker listing", WorkerId.ToHexString()));
+
+    // GET /workers/{worker} — descriptor should match what was registered
+    const std::string WorkerUrl = fmt::format("/workers/{}", WorkerId.ToHexString());
+    HttpClient::Response DescResp = Client.Get(WorkerUrl);
+    REQUIRE_MESSAGE(DescResp, fmt::format("Failed to get worker descriptor: status={}", int(DescResp.StatusCode)));
+
+    CbObject Desc = DescResp.AsObject();
+    CHECK_EQ(Desc["buildsystem_version"sv].AsUuid(), Guid::FromString(kBuildSystemVersion));
+    CHECK_EQ(Desc["path"sv].AsString(), "zentest-appstub"sv);
+
+    // Every registered function must appear with its advertised version
+    bool Rot13Found = false;
+    bool SleepFound = false;
+    for (auto& Item : Desc["functions"sv])
+    {
+        std::string_view Name = Item.AsObjectView()["name"sv].AsString();
+        if (Name == "Rot13"sv)
+        {
+            CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kRot13Version));
+            Rot13Found = true;
+        }
+        else if (Name == "Sleep"sv)
+        {
+            CHECK_EQ(Item.AsObjectView()["version"sv].AsUuid(), Guid::FromString(kSleepVersion));
+            SleepFound = true;
+        }
+    }
+
+    CHECK_MESSAGE(Rot13Found, "Rot13 function not found in worker descriptor");
+    CHECK_MESSAGE(SleepFound, "Sleep function not found in worker descriptor");
+
+    // GET /workers/{unknown} — should return 404
+    const std::string UnknownUrl = fmt::format("/workers/{}", IoHash::Zero.ToHexString());
+    HttpClient::Response NotFoundResp = Client.Get(UnknownUrl);
+    CHECK_EQ(NotFoundResp.StatusCode, HttpResponseCode::NotFound);
+}
+
+TEST_CASE("function.queues.lifecycle")
+{
+    // Full queue lifecycle: create, list, submit, poll completion, fetch
+    // result, and read back the queue's status counters.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+    const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+    HttpClient Client(ComputeBaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Create a queue
+    HttpClient::Response CreateResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}, body={}", int(CreateResp.StatusCode), CreateResp.ToText()));
+
+    const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Verify the queue appears in the listing. Bind the response object to a
+    // local before ranging over one of its fields — ranging over a field of a
+    // temporary CbObject risks a dangling view, since temporaries in the
+    // range-for initializer are destroyed before the loop body runs
+    // (pre-C++23).
+    HttpClient::Response ListResp = Client.Get("/queues"sv);
+    REQUIRE_MESSAGE(ListResp, "Failed to list queues");
+
+    CbObject QueueListObj = ListResp.AsObject();
+
+    bool QueueFound = false;
+    for (auto& Item : QueueListObj["queues"sv])
+    {
+        if (Item.AsObjectView()["queue_id"sv].AsInt32() == QueueId)
+        {
+            QueueFound = true;
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(QueueFound, fmt::format("Queue {} not found in queue listing", QueueId));
+
+    // Submit action via queue-scoped endpoint
+    const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(SubmitResp,
+                    fmt::format("Queue job submission failed: status={}, body={}", int(SubmitResp.StatusCode), SubmitResp.ToText()));
+
+    const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from queue job submission");
+
+    // Poll for completion via queue-scoped /completed endpoint
+    const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+    REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+                    fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+                                Lsn,
+                                QueueId,
+                                Instance.GetLogOutput()));
+
+    // Retrieve result via queue-scoped /jobs/{lsn} endpoint
+    const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+    HttpClient::Response ResultResp = Client.Get(ResultUrl);
+    REQUIRE_MESSAGE(
+        ResultResp.StatusCode == HttpResponseCode::OK,
+        fmt::format("Failed to retrieve result: status={}\nServer log:\n{}", int(ResultResp.StatusCode), Instance.GetLogOutput()));
+
+    // Verify result: Rot13("Hello World") == "Uryyb Jbeyq"
+    CbPackage ResultPackage = ResultResp.AsPackage();
+    REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+    // Verify queue status reflects completion
+    const std::string StatusUrl = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response StatusResp = Client.Get(StatusUrl);
+    REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+
+    CbObject QueueStatus = StatusResp.AsObject();
+    CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+    CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+    CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 0);
+    CHECK_EQ(std::string(QueueStatus["state"sv].AsString()), "active");
+}
+
+TEST_CASE("function.queues.cancel")
+{
+    // DELETE /queues/{id} on a queue holding a submitted job must answer 204
+    // and leave the queue in the "cancelled" state.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ListenPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ListenPort != 0, Instance.GetLogOutput());
+
+    HttpClient Client(fmt::format("http://localhost:{}/compute", ListenPort));
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Set up the queue under test
+    HttpClient::Response NewQueueResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(NewQueueResp, "Queue creation failed");
+
+    const int QueueId = NewQueueResp.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Give the queue one job so cancellation has something to act on
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response JobResp = Client.Post(SubmitUrl, BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(JobResp, fmt::format("Job submission failed: status={}, body={}", int(JobResp.StatusCode), JobResp.ToText()));
+
+    // Issue the cancellation
+    const std::string QueueEndpoint = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response DeleteResp = Client.Delete(QueueEndpoint);
+    REQUIRE_MESSAGE(DeleteResp.StatusCode == HttpResponseCode::NoContent,
+                    fmt::format("Queue cancellation failed: status={}, body={}", int(DeleteResp.StatusCode), DeleteResp.ToText()));
+
+    // Read the queue back and confirm the terminal state
+    HttpClient::Response QueryResp = Client.Get(QueueEndpoint);
+    REQUIRE_MESSAGE(QueryResp, "Failed to get queue status after cancel");
+
+    CbObject StatusObj = QueryResp.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "cancelled");
+}
+
+TEST_CASE("function.queues.remote")
+{
+    // Remote queues are addressed by an opaque OID token rather than an
+    // integer queue id; the full submit → poll → fetch-result flow must work
+    // with the token substituted everywhere the id would normally appear.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ListenPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ListenPort != 0, Instance.GetLogOutput());
+
+    HttpClient Client(fmt::format("http://localhost:{}/compute", ListenPort));
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Create a remote queue — response carries both a queue_id and an OID queue_token
+    HttpClient::Response RemoteCreateResp = Client.Post("/queues/remote"sv);
+    REQUIRE_MESSAGE(RemoteCreateResp,
+                    fmt::format("Remote queue creation failed: status={}, body={}", int(RemoteCreateResp.StatusCode), RemoteCreateResp.ToText()));
+
+    CbObject CreateBody = RemoteCreateResp.AsObject();
+    const std::string Token = std::string(CreateBody["queue_token"sv].AsString());
+    REQUIRE_MESSAGE(!Token.empty(), "Expected non-empty queue_token from remote queue creation");
+
+    // Submit using the token in place of the integer queue id
+    HttpClient::Response JobResp = Client.Post(fmt::format("/queues/{}/jobs/{}", Token, WorkerId.ToHexString()), BuildRot13ActionPackage("Hello World"sv));
+    REQUIRE_MESSAGE(JobResp,
+                    fmt::format("Remote queue job submission failed: status={}, body={}", int(JobResp.StatusCode), JobResp.ToText()));
+
+    const int JobLsn = JobResp.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from remote queue job submission");
+
+    // Poll the token-addressed /completed endpoint until the job finishes
+    REQUIRE_MESSAGE(
+        PollForLsnInCompleted(Client, fmt::format("/queues/{}/completed", Token), JobLsn),
+        fmt::format("LSN {} did not appear in remote queue completed list within timeout\nServer log:\n{}", JobLsn, Instance.GetLogOutput()));
+
+    // Fetch the result through the token-addressed /jobs/{lsn} endpoint
+    HttpClient::Response FetchResp = Client.Get(fmt::format("/queues/{}/jobs/{}", Token, JobLsn));
+    REQUIRE_MESSAGE(FetchResp.StatusCode == HttpResponseCode::OK,
+                    fmt::format("Failed to retrieve result from remote queue: status={}\nServer log:\n{}",
+                                int(FetchResp.StatusCode),
+                                Instance.GetLogOutput()));
+
+    // Rot13("Hello World") == "Uryyb Jbeyq"
+    CbPackage ResultPkg = FetchResp.AsPackage();
+    REQUIRE_MESSAGE(bool(ResultPkg), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(ResultPkg), "Uryyb Jbeyq"sv);
+}
+
+TEST_CASE("function.queues.cancel_running")
+{
+    // A queue cancellation must interrupt an in-flight action: the job then
+    // surfaces via /completed and is counted as cancelled, not completed.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ListenPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ListenPort != 0, Instance.GetLogOutput());
+
+    HttpClient Client(fmt::format("http://localhost:{}/compute", ListenPort));
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Set up the queue under test
+    HttpClient::Response NewQueueResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(NewQueueResp, "Queue creation failed");
+
+    const int QueueId = NewQueueResp.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id from queue creation");
+
+    // Submit a Sleep job long enough that it will still be running at cancel time
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response JobResp = Client.Post(SubmitUrl, BuildSleepActionPackage("data"sv, 30'000));
+    REQUIRE_MESSAGE(JobResp,
+                    fmt::format("Sleep job submission failed: status={}, body={}", int(JobResp.StatusCode), JobResp.ToText()));
+
+    const int JobLsn = JobResp.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from Sleep job submission");
+
+    // Give the worker process a moment to actually start executing
+    Sleep(1'000);
+
+    // Cancel the queue, interrupting the running Sleep job
+    const std::string QueueEndpoint = fmt::format("/queues/{}", QueueId);
+    HttpClient::Response DeleteResp = Client.Delete(QueueEndpoint);
+    REQUIRE_MESSAGE(DeleteResp.StatusCode == HttpResponseCode::NoContent,
+                    fmt::format("Queue cancellation failed: status={}, body={}", int(DeleteResp.StatusCode), DeleteResp.ToText()));
+
+    // Once the worker process exits, the cancelled job must appear in /completed
+    const std::string CompletedEndpoint = fmt::format("/queues/{}/completed", QueueId);
+    REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedEndpoint, JobLsn),
+                    fmt::format("LSN {} did not appear in queue {} completed list after cancel\nServer log:\n{}",
+                                JobLsn,
+                                QueueId,
+                                Instance.GetLogOutput()));
+
+    // The queue must account for exactly one cancelled (not completed) action
+    HttpClient::Response QueryResp = Client.Get(QueueEndpoint);
+    REQUIRE_MESSAGE(QueryResp, "Failed to get queue status after cancel");
+
+    CbObject StatusObj = QueryResp.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "cancelled");
+    CHECK_EQ(StatusObj["cancelled_count"sv].AsInt32(), 1);
+    CHECK_EQ(StatusObj["completed_count"sv].AsInt32(), 0);
+}
+
+TEST_CASE("function.queues.remote_cancel")
+{
+    // Token-addressed cancellation: a remote queue cancelled via its OID token
+    // must interrupt a running job and report it as cancelled.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ListenPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ListenPort != 0, Instance.GetLogOutput());
+
+    HttpClient Client(fmt::format("http://localhost:{}/compute", ListenPort));
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Create a remote queue to obtain an OID token
+    HttpClient::Response RemoteCreateResp = Client.Post("/queues/remote"sv);
+    REQUIRE_MESSAGE(RemoteCreateResp,
+                    fmt::format("Remote queue creation failed: status={}, body={}", int(RemoteCreateResp.StatusCode), RemoteCreateResp.ToText()));
+
+    const std::string Token = std::string(RemoteCreateResp.AsObject()["queue_token"sv].AsString());
+    REQUIRE_MESSAGE(!Token.empty(), "Expected non-empty queue_token from remote queue creation");
+
+    // Submit a long-running Sleep job via the token-addressed endpoint
+    HttpClient::Response JobResp = Client.Post(fmt::format("/queues/{}/jobs/{}", Token, WorkerId.ToHexString()), BuildSleepActionPackage("data"sv, 30'000));
+    REQUIRE_MESSAGE(JobResp,
+                    fmt::format("Sleep job submission failed: status={}, body={}", int(JobResp.StatusCode), JobResp.ToText()));
+
+    const int JobLsn = JobResp.AsObject()["lsn"sv].AsInt32();
+    REQUIRE_MESSAGE(JobLsn != 0, "Expected non-zero LSN from Sleep job submission");
+
+    // Give the worker process a moment to start executing before cancelling
+    Sleep(1'000);
+
+    // Cancel the queue via its OID token
+    const std::string QueueEndpoint = fmt::format("/queues/{}", Token);
+    HttpClient::Response DeleteResp = Client.Delete(QueueEndpoint);
+    REQUIRE_MESSAGE(DeleteResp.StatusCode == HttpResponseCode::NoContent,
+                    fmt::format("Remote queue cancellation failed: status={}, body={}", int(DeleteResp.StatusCode), DeleteResp.ToText()));
+
+    // The interrupted job must surface in the token-addressed /completed endpoint
+    REQUIRE_MESSAGE(
+        PollForLsnInCompleted(Client, fmt::format("/queues/{}/completed", Token), JobLsn),
+        fmt::format("LSN {} did not appear in remote queue completed list after cancel\nServer log:\n{}", JobLsn, Instance.GetLogOutput()));
+
+    // The queue must account for exactly one cancelled (not completed) action
+    HttpClient::Response QueryResp = Client.Get(QueueEndpoint);
+    REQUIRE_MESSAGE(QueryResp, "Failed to get remote queue status after cancel");
+
+    CbObject StatusObj = QueryResp.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "cancelled");
+    CHECK_EQ(StatusObj["cancelled_count"sv].AsInt32(), 1);
+    CHECK_EQ(StatusObj["completed_count"sv].AsInt32(), 0);
+}
+
+TEST_CASE("function.queues.drain")
+{
+    // Draining rejects new submissions with 424 while letting already-queued
+    // work run to completion; the queue then reports draining + complete.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ListenPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ListenPort != 0, Instance.GetLogOutput());
+
+    HttpClient Client(fmt::format("http://localhost:{}/compute", ListenPort));
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Set up the queue under test
+    HttpClient::Response NewQueueResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(NewQueueResp, "Queue creation failed");
+
+    const int QueueId = NewQueueResp.AsObject()["queue_id"sv].AsInt32();
+    const std::string QueueEndpoint = fmt::format("/queues/{}", QueueId);
+
+    // Queue a long-running job so we can verify it completes even after drain
+    const std::string SubmitUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+    HttpClient::Response FirstSubmit = Client.Post(SubmitUrl, BuildSleepActionPackage("data"sv, 2'000));
+    REQUIRE_MESSAGE(FirstSubmit, fmt::format("First job submission failed: status={}", int(FirstSubmit.StatusCode)));
+    const int FirstLsn = FirstSubmit.AsObject()["lsn"sv].AsInt32();
+
+    // Switch the queue into draining mode
+    HttpClient::Response DrainResp = Client.Post(fmt::format("/queues/{}/drain", QueueId));
+    REQUIRE_MESSAGE(DrainResp, fmt::format("Drain failed: status={}, body={}", int(DrainResp.StatusCode), DrainResp.ToText()));
+    CHECK_EQ(std::string(DrainResp.AsObject()["state"sv].AsString()), "draining");
+
+    // Any further submission must be rejected with 424
+    HttpClient::Response SecondSubmit = Client.Post(SubmitUrl, BuildRot13ActionPackage("Hello"sv));
+    CHECK_EQ(SecondSubmit.StatusCode, HttpResponseCode::FailedDependency);
+    CHECK_EQ(std::string(SecondSubmit.AsObject()["error"sv].AsString()), "queue is draining");
+
+    // The job queued before the drain still runs to completion
+    REQUIRE_MESSAGE(PollForLsnInCompleted(Client, fmt::format("/queues/{}/completed", QueueId), FirstLsn),
+                    fmt::format("LSN {} did not complete after drain\nServer log:\n{}", FirstLsn, Instance.GetLogOutput()));
+
+    // Final state: still draining, and all work complete
+    HttpClient::Response QueryResp = Client.Get(QueueEndpoint);
+    REQUIRE_MESSAGE(QueryResp, "Failed to get queue status");
+
+    CbObject StatusObj = QueryResp.AsObject();
+    CHECK_EQ(std::string(StatusObj["state"sv].AsString()), "draining");
+    CHECK(StatusObj["is_complete"sv].AsBool());
+}
+
+TEST_CASE("function.priority")
+{
+    // Spawn server with max-actions=1 to guarantee serialized action execution,
+    // which lets us deterministically verify that higher-priority pending jobs
+    // are scheduled before lower-priority ones.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--max-actions=1");
+    REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+    const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+    HttpClient Client(ComputeBaseUri);
+
+    const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+    // Create a queue for all test jobs
+    HttpClient::Response CreateResp = Client.Post("/queues"sv);
+    REQUIRE_MESSAGE(CreateResp, "Queue creation failed");
+
+    const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+    REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id");
+
+    // Submit a blocker Sleep job to occupy the single execution slot.
+    // Once the blocker is running, the scheduler must choose among the pending
+    // jobs by priority when the slot becomes free.
+    const std::string BlockerJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
+    HttpClient::Response BlockerResp = Client.Post(BlockerJobUrl, BuildSleepActionPackage("data"sv, 1'000));
+    REQUIRE_MESSAGE(BlockerResp, fmt::format("Blocker job submission failed: status={}", int(BlockerResp.StatusCode)));
+
+    // Submit 3 low-priority Rot13 jobs
+    const std::string LowJobUrl = fmt::format("/queues/{}/jobs/{}?priority=0", QueueId, WorkerId.ToHexString());
+
+    HttpClient::Response LowResp1 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low1"sv));
+    REQUIRE_MESSAGE(LowResp1, "Low-priority job 1 submission failed");
+    const int LsnLow1 = LowResp1.AsObject()["lsn"sv].AsInt32();
+
+    HttpClient::Response LowResp2 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low2"sv));
+    REQUIRE_MESSAGE(LowResp2, "Low-priority job 2 submission failed");
+    const int LsnLow2 = LowResp2.AsObject()["lsn"sv].AsInt32();
+
+    HttpClient::Response LowResp3 = Client.Post(LowJobUrl, BuildRot13ActionPackage("low3"sv));
+    REQUIRE_MESSAGE(LowResp3, "Low-priority job 3 submission failed");
+    const int LsnLow3 = LowResp3.AsObject()["lsn"sv].AsInt32();
+
+    // Submit 1 high-priority Rot13 job — should execute before the low-priority ones
+    const std::string HighJobUrl = fmt::format("/queues/{}/jobs/{}?priority=10", QueueId, WorkerId.ToHexString());
+    HttpClient::Response HighResp = Client.Post(HighJobUrl, BuildRot13ActionPackage("high"sv));
+    REQUIRE_MESSAGE(HighResp, "High-priority job submission failed");
+    const int LsnHigh = HighResp.AsObject()["lsn"sv].AsInt32();
+
+    // Wait for all 4 priority-test jobs to appear in the queue's completed list.
+    // This avoids any snapshot-timing race: by the time we compare timestamps, all
+    // jobs have already finished and their history entries are stable.
+    const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+
+    // Poll /completed every 100 ms until all four LSNs are present or 30 s
+    // elapse; the scope keeps the wait-loop bookkeeping local.
+    {
+        bool AllCompleted = false;
+        Stopwatch WaitTimer;
+
+        while (!AllCompleted && WaitTimer.GetElapsedTimeMs() < 30'000)
+        {
+            HttpClient::Response Resp = Client.Get(CompletedUrl);
+
+            if (Resp)
+            {
+                bool FoundHigh = false;
+                bool FoundLow1 = false;
+                bool FoundLow2 = false;
+                bool FoundLow3 = false;
+
+                CbObject RespObj = Resp.AsObject();
+
+                for (auto& Item : RespObj["completed"sv])
+                {
+                    const int Lsn = Item.AsInt32();
+                    if (Lsn == LsnHigh)
+                    {
+                        FoundHigh = true;
+                    }
+                    else if (Lsn == LsnLow1)
+                    {
+                        FoundLow1 = true;
+                    }
+                    else if (Lsn == LsnLow2)
+                    {
+                        FoundLow2 = true;
+                    }
+                    else if (Lsn == LsnLow3)
+                    {
+                        FoundLow3 = true;
+                    }
+                }
+
+                AllCompleted = FoundHigh && FoundLow1 && FoundLow2 && FoundLow3;
+            }
+
+            if (!AllCompleted)
+            {
+                // Back off briefly between polls
+                Sleep(100);
+            }
+        }
+
+        REQUIRE_MESSAGE(
+            AllCompleted,
+            fmt::format(
+                "Not all priority test jobs completed within timeout (lsnHigh={} lsnLow1={} lsnLow2={} lsnLow3={})\nServer log:\n{}",
+                LsnHigh,
+                LsnLow1,
+                LsnLow2,
+                LsnLow3,
+                Instance.GetLogOutput()));
+    }
+
+    // Query the queue-scoped history to obtain the time_Completed timestamp for each
+    // job. The history endpoint records when each RunnerAction::State transition
+    // occurred, so time_Completed is the wall-clock tick at which the action finished.
+    // Using the queue-scoped endpoint avoids exposing history from other queues.
+    const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+    HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+    REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+
+    CbObject HistoryObj = HistoryResp.AsObject();
+
+    // Linear scan of the history entries for the given LSN; returns 0 when the
+    // LSN is absent, which the REQUIREs below treat as "not found".
+    auto GetCompletedTimestamp = [&](int Lsn) -> uint64_t {
+        for (auto& Item : HistoryObj["history"sv])
+        {
+            if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+            {
+                return Item.AsObjectView()["time_Completed"sv].AsUInt64();
+            }
+        }
+        return 0;
+    };
+
+    const uint64_t TimeHigh = GetCompletedTimestamp(LsnHigh);
+    const uint64_t TimeLow1 = GetCompletedTimestamp(LsnLow1);
+    const uint64_t TimeLow2 = GetCompletedTimestamp(LsnLow2);
+    const uint64_t TimeLow3 = GetCompletedTimestamp(LsnLow3);
+
+    REQUIRE_MESSAGE(TimeHigh != 0, fmt::format("lsnHigh={} not found in action history", LsnHigh));
+    REQUIRE_MESSAGE(TimeLow1 != 0, fmt::format("lsnLow1={} not found in action history", LsnLow1));
+    REQUIRE_MESSAGE(TimeLow2 != 0, fmt::format("lsnLow2={} not found in action history", LsnLow2));
+    REQUIRE_MESSAGE(TimeLow3 != 0, fmt::format("lsnLow3={} not found in action history", LsnLow3));
+
+    // The high-priority job must have completed strictly before every low-priority job
+    CHECK_MESSAGE(TimeHigh < TimeLow1,
+                  fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow1={} completed at t={} (expected later)",
+                              LsnHigh,
+                              TimeHigh,
+                              LsnLow1,
+                              TimeLow1));
+    CHECK_MESSAGE(TimeHigh < TimeLow2,
+                  fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow2={} completed at t={} (expected later)",
+                              LsnHigh,
+                              TimeHigh,
+                              LsnLow2,
+                              TimeLow2));
+    CHECK_MESSAGE(TimeHigh < TimeLow3,
+                  fmt::format("Priority ordering violated: lsnHigh={} completed at t={} but lsnLow3={} completed at t={} (expected later)",
+                              LsnHigh,
+                              TimeHigh,
+                              LsnLow3,
+                              TimeLow3));
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Remote worker synchronization tests
+//
+// These tests exercise the orchestrator discovery path where new compute
+// nodes appear over time and must receive previously registered workers
+// via SyncWorkersToRunner().
+
+TEST_CASE("function.remote.worker_sync_on_discovery")
+{
+    // A worker registered on a session before any compute node is known must
+    // be pushed to a runner once the orchestrator starts advertising one.
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t ServerPort = Instance.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(ServerPort != 0, Instance.GetLogOutput());
+
+    // Orchestrator starts out with no advertised compute nodes
+    MockOrchestratorFixture MockOrch;
+
+    // Session wired against the mock orchestrator
+    InMemoryChunkResolver Resolver;
+    ScopedTemporaryDirectory SessionBaseDir;
+    zen::compute::ComputeServiceSession Session(Resolver);
+    Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+    Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+    Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+    // Register the worker while no runner exists yet — it is only stored locally
+    CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+    Session.RegisterWorker(WorkerPackage);
+
+    // Now advertise the real server, then give the scheduler time to discover
+    // it (~5s throttle + margin)
+    MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", fmt::format("http://localhost:{}", ServerPort)}}));
+    Sleep(7'000);
+
+    // The action can only succeed if the worker was synced to the new runner
+    CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+    zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+    REQUIRE_MESSAGE(EnqueueRes, "Action enqueue failed");
+
+    // Poll for the result every 200 ms, giving up after 30 s
+    CbPackage ResultPackage;
+    HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+
+    for (Stopwatch PollTimer; PollTimer.GetElapsedTimeMs() < 30'000; Sleep(200))
+    {
+        ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+        if (ResultCode == HttpResponseCode::OK)
+        {
+            break;
+        }
+    }
+
+    REQUIRE_MESSAGE(
+        ResultCode == HttpResponseCode::OK,
+        fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+
+    REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+
+    CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+    Session.Shutdown();
+}
+
+TEST_CASE("function.remote.late_runner_discovery")
+{
+    // Covers SyncWorkersToRunner() for nodes that appear late: a worker
+    // registered on the session must be pushed to a second compute node that
+    // is only advertised after registration.
+    // Spawn first server
+    ZenServerInstance Instance1(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance1.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port1 = Instance1.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port1 != 0, Instance1.GetLogOutput());
+
+    const std::string ServerUri1 = fmt::format("http://localhost:{}", Port1);
+
+    // Start mock orchestrator advertising W1
+    MockOrchestratorFixture MockOrch;
+    MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}}));
+
+    // Create session and register worker
+    InMemoryChunkResolver Resolver;
+    ScopedTemporaryDirectory SessionBaseDir;
+    zen::compute::ComputeServiceSession Session(Resolver);
+    Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+    Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+    Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+    CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+    Session.RegisterWorker(WorkerPackage);
+
+    // Wait for W1 discovery (~5s discovery throttle + margin)
+    Sleep(7'000);
+
+    // Baseline: submit Rot13 action and verify it completes on W1
+    {
+        CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+        zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+        REQUIRE_MESSAGE(EnqueueRes, "Baseline action enqueue failed");
+
+        CbPackage ResultPackage;
+        HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+        Stopwatch Timer;
+
+        // Poll every 200 ms, giving up after 30 s
+        while (Timer.GetElapsedTimeMs() < 30'000)
+        {
+            ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+            if (ResultCode == HttpResponseCode::OK)
+            {
+                break;
+            }
+            Sleep(200);
+        }
+
+        REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+                        fmt::format("Baseline action did not complete in time\nServer log:\n{}", Instance1.GetLogOutput()));
+
+        CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+    }
+
+    // Spawn second server
+    ZenServerInstance Instance2(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance2.SetDataDir(TestEnv.CreateNewTestDir());
+    const uint16_t Port2 = Instance2.SpawnServerAndWaitUntilReady();
+    REQUIRE_MESSAGE(Port2 != 0, Instance2.GetLogOutput());
+
+    const std::string ServerUri2 = fmt::format("http://localhost:{}", Port2);
+
+    // Update mock orchestrator to include both W1 and W2
+    MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", ServerUri1}, {"worker-2", ServerUri2}}));
+
+    // Wait for W2 discovery
+    Sleep(7'000);
+
+    // Verify W2 received the worker by querying its /compute/workers endpoint directly
+    {
+        const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+        HttpClient Client(ComputeBaseUri);
+        HttpClient::Response ListResp = Client.Get("/workers"sv);
+        REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+
+        bool WorkerFound = false;
+        // NOTE(review): this ranges over a field of the temporary returned by
+        // AsObject(); consider binding the CbObject to a local first (as other
+        // tests in this file do) to avoid any temporary-lifetime risk — confirm
+        // against CbObject/CbField ownership semantics.
+        for (auto& Item : ListResp.AsObject()["workers"sv])
+        {
+            if (Item.AsHash() == WorkerPackage.GetObjectHash())
+            {
+                WorkerFound = true;
+                break;
+            }
+        }
+
+        REQUIRE_MESSAGE(WorkerFound,
+                        fmt::format("Worker not found on W2 after discovery — SyncWorkersToRunner may have failed\nW2 log:\n{}",
+                                    Instance2.GetLogOutput()));
+    }
+
+    // Submit another action and verify it completes (could run on either W1 or W2)
+    {
+        CbObject ActionObj = BuildRot13ActionForSession("Second Test"sv, Resolver);
+
+        zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueAction(ActionObj, 0);
+        REQUIRE_MESSAGE(EnqueueRes, "Second action enqueue failed");
+
+        CbPackage ResultPackage;
+        HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+        Stopwatch Timer;
+
+        // Same 200 ms / 30 s polling loop as the baseline phase
+        while (Timer.GetElapsedTimeMs() < 30'000)
+        {
+            ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+            if (ResultCode == HttpResponseCode::OK)
+            {
+                break;
+            }
+            Sleep(200);
+        }
+
+        REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+                        fmt::format("Second action did not complete in time\nW1 log:\n{}\nW2 log:\n{}",
+                                    Instance1.GetLogOutput(),
+                                    Instance2.GetLogOutput()));
+
+        // Rot13("Second Test") = "Frpbaq Grfg"
+        CHECK_EQ(GetRot13Output(ResultPackage), "Frpbaq Grfg"sv);
+    }
+
+    Session.Shutdown();
+}
+
+TEST_CASE("function.remote.queue_association")
+{
+    // A locally created session queue must be mirrored on the remote compute
+    // node as a non-implicit queue when actions are routed through it.
+    // Spawn real zenserver as a remote compute node
+    ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+    Instance.SetDataDir(TestEnv.CreateNewTestDir());
+    REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+    // Start mock orchestrator advertising the server
+    MockOrchestratorFixture MockOrch;
+    MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+    // Create session infrastructure
+    InMemoryChunkResolver Resolver;
+    ScopedTemporaryDirectory SessionBaseDir;
+    zen::compute::ComputeServiceSession Session(Resolver);
+    Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+    Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+    Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+    // Register worker on session
+    CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+    Session.RegisterWorker(WorkerPackage);
+
+    // Wait for scheduler to discover the runner (~5s throttle + margin)
+    Sleep(7'000);
+
+    // Create a local queue and submit action to it
+    auto QueueResult = Session.CreateQueue();
+    REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+    const int QueueId = QueueResult.QueueId;
+
+    CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
+
+    zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+    REQUIRE_MESSAGE(EnqueueRes, "Action enqueue to queue failed");
+
+    // Poll for result (every 200 ms, up to 30 s)
+    CbPackage ResultPackage;
+    HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+    Stopwatch Timer;
+
+    while (Timer.GetElapsedTimeMs() < 30'000)
+    {
+        ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+        if (ResultCode == HttpResponseCode::OK)
+        {
+            break;
+        }
+        Sleep(200);
+    }
+
+    REQUIRE_MESSAGE(
+        ResultCode == HttpResponseCode::OK,
+        fmt::format("Action did not complete in time. Last status: {}\nServer log:\n{}", int(ResultCode), Instance.GetLogOutput()));
+
+    REQUIRE_MESSAGE(bool(ResultPackage), fmt::format("Empty result package\nServer log:\n{}", Instance.GetLogOutput()));
+    CHECK_EQ(GetRot13Output(ResultPackage), "Uryyb Jbeyq"sv);
+
+    // Verify that a non-implicit remote queue was created on the compute node.
+    // Bind the response object to a local before ranging over one of its
+    // fields — ranging over a field of a temporary CbObject risks a dangling
+    // view, since temporaries in the range-for initializer are destroyed
+    // before the loop body runs (pre-C++23).
+    HttpClient Client(Instance.GetBaseUri() + "/compute");
+
+    HttpClient::Response QueuesResp = Client.Get("/queues"sv);
+    REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server");
+
+    CbObject QueuesObj = QueuesResp.AsObject();
+
+    bool RemoteQueueFound = false;
+    for (auto& Item : QueuesObj["queues"sv])
+    {
+        if (!Item.AsObjectView()["implicit"sv].AsBool())
+        {
+            RemoteQueueFound = true;
+            break;
+        }
+    }
+
+    CHECK_MESSAGE(RemoteQueueFound, "Expected a non-implicit remote queue on the compute node");
+
+    Session.Shutdown();
+}
+
+TEST_CASE("function.remote.queue_cancel_propagation")
+{
+ // Spawn real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ // Register worker on session
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a local queue and submit a long-running Sleep action
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ zen::compute::ComputeServiceSession::EnqueueResult EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ Sleep(2'000);
+
+ // Cancel the local queue — this should propagate to the remote
+ Session.CancelQueue(QueueId);
+
+ // Poll for the action to complete (as cancelled)
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ // Verify the local queue shows cancelled
+ auto QueueStatus = Session.GetQueueStatus(QueueId);
+ CHECK(QueueStatus.State == zen::compute::ComputeServiceSession::QueueState::Cancelled);
+
+ // Verify the remote queue on the compute node is also cancelled
+ HttpClient Client(Instance.GetBaseUri() + "/compute");
+
+ HttpClient::Response QueuesResp = Client.Get("/queues"sv);
+ REQUIRE_MESSAGE(QueuesResp, "Failed to list queues on remote server");
+
+ bool RemoteQueueCancelled = false;
+ for (auto& Item : QueuesResp.AsObject()["queues"sv])
+ {
+ if (!Item.AsObjectView()["implicit"sv].AsBool())
+ {
+ RemoteQueueCancelled = std::string(Item.AsObjectView()["state"sv].AsString()) == "cancelled";
+ break;
+ }
+ }
+
+ CHECK_MESSAGE(RemoteQueueCancelled, "Expected the remote queue to be cancelled");
+
+ Session.Shutdown();
+}
+
+TEST_CASE("function.abandon_running_http")
+{
+ // Spawn a real zenserver to execute a long-running action, then abandon via HTTP endpoint
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
+ HttpClient Client(ComputeBaseUri);
+
+ const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+
+ // Create a queue and submit a long-running Sleep job
+ HttpClient::Response CreateResp = Client.Post("/queues"sv);
+ REQUIRE_MESSAGE(CreateResp, "Queue creation failed");
+
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ REQUIRE_MESSAGE(QueueId != 0, "Expected non-zero queue_id");
+
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildSleepActionPackage("data"sv, 30'000));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Sleep job submission failed: status={}", int(SubmitResp.StatusCode)));
+
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
+
+ // Wait for the process to start running
+ Sleep(1'000);
+
+ // Verify the ready endpoint returns OK before abandon
+ {
+ HttpClient::Response ReadyResp = Client.Get("/ready"sv);
+ CHECK(ReadyResp.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Trigger abandon via the HTTP endpoint
+ HttpClient::Response AbandonResp = Client.Post("/abandon"sv);
+ REQUIRE_MESSAGE(AbandonResp.StatusCode == HttpResponseCode::OK,
+ fmt::format("Abandon request failed: status={}, body={}", int(AbandonResp.StatusCode), AbandonResp.ToText()));
+
+ // Ready endpoint should now return 503
+ {
+ HttpClient::Response ReadyResp = Client.Get("/ready"sv);
+ CHECK(ReadyResp.StatusCode == HttpResponseCode::ServiceUnavailable);
+ }
+
+ // The abandoned action should appear in the completed endpoint once the process exits
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list after abandon\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
+
+ // Verify the queue reflects one abandoned action
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status after abandon");
+
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["abandoned_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+
+ // Submitting new work should be rejected
+ HttpClient::Response RejectedResp = Client.Post(JobUrl, BuildRot13ActionPackage("rejected"sv));
+ CHECK_MESSAGE(RejectedResp.StatusCode != HttpResponseCode::OK, "Expected action submission to be rejected in Abandoned state");
+}
+
+TEST_CASE("function.session.abandon_pending")
+{
+ // Create a session with no runners so actions stay pending
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Enqueue several actions — they will stay pending because there are no runners
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
+
+ CbObject ActionObj = BuildRot13ActionForSession("abandon-test"sv, Resolver);
+
+ auto Enqueue1 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ auto Enqueue2 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ auto Enqueue3 = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(Enqueue1, "Failed to enqueue action 1");
+ REQUIRE_MESSAGE(Enqueue2, "Failed to enqueue action 2");
+ REQUIRE_MESSAGE(Enqueue3, "Failed to enqueue action 3");
+
+ // Transition to Abandoned — should mark all pending actions as Abandoned
+ bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+ CHECK(Session.GetSessionState() == zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK(!Session.IsHealthy());
+
+ // Give the scheduler thread time to process the state changes
+ Sleep(2'000);
+
+ // All three actions should now be in the results map as abandoned
+ for (int Lsn : {Enqueue1.Lsn, Enqueue2.Lsn, Enqueue3.Lsn})
+ {
+ CbPackage Result;
+ HttpResponseCode Code = Session.GetActionResult(Lsn, Result);
+ CHECK_MESSAGE(Code == HttpResponseCode::OK, fmt::format("Expected action LSN {} to be in results (got {})", Lsn, int(Code)));
+ }
+
+ // Queue should show 0 active, 3 abandoned
+ auto Status = Session.GetQueueStatus(QueueResult.QueueId);
+ CHECK_EQ(Status.ActiveCount, 0);
+ CHECK_EQ(Status.AbandonedCount, 3);
+
+ // New actions should be rejected
+ auto Rejected = Session.EnqueueActionToQueue(QueueResult.QueueId, ActionObj, 0);
+ CHECK_MESSAGE(!Rejected, "Expected action submission to be rejected in Abandoned state");
+
+ // Abandoned → Sunset should be valid
+ CHECK(Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Sunset));
+
+ Session.Shutdown();
+}
+
+TEST_CASE("function.session.abandon_running")
+{
+ // Spawn a real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a queue and submit a long-running Sleep action
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ Sleep(2'000);
+
+ // Transition to Abandoned — should abandon the running action
+ bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+ CHECK(!Session.IsHealthy());
+
+ // Poll for the action to complete (as abandoned)
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput()));
+
+ // Verify the queue shows abandoned, not completed
+ auto QueueStatus = Session.GetQueueStatus(QueueId);
+ CHECK_EQ(QueueStatus.ActiveCount, 0);
+ CHECK_EQ(QueueStatus.AbandonedCount, 1);
+ CHECK_EQ(QueueStatus.CompletedCount, 0);
+
+ Session.Shutdown();
+}
+
+TEST_CASE("function.remote.abandon_propagation")
+{
+ // Spawn real zenserver as a remote compute node
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ REQUIRE_MESSAGE(Instance.SpawnServerAndWaitUntilReady() != 0, Instance.GetLogOutput());
+
+ // Start mock orchestrator advertising the server
+ MockOrchestratorFixture MockOrch;
+ MockOrch.Service.SetWorkerList(BuildAgentListResponse({{"worker-1", Instance.GetBaseUri()}}));
+
+ // Create session infrastructure
+ InMemoryChunkResolver Resolver;
+ ScopedTemporaryDirectory SessionBaseDir;
+ zen::compute::ComputeServiceSession Session(Resolver);
+ Session.SetOrchestratorEndpoint(MockOrch.GetEndpoint());
+ Session.SetOrchestratorBasePath(SessionBaseDir.Path());
+ Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Ready);
+
+ // Register worker on session
+ CbPackage WorkerPackage = BuildWorkerPackage(TestEnv, Resolver);
+ Session.RegisterWorker(WorkerPackage);
+
+ // Wait for scheduler to discover the runner
+ Sleep(7'000);
+
+ // Create a local queue and submit a long-running Sleep action
+ auto QueueResult = Session.CreateQueue();
+ REQUIRE_MESSAGE(QueueResult.QueueId != 0, "Failed to create local queue");
+ const int QueueId = QueueResult.QueueId;
+
+ CbObject ActionObj = BuildSleepActionForSession("data"sv, 30'000, Resolver);
+
+ auto EnqueueRes = Session.EnqueueActionToQueue(QueueId, ActionObj, 0);
+ REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
+
+ // Wait for the action to start running on the remote
+ Sleep(2'000);
+
+ // Transition to Abandoned — should abandon the running action and propagate
+ bool Transitioned = Session.RequestStateTransition(zen::compute::ComputeServiceSession::SessionState::Abandoned);
+ CHECK_MESSAGE(Transitioned, "Failed to transition to Abandoned");
+
+ // Poll for the action to complete
+ CbPackage ResultPackage;
+ HttpResponseCode ResultCode = HttpResponseCode::Accepted;
+ Stopwatch Timer;
+
+ while (Timer.GetElapsedTimeMs() < 30'000)
+ {
+ ResultCode = Session.GetActionResult(EnqueueRes.Lsn, ResultPackage);
+ if (ResultCode == HttpResponseCode::OK)
+ {
+ break;
+ }
+ Sleep(200);
+ }
+
+ REQUIRE_MESSAGE(ResultCode == HttpResponseCode::OK,
+ fmt::format("Action did not complete within timeout\nServer log:\n{}", Instance.GetLogOutput()));
+
+ // Verify the local queue shows abandoned
+ auto QueueStatus = Session.GetQueueStatus(QueueId);
+ CHECK_EQ(QueueStatus.ActiveCount, 0);
+ CHECK_EQ(QueueStatus.AbandonedCount, 1);
+
+ // Session should not be healthy
+ CHECK(!Session.IsHealthy());
+
+ // The remote compute node should still be healthy (only the parent abandoned)
+ HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
+ HttpClient::Response ReadyResp = RemoteClient.Get("/ready"sv);
+ CHECK_MESSAGE(ReadyResp.StatusCode == HttpResponseCode::OK, "Remote compute node should still be healthy");
+
+ Session.Shutdown();
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests::compute
+
+#endif
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 42a5dcae4..11531e30f 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -24,7 +24,7 @@ namespace zen::tests::hub {
using namespace std::literals;
-TEST_SUITE_BEGIN("hub.lifecycle");
+TEST_SUITE_BEGIN("server.hub");
TEST_CASE("hub.lifecycle.basic")
{
@@ -230,9 +230,7 @@ TEST_CASE("hub.lifecycle.children")
}
}
-TEST_SUITE_END();
-
-TEST_CASE("hub.consul.lifecycle")
+TEST_CASE("hub.consul.lifecycle" * doctest::skip())
{
zen::consul::ConsulProcess ConsulProc;
ConsulProc.SpawnConsulAgent();
@@ -248,5 +246,7 @@ TEST_CASE("hub.consul.lifecycle")
ConsulProc.StopConsulAgent();
}
+TEST_SUITE_END();
+
} // namespace zen::tests::hub
#endif
diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp
new file mode 100644
index 000000000..2e530ff92
--- /dev/null
+++ b/src/zenserver-test/logging-tests.cpp
@@ -0,0 +1,261 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/zencore.h>
+
+#if ZEN_WITH_TESTS
+
+# include "zenserver-test.h"
+
+# include <zencore/filesystem.h>
+# include <zencore/logging.h>
+# include <zencore/testing.h>
+# include <zenutil/zenserverprocess.h>
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+TEST_SUITE_BEGIN("server.logging");
+
+//////////////////////////////////////////////////////////////////////////
+
+static bool
+LogContains(const std::string& Log, std::string_view Needle)
+{
+ return Log.find(Needle) != std::string::npos;
+}
+
+static std::string
+ReadFileToString(const std::filesystem::path& Path)
+{
+ FileContents Contents = ReadFile(Path);
+ if (Contents.ErrorCode)
+ {
+ return {};
+ }
+
+ IoBuffer Content = Contents.Flatten();
+ if (!Content)
+ {
+ return {};
+ }
+
+ return std::string(static_cast<const char*>(Content.Data()), Content.Size());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Verify that a log file is created at the default location (DataDir/logs/zenserver.log)
+// even without --abslog. The file must contain "server session id" (logged at INFO
+// to all registered loggers during init) and "log starting at" (emitted once a file
+// sink is first opened).
+TEST_CASE("logging.file.default")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ const std::filesystem::path DefaultLogFile = TestDir / "logs" / "zenserver.log";
+ CHECK_MESSAGE(std::filesystem::exists(DefaultLogFile), "Default log file was not created");
+ const std::string FileLog = ReadFileToString(DefaultLogFile);
+ CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
+ CHECK_MESSAGE(LogContains(FileLog, "log starting at"), FileLog);
+}
+
+// --quiet sets the console sink level to WARN. The formatted "[info] ..."
+// entry written by the default logger's console sink must therefore not appear
+// in captured stdout. (The "console" named logger — used by ZEN_CONSOLE_*
+// macros — may still emit plain-text messages without a level marker, so we
+// check for the absence of the FullFormatter "[info]" prefix rather than the
+// message text itself.)
+TEST_CASE("logging.console.quiet")
+{
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--quiet");
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ const std::string Log = Instance.GetLogOutput();
+ CHECK_MESSAGE(!LogContains(Log, "[info] server session id"), Log);
+}
+
+// --noconsole removes the stdout sink entirely, so the captured console output
+// must not contain any log entries from the logging system.
+TEST_CASE("logging.console.disabled")
+{
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--noconsole");
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ const std::string Log = Instance.GetLogOutput();
+ CHECK_MESSAGE(!LogContains(Log, "server session id"), Log);
+}
+
+// --abslog <path> creates a rotating log file at the specified path.
+// The file must contain "server session id" (logged at INFO to all loggers
+// during init) and "log starting at" (emitted once a file sink is active).
+TEST_CASE("logging.file.basic")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.log";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {}", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created");
+ const std::string FileLog = ReadFileToString(LogFile);
+ CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
+ CHECK_MESSAGE(LogContains(FileLog, "log starting at"), FileLog);
+}
+
+// --abslog with a .json extension selects the JSON formatter.
+// Each log entry must be a JSON object containing at least the "message"
+// and "source" fields.
+TEST_CASE("logging.file.json")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.json";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {}", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created");
+ const std::string FileLog = ReadFileToString(LogFile);
+ CHECK_MESSAGE(LogContains(FileLog, "\"message\""), FileLog);
+ CHECK_MESSAGE(LogContains(FileLog, "\"source\": \"zenserver\""), FileLog);
+ CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
+}
+
+// --log-id <id> is automatically set to the server instance name in test mode.
+// The JSON formatter emits this value as the "id" field, so every entry in a
+// .json log file must carry a non-empty "id".
+TEST_CASE("logging.log_id")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.json";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {}", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created");
+ const std::string FileLog = ReadFileToString(LogFile);
+ // The JSON formatter writes the log-id as: "id": "<value>",
+ CHECK_MESSAGE(LogContains(FileLog, "\"id\": \""), FileLog);
+}
+
+// --log-warn <logger> raises the level threshold above INFO so that INFO messages
+// are filtered. "server session id" is broadcast at INFO to all loggers: it must
+// appear in the main file sink (default logger unaffected) but must NOT appear in
+// http.log where the http_requests logger now has a WARN threshold.
+TEST_CASE("logging.level.warn_suppresses_info")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.log";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {} --log-warn http_requests", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created");
+ const std::string FileLog = ReadFileToString(LogFile);
+ CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
+
+ const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log";
+ CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created");
+ const std::string HttpLog = ReadFileToString(HttpLogFile);
+ CHECK_MESSAGE(!LogContains(HttpLog, "server session id"), HttpLog);
+}
+
+// --log-info <logger> sets an explicit INFO threshold. The INFO "server session id"
+// broadcast must still land in http.log, confirming that INFO messages are not
+// filtered when the logger level is exactly INFO.
+TEST_CASE("logging.level.info_allows_info")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.log";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {} --log-info http_requests", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log";
+ CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created");
+ const std::string HttpLog = ReadFileToString(HttpLogFile);
+ CHECK_MESSAGE(LogContains(HttpLog, "server session id"), HttpLog);
+}
+
+// --log-off <logger> silences a named logger entirely.
+// "server session id" is broadcast at INFO to all registered loggers via
+// spdlog::apply_all during init. When the "http_requests" logger is set to
+// OFF its dedicated http.log file must not contain that message.
+// The main file sink (via --abslog) must be unaffected.
+TEST_CASE("logging.level.off_specific_logger")
+{
+ const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const std::filesystem::path LogFile = TestDir / "test.log";
+
+ ZenServerInstance Instance(TestEnv);
+ Instance.SetDataDir(TestDir);
+
+ const std::string LogArg = fmt::format("--abslog {} --log-off http_requests", LogFile.string());
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
+ CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
+
+ Instance.Shutdown();
+
+ // Main log file must still have the startup message
+ CHECK_MESSAGE(std::filesystem::exists(LogFile), "Log file was not created");
+ const std::string FileLog = ReadFileToString(LogFile);
+ CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
+
+ // http.log is created by the RotatingFileSink but the logger is OFF, so
+ // the broadcast "server session id" message must not have been written to it
+ const std::filesystem::path HttpLogFile = TestDir / "logs" / "http.log";
+ CHECK_MESSAGE(std::filesystem::exists(HttpLogFile), "http.log was not created");
+ const std::string HttpLog = ReadFileToString(HttpLogFile);
+ CHECK_MESSAGE(!LogContains(HttpLog, "server session id"), HttpLog);
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests
+
+#endif
diff --git a/src/zenserver-test/nomad-tests.cpp b/src/zenserver-test/nomad-tests.cpp
new file mode 100644
index 000000000..f8f5a9a30
--- /dev/null
+++ b/src/zenserver-test/nomad-tests.cpp
@@ -0,0 +1,130 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#if ZEN_WITH_TESTS && ZEN_WITH_NOMAD
+# include "zenserver-test.h"
+# include <zencore/filesystem.h>
+# include <zencore/logging.h>
+# include <zencore/testing.h>
+# include <zencore/timer.h>
+# include <zenhttp/httpclient.h>
+# include <zennomad/nomadclient.h>
+# include <zennomad/nomadprocess.h>
+# include <zenutil/zenserverprocess.h>
+
+# include <fmt/format.h>
+
+namespace zen::tests::nomad_tests {
+
+using namespace std::literals;
+
+TEST_SUITE_BEGIN("server.nomad");
+
+TEST_CASE("nomad.client.lifecycle" * doctest::skip())
+{
+ zen::nomad::NomadProcess NomadProc;
+ NomadProc.SpawnNomadAgent();
+
+ zen::nomad::NomadTestClient Client("http://localhost:4646/");
+
+ // Submit a simple batch job that sleeps briefly
+# if ZEN_PLATFORM_WINDOWS
+ auto Job = Client.SubmitJob("zen-test-job", "cmd.exe", {"/C", "timeout /t 10 /nobreak"});
+# else
+ auto Job = Client.SubmitJob("zen-test-job", "/bin/sleep", {"10"});
+# endif
+ REQUIRE(!Job.Id.empty());
+ CHECK_EQ(Job.Status, "pending");
+
+ // Poll until the job is running (or dead)
+ {
+ Stopwatch Timer;
+ bool FoundRunning = false;
+ while (Timer.GetElapsedTimeMs() < 15000)
+ {
+ auto Status = Client.GetJobStatus("zen-test-job");
+ if (Status.Status == "running")
+ {
+ FoundRunning = true;
+ break;
+ }
+ if (Status.Status == "dead")
+ {
+ break;
+ }
+ Sleep(500);
+ }
+ CHECK(FoundRunning);
+ }
+
+ // Verify allocations exist
+ auto Allocs = Client.GetAllocations("zen-test-job");
+ CHECK(!Allocs.empty());
+
+ // Stop the job
+ Client.StopJob("zen-test-job");
+
+ // Verify it reaches dead state
+ {
+ Stopwatch Timer;
+ bool FoundDead = false;
+ while (Timer.GetElapsedTimeMs() < 10000)
+ {
+ auto Status = Client.GetJobStatus("zen-test-job");
+ if (Status.Status == "dead")
+ {
+ FoundDead = true;
+ break;
+ }
+ Sleep(500);
+ }
+ CHECK(FoundDead);
+ }
+
+ NomadProc.StopNomadAgent();
+}
+
+TEST_CASE("nomad.provisioner.integration" * doctest::skip())
+{
+ zen::nomad::NomadProcess NomadProc;
+ NomadProc.SpawnNomadAgent();
+
+ // Spawn zenserver in compute mode with Nomad provisioning enabled
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
+
+ Instance.SetDataDir(TestEnv.CreateNewTestDir());
+
+ std::filesystem::path ZenServerPath = TestEnv.ProgramBaseDir() / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
+
+ std::string NomadArgs = fmt::format(
+ "--nomad-enabled=true"
+ " --nomad-server=http://localhost:4646"
+ " --nomad-driver=raw_exec"
+ " --nomad-binary-path={}"
+ " --nomad-max-cores=32"
+ " --nomad-cores-per-job=32",
+ ZenServerPath.string());
+
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(NomadArgs);
+ REQUIRE(Port != 0);
+
+ // Give the provisioner time to submit jobs.
+ // The management thread has a 5s wait between cycles, and the HTTP client has
+ // a 10s connect timeout, so we need to allow enough time for at least one full cycle.
+ Sleep(15000);
+
+ // Verify jobs were submitted to Nomad
+ zen::nomad::NomadTestClient NomadClient("http://localhost:4646/");
+
+ auto Jobs = NomadClient.ListJobs("zenserver-worker");
+
+ ZEN_INFO("nomad.provisioner.integration: found {} jobs with prefix 'zenserver-worker'", Jobs.size());
+ CHECK_MESSAGE(!Jobs.empty(), Instance.GetLogOutput());
+
+ Instance.Shutdown();
+ NomadProc.StopNomadAgent();
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests::nomad_tests
+#endif
diff --git a/src/zenserver-test/objectstore-tests.cpp b/src/zenserver-test/objectstore-tests.cpp
new file mode 100644
index 000000000..f3db5fdf6
--- /dev/null
+++ b/src/zenserver-test/objectstore-tests.cpp
@@ -0,0 +1,74 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#if ZEN_WITH_TESTS
+# include "zenserver-test.h"
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zenutil/zenserverprocess.h>
+# include <zenhttp/httpclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::tests {
+
+using namespace std::literals;
+
+TEST_SUITE_BEGIN("server.objectstore");
+
+TEST_CASE("objectstore.blobs")
+{
+ std::string_view Bucket = "bkt"sv;
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<uint64_t> BlobsSizes;
+ std::vector<uint64_t> CompressedBlobsSizes;
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--objectstore-enabled"));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/obj/");
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ BlobsSizes.push_back(Blob.GetSize());
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ CompressedBlobsSizes.push_back(CompressedBlob.GetCompressedSize());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ std::string ObjectPath = fmt::format("{}/{}.utoc",
+ CompressedBlobsHashes.back().ToHexString().substr(0, 2),
+ CompressedBlobsHashes.back().ToHexString());
+
+ HttpClient::Response Result = Client.Put(fmt::format("bucket/{}/{}.utoc", Bucket, ObjectPath), Payload);
+ CHECK(Result);
+ }
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ std::string ObjectPath =
+ fmt::format("{}/{}.utoc", CompressedBlobsHashes[I].ToHexString().substr(0, 2), CompressedBlobsHashes[I].ToHexString());
+ HttpClient::Response Result = Client.Get(fmt::format("bucket/{}/{}.utoc", Bucket, ObjectPath));
+ CHECK(Result);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), CompressedBlobsSizes[I]);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Result.ResponsePayload)), RawHash, RawSize);
+ CHECK(Compressed);
+ CHECK_EQ(RawHash, CompressedBlobsHashes[I]);
+ CHECK_EQ(RawSize, BlobsSizes[I]);
+ }
+ }
+}
+
+TEST_SUITE_END();
+
+} // namespace zen::tests
+#endif
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp
index ead062628..eb2e187d7 100644
--- a/src/zenserver-test/projectstore-tests.cpp
+++ b/src/zenserver-test/projectstore-tests.cpp
@@ -27,6 +27,8 @@ namespace zen::tests {
using namespace std::literals;
+TEST_SUITE_BEGIN("server.projectstore");
+
TEST_CASE("project.basic")
{
using namespace std::literals;
@@ -71,7 +73,7 @@ TEST_CASE("project.basic")
{
auto Response = Http.Get("/prj/test"sv);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
CbObject ResponseObject = Response.AsObject();
@@ -92,7 +94,7 @@ TEST_CASE("project.basic")
{
auto Response = Http.Get(""sv);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
CbObject ResponseObject = Response.AsObject();
@@ -213,7 +215,7 @@ TEST_CASE("project.basic")
auto Response = Http.Get(ChunkGetUri);
REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
IoBuffer Data = Response.ResponsePayload;
IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
@@ -235,13 +237,13 @@ TEST_CASE("project.basic")
auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
IoBuffer Data = Response.ResponsePayload;
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- CHECK(Compressed);
+ REQUIRE(Compressed);
IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
CHECK(RawSize == ReferenceData.GetSize());
@@ -436,14 +438,14 @@ TEST_CASE("project.remote")
HttpClient Http{UrlBase};
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}", ProjectName), ProjectPayload);
- CHECK(Response);
+ REQUIRE(Response);
};
auto MakeOplog = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
HttpClient Http{UrlBase};
HttpClient::Response Response =
Http.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{}, ZenContentType::kCbObject);
- CHECK(Response);
+ REQUIRE(Response);
};
auto MakeOp = [](std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName, const CbPackage& OpPackage) {
@@ -454,7 +456,7 @@ TEST_CASE("project.remote")
HttpClient Http{UrlBase};
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/new", ProjectName, OplogName), Body);
- CHECK(Response);
+ REQUIRE(Response);
};
MakeProject(Servers.GetInstance(0).GetBaseUri(), "proj0");
@@ -505,7 +507,7 @@ TEST_CASE("project.remote")
HttpClient::Response Response =
Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", Project, Oplog), Payload, {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Response);
+ REQUIRE(Response);
CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
for (auto A : ResponsePackage.GetAttachments())
@@ -520,7 +522,7 @@ TEST_CASE("project.remote")
HttpClient Http{Servers.GetInstance(ServerIndex).GetBaseUri()};
HttpClient::Response Response = Http.Get(fmt::format("/prj/{}/oplog/{}/entries", Project, Oplog));
- CHECK(Response);
+ REQUIRE(Response);
IoBuffer Payload(Response.ResponsePayload);
CbObject OplogResonse = LoadCompactBinaryObject(Payload);
@@ -542,7 +544,7 @@ TEST_CASE("project.remote")
auto HttpWaitForCompletion = [](ZenServerInstance& Server, const HttpClient::Response& Response) {
REQUIRE(Response);
const uint64_t JobId = ParseInt<uint64_t>(Response.AsText()).value_or(0);
- CHECK(JobId != 0);
+ REQUIRE(JobId != 0);
HttpClient Http{Server.GetBaseUri()};
@@ -550,10 +552,10 @@ TEST_CASE("project.remote")
{
HttpClient::Response StatusResponse =
Http.Get(fmt::format("/admin/jobs/{}", JobId), {{"Accept", ToString(ZenContentType::kCbObject)}});
- CHECK(StatusResponse);
+ REQUIRE(StatusResponse);
CbObject ResponseObject = StatusResponse.AsObject();
std::string_view Status = ResponseObject["Status"sv].AsString();
- CHECK(Status != "Aborted"sv);
+ REQUIRE(Status != "Aborted"sv);
if (Status == "Complete"sv)
{
return;
@@ -888,17 +890,17 @@ TEST_CASE("project.rpcappendop")
Project.AddString("project"sv, ""sv);
Project.AddString("projectfile"sv, ""sv);
HttpClient::Response Response = Client.Post(fmt::format("/prj/{}", ProjectName), Project.Save());
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
};
auto MakeOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
HttpClient::Response Response =
Client.Post(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName), IoBuffer{}, ZenContentType::kCbObject);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
};
auto GetOplog = [](HttpClient& Client, std::string_view ProjectName, std::string_view OplogName) {
HttpClient::Response Response = Client.Get(fmt::format("/prj/{}/oplog/{}", ProjectName, OplogName));
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
return Response.AsObject();
};
@@ -912,7 +914,7 @@ TEST_CASE("project.rpcappendop")
}
Request.EndArray(); // "ops"
HttpClient::Response Response = Client.Post(fmt::format("/prj/{}/oplog/{}/rpc", ProjectName, OplogName), Request.Save());
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
+ REQUIRE_MESSAGE(Response.IsSuccess(), Response.ErrorMessage(""));
CbObjectView ResponsePayload = Response.AsPackage().GetObject();
CbArrayView NeedArray = ResponsePayload["need"sv].AsArrayView();
@@ -1055,6 +1057,8 @@ TEST_CASE("project.rpcappendop")
}
}
+TEST_SUITE_END();
+
} // namespace zen::tests
#endif
diff --git a/src/zenserver-test/workspace-tests.cpp b/src/zenserver-test/workspace-tests.cpp
index 7595d790a..655f28872 100644
--- a/src/zenserver-test/workspace-tests.cpp
+++ b/src/zenserver-test/workspace-tests.cpp
@@ -73,6 +73,8 @@ GenerateFolderContent2(const std::filesystem::path& RootPath)
return Result;
}
+TEST_SUITE_BEGIN("server.workspace");
+
TEST_CASE("workspaces.create")
{
using namespace std::literals;
@@ -514,9 +516,9 @@ TEST_CASE("workspaces.share")
}
IoBuffer BatchResponse =
Client.Post(fmt::format("/ws/{}/{}/batch", WorkspaceId, ShareId), BuildChunkBatchRequest(BatchEntries)).ResponsePayload;
- CHECK(BatchResponse);
+ REQUIRE(BatchResponse);
std::vector<IoBuffer> BatchResult = ParseChunkBatchResponse(BatchResponse);
- CHECK(BatchResult.size() == Files.size());
+ REQUIRE(BatchResult.size() == Files.size());
for (const RequestChunkEntry& Request : BatchEntries)
{
IoBuffer Result = BatchResult[Request.CorrelationId];
@@ -537,5 +539,7 @@ TEST_CASE("workspaces.share")
CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound);
}
+TEST_SUITE_END();
+
} // namespace zen::tests
#endif
diff --git a/src/zenserver-test/xmake.lua b/src/zenserver-test/xmake.lua
index 2a269cea1..7b208bbc7 100644
--- a/src/zenserver-test/xmake.lua
+++ b/src/zenserver-test/xmake.lua
@@ -6,10 +6,15 @@ target("zenserver-test")
add_headerfiles("**.h")
add_files("*.cpp")
add_files("zenserver-test.cpp", {unity_ignored = true })
- add_deps("zencore", "zenremotestore", "zenhttp")
+ add_deps("zencore", "zenremotestore", "zenhttp", "zencompute", "zenstore")
add_deps("zenserver", {inherit=false})
+ add_deps("zentest-appstub", {inherit=false})
add_packages("http_parser")
+ if has_config("zennomad") then
+ add_deps("zennomad")
+ end
+
if is_plat("macosx") then
add_ldflags("-framework CoreFoundation")
add_ldflags("-framework Security")
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 9a42bb73d..8d5400294 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -4,12 +4,12 @@
#if ZEN_WITH_TESTS
-# define ZEN_TEST_WITH_RUNNER 1
# include "zenserver-test.h"
# include <zencore/except.h>
# include <zencore/fmtutils.h>
# include <zencore/logging.h>
+# include <zencore/logging/registry.h>
# include <zencore/stream.h>
# include <zencore/string.h>
# include <zencore/testutils.h>
@@ -17,8 +17,8 @@
# include <zencore/timer.h>
# include <zenhttp/httpclient.h>
# include <zenhttp/packageformat.h>
-# include <zenutil/commandlineoptions.h>
-# include <zenutil/logging/testformatter.h>
+# include <zenutil/config/commandlineoptions.h>
+# include <zenutil/logging/fullformatter.h>
# include <zenutil/zenserverprocess.h>
# include <atomic>
@@ -86,8 +86,9 @@ main(int argc, char** argv)
zen::logging::InitializeLogging();
- zen::logging::SetLogLevel(zen::logging::level::Debug);
- spdlog::set_formatter(std::make_unique<zen::logging::full_test_formatter>("test", std::chrono::system_clock::now()));
+ zen::logging::SetLogLevel(zen::logging::Debug);
+ zen::logging::Registry::Instance().SetFormatter(
+ std::make_unique<zen::logging::FullFormatter>("test", std::chrono::system_clock::now()));
std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path();
std::filesystem::path TestBaseDir = std::filesystem::current_path() / ".test";
@@ -97,6 +98,7 @@ main(int argc, char** argv)
// somehow in the future
std::string ServerClass;
+ bool Verbose = false;
for (int i = 1; i < argc; ++i)
{
@@ -107,13 +109,23 @@ main(int argc, char** argv)
ServerClass = argv[++i];
}
}
+ else if (argv[i] == "--verbose"sv)
+ {
+ Verbose = true;
+ }
}
zen::tests::TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass);
+ if (Verbose)
+ {
+ zen::tests::TestEnv.SetPassthroughOutput(true);
+ }
+
ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
zen::testing::TestRunner Runner;
+ Runner.SetDefaultSuiteFilter("server.*");
Runner.ApplyCommandLine(argc, argv);
return Runner.Run();
@@ -121,6 +133,8 @@ main(int argc, char** argv)
namespace zen::tests {
+TEST_SUITE_BEGIN("server.zenserver");
+
TEST_CASE("default.single")
{
std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
@@ -327,6 +341,8 @@ TEST_CASE("http.package")
CHECK_EQ(ResponsePackage, TestPackage);
}
+TEST_SUITE_END();
+
# if 0
TEST_CASE("lifetime.owner")
{