aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver-test')
-rw-r--r--src/zenserver-test/buildstore-tests.cpp155
-rw-r--r--src/zenserver-test/cache-tests.cpp328
-rw-r--r--src/zenserver-test/compute-tests.cpp638
-rw-r--r--src/zenserver-test/hub-tests.cpp33
-rw-r--r--src/zenserver-test/logging-tests.cpp22
-rw-r--r--src/zenserver-test/objectstore-tests.cpp344
-rw-r--r--src/zenserver-test/projectstore-tests.cpp1090
-rw-r--r--src/zenserver-test/zenserver-test.cpp10
8 files changed, 1285 insertions, 1335 deletions
diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp
index cf9b10896..1f2157993 100644
--- a/src/zenserver-test/buildstore-tests.cpp
+++ b/src/zenserver-test/buildstore-tests.cpp
@@ -148,8 +148,6 @@ TEST_CASE("buildstore.blobs")
}
{
- // Single-range Get
-
ZenServerInstance Instance(TestEnv);
const uint16_t PortNumber =
@@ -158,6 +156,7 @@ TEST_CASE("buildstore.blobs")
HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ // Single-range Get
{
const IoHash& RawHash = CompressedBlobsHashes.front();
uint64_t BlobSize = CompressedBlobsSizes.front();
@@ -183,20 +182,63 @@ TEST_CASE("buildstore.blobs")
MemoryView RangeView = Payload.GetView();
CHECK(ActualRange.EqualBytes(RangeView));
}
- }
- {
- // Single-range Post
+ {
+ // GET blob not found
+ IoHash FakeHash = IoHash::HashBuffer("nonexistent", 11);
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, FakeHash));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+ }
- ZenServerInstance Instance(TestEnv);
+ {
+ // GET with out-of-bounds range
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
- const uint16_t PortNumber =
- Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
- CHECK(PortNumber != 0);
+ HttpClient::KeyValueMap Headers;
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", BlobSize + 100, BlobSize + 200)});
- HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), Headers);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::RangeNotSatisfiable);
+ }
{
+ // GET with multi-range header (uses Download for multipart boundary parsing)
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ uint64_t Range1Start = 0;
+ uint64_t Range1End = BlobSize / 4 - 1;
+ uint64_t Range2Start = BlobSize / 2;
+ uint64_t Range2End = BlobSize / 2 + BlobSize / 4 - 1;
+
+ HttpClient::KeyValueMap Headers;
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{},{}-{}", Range1Start, Range1End, Range2Start, Range2End)});
+
+ HttpClient::Response Result =
+ Client.Download(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), SystemRootPath, Headers);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::PartialContent);
+ REQUIRE_EQ(Result.Ranges.size(), 2);
+
+ HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ REQUIRE(FullBlobResult);
+
+ uint64_t Range1Len = Range1End - Range1Start + 1;
+ uint64_t Range2Len = Range2End - Range2Start + 1;
+
+ MemoryView ExpectedRange1 = FullBlobResult.ResponsePayload.GetView().Mid(Range1Start, Range1Len);
+ MemoryView ExpectedRange2 = FullBlobResult.ResponsePayload.GetView().Mid(Range2Start, Range2Len);
+
+ MemoryView ActualRange1 = Result.ResponsePayload.GetView().Mid(Result.Ranges[0].OffsetInPayload, Range1Len);
+ MemoryView ActualRange2 = Result.ResponsePayload.GetView().Mid(Result.Ranges[1].OffsetInPayload, Range2Len);
+
+ CHECK(ExpectedRange1.EqualBytes(ActualRange1));
+ CHECK(ExpectedRange2.EqualBytes(ActualRange2));
+ }
+
+ // Single-range Post
+ {
uint64_t RangeSizeSum = 0;
const IoHash& RawHash = CompressedBlobsHashes.front();
@@ -259,19 +301,96 @@ TEST_CASE("buildstore.blobs")
Offset += Range.second;
}
}
- }
- {
- // Multi-range
+ {
+ // POST with wrong accept type
+ const IoHash& RawHash = CompressedBlobsHashes.front();
- ZenServerInstance Instance(TestEnv);
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, uint64_t(0));
+ Writer.AddInteger("length"sv, uint64_t(10));
+ Writer.EndObject();
+ Writer.EndArray();
- const uint16_t PortNumber =
- Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
- CHECK(PortNumber != 0);
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kBinary));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
- HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ {
+ // POST with missing payload
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with empty ranges array
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with range count exceeding maximum
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ for (uint32_t I = 0; I < 257; I++)
+ {
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, uint64_t(0));
+ Writer.AddInteger("length"sv, uint64_t(1));
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with out-of-bounds range returns length=0
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, BlobSize + 100);
+ Writer.AddInteger("length"sv, uint64_t(50));
+ Writer.EndObject();
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ REQUIRE(Result);
+ CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload);
+ CbObjectView ResponseObject = ResponsePackage.GetObject();
+ CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView();
+ REQUIRE_EQ(RangeArray.Num(), uint64_t(1));
+ CbObjectView Range = (*begin(RangeArray)).AsObjectView();
+ CHECK_EQ(Range["offset"sv].AsUInt64(), BlobSize + 100);
+ CHECK_EQ(Range["length"sv].AsUInt64(), uint64_t(0));
+ }
+
+ // Multi-range
{
uint64_t RangeSizeSum = 0;
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
index 986dc67e0..e54e7060d 100644
--- a/src/zenserver-test/cache-tests.cpp
+++ b/src/zenserver-test/cache-tests.cpp
@@ -172,143 +172,85 @@ TEST_CASE("zcache.cbpackage")
return true;
};
- SUBCASE("PUT/GET returns correct package")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetDataDir(TestDir);
- const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
- const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
- HttpClient Http{BaseUri};
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetDataDir(RemoteDataDir);
+ const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetDataDir(LocalDataDir);
+ LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
+ fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
+ const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
+ CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
- // PUT
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body);
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
- // GET
- {
- HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
+ HttpClient LocalHttp{LocalBaseUri};
+ HttpClient RemoteHttp{RemoteBaseUri};
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
- }
+ const std::string_view Bucket = "mosdef"sv;
- SUBCASE("PUT propagates upstream")
+ // Phase 1: PUT/GET returns correct package (via local)
{
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
-
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetDataDir(RemoteDataDir);
- const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
-
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetDataDir(LocalDataDir);
- LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
- CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
-
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+ zen::IoHash Key1;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key1);
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key1), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- HttpClient LocalHttp{LocalBaseUri};
- HttpClient RemoteHttp{RemoteBaseUri};
+ HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key1), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(GetResult.StatusCode == HttpResponseCode::OK);
- // Store the cache record package in the local instance
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
-
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
-
- // The cache record can be retrieved as a package from the local instance
- {
- HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
-
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
-
- // The cache record can be retrieved as a package from the remote instance
- {
- HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
-
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(GetResult.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
}
- SUBCASE("GET finds upstream when missing in local")
+ // Phase 2: PUT propagates upstream
{
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ zen::IoHash Key2;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key2);
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetDataDir(RemoteDataDir);
- const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key2), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetDataDir(LocalDataDir);
- LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
- CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
+ HttpClient::Response LocalGetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(LocalGetResult.StatusCode == HttpResponseCode::OK);
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+ zen::CbPackage LocalPackage;
+ CHECK(LocalPackage.TryLoad(LocalGetResult.ResponsePayload));
+ CHECK(IsEqual(LocalPackage, ExpectedPackage));
- HttpClient LocalHttp{LocalBaseUri};
- HttpClient RemoteHttp{RemoteBaseUri};
+ HttpClient::Response RemoteGetResult = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(RemoteGetResult.StatusCode == HttpResponseCode::OK);
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ zen::CbPackage RemotePackage;
+ CHECK(RemotePackage.TryLoad(RemoteGetResult.ResponsePayload));
+ CHECK(IsEqual(RemotePackage, ExpectedPackage));
+ }
- // Store the cache record package in upstream cache
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
+ // Phase 3: GET finds upstream when missing in local
+ {
+ zen::IoHash Key3;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key3);
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key3), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- // The cache record can be retrieved as a package from the local cache
- {
- HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
+ HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key3), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(GetResult.StatusCode == HttpResponseCode::OK);
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
+ zen::CbPackage Package;
+ CHECK(Package.TryLoad(GetResult.ResponsePayload));
+ CHECK(IsEqual(Package, ExpectedPackage));
}
}
@@ -348,25 +290,25 @@ TEST_CASE("zcache.policy")
return Package;
};
- SUBCASE("query - 'local' does not query upstream (binary)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
- const uint16_t UpstreamPort = UpstreamCfg.Port;
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
- const std::string_view Bucket = "legacy"sv;
+ // query - 'local' does not query upstream (binary)
+ // Uses size 1024 for unique key
+ {
+ const auto Bucket = "legacy"sv;
zen::IoHash Key;
IoBuffer BinaryValue = GenerateData(1024, Key);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
{
HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -385,26 +327,14 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local' does not store upstream (binary)")
+ // store - 'local' does not store upstream (binary)
+ // Uses size 2048 for unique key
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
- const uint16_t UpstreamPort = UpstreamCfg.Port;
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
const auto Bucket = "legacy"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
-
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+ IoBuffer BinaryValue = GenerateData(2048, Key);
- // Store binary cache value locally
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key),
BinaryValue,
@@ -423,25 +353,14 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local/remote' stores local and upstream (binary)")
+ // store - 'local/remote' stores local and upstream (binary)
+ // Uses size 4096 for unique key
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
const auto Bucket = "legacy"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
+ IoBuffer BinaryValue = GenerateData(4096, Key);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store binary cache value locally and upstream
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key),
BinaryValue,
@@ -460,27 +379,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("query - 'local' does not query upstream (cbpackage)")
+ // query - 'local' does not query upstream (cbpackage)
+ // Uses bucket "policy4" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy4"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package upstream
{
HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -499,27 +407,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local' does not store upstream (cbpackage)")
+ // store - 'local' does not store upstream (cbpackage)
+ // Uses bucket "policy5" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy5"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package locally
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -536,27 +433,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
+ // store - 'local/remote' stores local and upstream (cbpackage)
+ // Uses bucket "policy6" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy6"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package locally and upstream
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -573,78 +459,62 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
+ // skip - 'data' returns cache record without attachments/empty payload
+ // Uses bucket "skiptest7" to isolate from other cbpackage scenarios
{
- ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance Instance(TestEnv);
- Cfg.Spawn(Instance);
-
- const auto Bucket = "test"sv;
+ const auto Bucket = "skiptest7"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient Http{Cfg.BaseUri};
-
- // Store package
{
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
}
- // Get package
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result);
CbPackage ResponsePackage;
CHECK(ResponsePackage.TryLoad(Result.ResponsePayload));
CHECK(ResponsePackage.GetAttachments().size() == 0);
}
- // Get record
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}});
CHECK(Result);
CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload);
CHECK(ResponseObject);
}
- // Get payload
{
- HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}});
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId),
+ {{"Accept", "application/x-ue-comp"}});
CHECK(Result);
CHECK(Result.ResponsePayload.GetSize() == 0);
}
}
- SUBCASE("skip - 'data' returns empty binary value")
+ // skip - 'data' returns empty binary value
+ // Uses size 8192 for unique key (avoids collision with size 1024/2048/4096 above)
{
- ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance Instance(TestEnv);
- Cfg.Spawn(Instance);
-
- const auto Bucket = "test"sv;
+ const auto Bucket = "skiptest8"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
-
- HttpClient Http{Cfg.BaseUri};
+ IoBuffer BinaryValue = GenerateData(8192, Key);
- // Store binary cache value
{
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
CHECK(Result.StatusCode == HttpResponseCode::Created);
}
- // Get package
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}});
CHECK(Result);
CHECK(Result.ResponsePayload.GetSize() == 0);
}
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index 835d72713..ce2db366a 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L
return false;
}
+static void
+WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ if (Session.GetActionCounts().Running > 0)
+ {
+ return;
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for action to reach Running state");
+}
+
+static void
+WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ HttpClient::Response Resp = Client.Get("/jobs/running"sv);
+ if (Resp)
+ {
+ CbObject Obj = Resp.AsObject();
+ if (Obj["running"sv].AsArrayView().Num() > 0)
+ {
+ return;
+ }
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for any action to reach Running state");
+}
+
static std::string
GetRot13Output(const CbPackage& ResultPackage)
{
@@ -906,10 +941,10 @@ TEST_CASE("function.queues.cancel_running")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue, which should interrupt the running Sleep job
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -961,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue via its OID token
- const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -1207,7 +1242,7 @@ TEST_CASE("function.priority")
// that exit with non-zero exit codes, including retry behaviour and
// final failure reporting.
-TEST_CASE("function.exit_code.failed_action")
+TEST_CASE("function.exit_code")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1219,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so the action fails immediately
- // without being rescheduled.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 1: failed_action - immediate failure with max_retries=0
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit a Fail action with exit code 42
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
- }
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-
- // GET /jobs/{lsn} for a failed action should return OK but with an empty result package
- const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response ResultResp = Client.Get(ResultUrl);
- CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
-}
-
-TEST_CASE("function.exit_code.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=2 so the action is retried twice before
- // being reported as failed (3 total attempts: initial + 2 retries).
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 2;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response ResultResp = Client.Get(ResultUrl);
+ CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
+ }
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ // Scenario 2: auto_retry - retried twice before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(2);
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Submit a Fail action — the worker process will exit with code 1 on
- // every attempt, eventually exhausting retries.
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Poll for the LSN to appear in the completed list — this only
- // happens after all retries are exhausted.
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
-
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.exit_code.reschedule_failed")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=1 so we have room for one manual reschedule
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
-
- // Wait for the action to exhaust its auto-retry and land in completed
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
-
- // Try to manually reschedule — should fail because retry limit is reached
- const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
- CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
-}
-
-TEST_CASE("function.exit_code.mixed_success_and_failure")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Create a queue with max_retries=0 for fast failure
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
+ CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit one Rot13 (success) and one Fail (failure)
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
- REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
- const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
+ const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
- HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
- const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
+ const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
- // Wait for both to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
- fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
- fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
+ fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
+ fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
- // Verify queue counters
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ }
}
-TEST_CASE("function.crash.abort")
+TEST_CASE("function.crash")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1456,174 +1413,125 @@ TEST_CASE("function.crash.abort")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so we don't wait through retries
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ // Scenario 1: abort - worker process calls std::abort(), no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Submit a Crash action that calls std::abort()
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
-
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
}
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-}
-
-TEST_CASE("function.crash.nullptr")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=0
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Crash action that dereferences a null pointer
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
-
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
-
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.crash.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ // Scenario 2: nullptr - worker process dereferences null, no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Create a queue with max_retries=1 — the crash should be retried once
- // before being reported as permanently failed.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 3: auto_retry - crash retried once before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- // Submit a Crash action — will crash on every attempt
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- // Poll for the LSN to appear in the completed list after retries exhaust
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -1662,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
// Trigger immediate orchestrator re-query and wait for runner setup
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Submit Rot13 action via session
CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
@@ -1721,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W1 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit Rot13 action and verify it completes on W1
{
@@ -1763,23 +1669,33 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W2 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
- // Verify W2 received the worker by querying its /compute/workers endpoint directly
+ // Poll W2 until the worker has been synced (SyncWorkersToRunner is async)
{
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
- HttpClient Client(ComputeBaseUri);
- HttpClient::Response ListResp = Client.Get("/workers"sv);
- REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+ HttpClient Client(ComputeBaseUri);
+ bool WorkerFound = false;
+ Stopwatch Timer;
- bool WorkerFound = false;
- for (auto& Item : ListResp.AsObject()["workers"sv])
+ while (Timer.GetElapsedTimeMs() < 10'000)
{
- if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ HttpClient::Response ListResp = Client.Get("/workers"sv);
+ if (ListResp)
+ {
+ for (auto& Item : ListResp.AsObject()["workers"sv])
+ {
+ if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ {
+ WorkerFound = true;
+ break;
+ }
+ }
+ }
+ if (WorkerFound)
{
- WorkerFound = true;
break;
}
+ Sleep(50);
}
REQUIRE_MESSAGE(WorkerFound,
@@ -1844,7 +1760,6 @@ TEST_CASE("function.remote.queue_association")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit action to it
auto QueueResult = Session.CreateQueue();
@@ -1922,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1935,7 +1849,7 @@ TEST_CASE("function.remote.queue_cancel_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Cancel the local queue — this should propagate to the remote
Session.CancelQueue(QueueId);
@@ -2008,7 +1922,7 @@ TEST_CASE("function.abandon_running_http")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
// Wait for the process to start running
- Sleep(1'000);
+ WaitForAnyActionRunningHttp(Client);
// Verify the ready endpoint returns OK before abandon
{
@@ -2139,7 +2053,6 @@ TEST_CASE("function.session.abandon_running")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2152,7 +2065,7 @@ TEST_CASE("function.session.abandon_running")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action
bool Transitioned = Session.Abandon();
@@ -2210,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2223,7 +2135,7 @@ TEST_CASE("function.remote.abandon_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action and propagate
bool Transitioned = Session.Abandon();
@@ -2283,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
Session.RegisterWorker(WorkerPackage);
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running action so the remote queue is established
auto QueueResult = Session.CreateQueue();
@@ -2296,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Verify the remote has a non-implicit queue before shutdown
HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
@@ -2358,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work")
// Wait for runner discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit an action and verify it completes
{
@@ -2415,7 +2325,7 @@ TEST_CASE("function.session.retract_pending")
REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
// Let the scheduler process the pending action
- Sleep(500);
+ Sleep(50);
// Retract the pending action
auto Result = Session.RetractAction(Enqueued.Lsn);
@@ -2424,7 +2334,7 @@ TEST_CASE("function.session.retract_pending")
// The action should be re-enqueued as pending (still no runners, so stays pending).
// Let the scheduler process the retracted action back to pending.
- Sleep(500);
+ Sleep(50);
// Queue should still show 1 active (the action was rescheduled, not completed)
auto Status = Session.GetQueueStatus(QueueResult.QueueId);
@@ -2509,8 +2419,8 @@ TEST_CASE("function.retract_http")
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
- // Wait for the scheduler to process the pending action into m_PendingActions
- Sleep(1'000);
+ // Wait for the blocker action to start running (occupying the single slot)
+ WaitForAnyActionRunningHttp(Client);
// Retract the pending action via POST /jobs/{lsn}/retract
const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn);
@@ -2529,7 +2439,7 @@ TEST_CASE("function.retract_http")
}
// A second retract should also succeed (action is back to pending)
- Sleep(500);
+ Sleep(50);
HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText()));
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 487e22b4b..35a840e5d 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -329,17 +329,36 @@ TEST_CASE("hub.lifecycle.children")
CHECK_EQ(Result.AsText(), "GhijklmNop"sv);
}
- Result = Client.Post("modules/abc/deprovision");
+ // Deprovision all modules at once
+ Result = Client.Post("deprovision");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ {
+ CbObject Body = Result.AsObject();
+ CbArrayView AcceptedArr = Body["Accepted"].AsArrayView();
+ CHECK_EQ(AcceptedArr.Num(), 2u);
+ bool FoundAbc = false;
+ bool FoundDef = false;
+ for (CbFieldView F : AcceptedArr)
+ {
+ if (F.AsString() == "abc"sv)
+ {
+ FoundAbc = true;
+ }
+ else if (F.AsString() == "def"sv)
+ {
+ FoundDef = true;
+ }
+ }
+ CHECK(FoundAbc);
+ CHECK(FoundDef);
+ }
REQUIRE(WaitForModuleGone(Client, "abc"));
+ REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
CHECK(WaitForPortUnreachable(ModClient));
}
-
- Result = Client.Post("modules/def/deprovision");
- REQUIRE(Result);
- REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
CHECK(WaitForPortUnreachable(ModClient));
@@ -349,6 +368,10 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Get("status");
REQUIRE(Result);
CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
+
+ // Deprovision-all with no modules
+ Result = Client.Post("deprovision");
+ CHECK(Result);
}
static bool
diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp
index 2e530ff92..549d2b322 100644
--- a/src/zenserver-test/logging-tests.cpp
+++ b/src/zenserver-test/logging-tests.cpp
@@ -146,28 +146,6 @@ TEST_CASE("logging.file.json")
CHECK_MESSAGE(LogContains(FileLog, "\"message\""), FileLog);
CHECK_MESSAGE(LogContains(FileLog, "\"source\": \"zenserver\""), FileLog);
CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
-}
-
-// --log-id <id> is automatically set to the server instance name in test mode.
-// The JSON formatter emits this value as the "id" field, so every entry in a
-// .json log file must carry a non-empty "id".
-TEST_CASE("logging.log_id")
-{
- const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const std::filesystem::path LogFile = TestDir / "test.json";
-
- ZenServerInstance Instance(TestEnv);
- Instance.SetDataDir(TestDir);
-
- const std::string LogArg = fmt::format("--abslog {}", LogFile.string());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
- CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- Instance.Shutdown();
-
- CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created");
- const std::string FileLog = ReadFileToString(LogFile);
- // The JSON formatter writes the log-id as: "id": "<value>",
CHECK_MESSAGE(LogContains(FileLog, "\"id\": \""), FileLog);
}
diff --git a/src/zenserver-test/objectstore-tests.cpp b/src/zenserver-test/objectstore-tests.cpp
index ff2314089..99c92e15f 100644
--- a/src/zenserver-test/objectstore-tests.cpp
+++ b/src/zenserver-test/objectstore-tests.cpp
@@ -19,18 +19,22 @@ using namespace std::literals;
TEST_SUITE_BEGIN("server.objectstore");
-TEST_CASE("objectstore.blobs")
+TEST_CASE("objectstore")
{
- std::string_view Bucket = "bkt"sv;
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
+ CHECK(Port != 0);
- std::vector<IoHash> CompressedBlobsHashes;
- std::vector<uint64_t> BlobsSizes;
- std::vector<uint64_t> CompressedBlobsSizes;
+ // --- objectstore.blobs ---
{
- ZenServerInstance Instance(TestEnv);
+ INFO("objectstore.blobs");
- const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--objectstore-enabled"));
- CHECK(PortNumber != 0);
+ std::string_view Bucket = "bkt"sv;
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<uint64_t> BlobsSizes;
+ std::vector<uint64_t> CompressedBlobsSizes;
HttpClient Client(Instance.GetBaseUri() + "/obj/");
@@ -68,97 +72,192 @@ TEST_CASE("objectstore.blobs")
CHECK_EQ(RawSize, BlobsSizes[I]);
}
}
-}
-TEST_CASE("objectstore.s3client")
-{
- ZenServerInstance Instance(TestEnv);
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
- CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- // S3Client in path-style builds paths as /{bucket}/{key}.
- // The objectstore routes objects at bucket/{bucket}/{key} relative to its base.
- // Point the S3Client endpoint at {server}/obj/bucket so the paths line up.
- S3ClientOptions Opts;
- Opts.BucketName = "s3test";
- Opts.Region = "us-east-1";
- Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port);
- Opts.PathStyle = true;
- Opts.Credentials.AccessKeyId = "testkey";
- Opts.Credentials.SecretAccessKey = "testsecret";
-
- S3Client Client(Opts);
-
- // -- PUT + GET roundtrip --
- std::string_view TestData = "hello from s3client via objectstore"sv;
- IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData));
- S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content));
- REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error);
-
- S3GetObjectResult GetRes = Client.GetObject("test/hello.txt");
- REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error);
- CHECK(GetRes.AsText() == TestData);
-
- // -- PUT overwrites --
- IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv));
- IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv));
- REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Original)).IsSuccess());
- REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess());
-
- S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt");
- REQUIRE(OverwriteGet.IsSuccess());
- CHECK(OverwriteGet.AsText() == "overwritten"sv);
-
- // -- GET not found --
- S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat");
- CHECK_FALSE(NotFoundGet.IsSuccess());
-
- // -- HEAD found --
- std::string_view HeadData = "head test data"sv;
- IoBuffer HeadContent = IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData));
- REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess());
-
- S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt");
- REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error);
- CHECK(HeadRes.Status == HeadObjectResult::Found);
- CHECK(HeadRes.Info.Size == HeadData.size());
-
- // -- HEAD not found --
- S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat");
- CHECK(HeadNotFound.IsSuccess());
- CHECK(HeadNotFound.Status == HeadObjectResult::NotFound);
-
- // -- LIST objects --
- for (int i = 0; i < 3; ++i)
+ // --- objectstore.s3client ---
{
- std::string Key = fmt::format("listing/item-{}.txt", i);
- std::string Payload = fmt::format("content-{}", i);
- IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload));
- REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess());
- }
+ INFO("objectstore.s3client");
+
+ S3ClientOptions Opts;
+ Opts.BucketName = "s3test";
+ Opts.Region = "us-east-1";
+ Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port);
+ Opts.PathStyle = true;
+ Opts.Credentials.AccessKeyId = "testkey";
+ Opts.Credentials.SecretAccessKey = "testsecret";
+
+ S3Client Client(Opts);
+
+ // -- PUT + GET roundtrip --
+ std::string_view TestData = "hello from s3client via objectstore"sv;
+ IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData));
+ S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content));
+ REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error);
+
+ S3GetObjectResult GetRes = Client.GetObject("test/hello.txt");
+ REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error);
+ CHECK(GetRes.AsText() == TestData);
+
+ // -- PUT overwrites --
+ IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv));
+ IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv));
+ REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Original)).IsSuccess());
+ REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess());
+
+ S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt");
+ REQUIRE(OverwriteGet.IsSuccess());
+ CHECK(OverwriteGet.AsText() == "overwritten"sv);
+
+ // -- GET not found --
+ S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat");
+ CHECK_FALSE(NotFoundGet.IsSuccess());
+
+ // -- HEAD found --
+ std::string_view HeadData = "head test data"sv;
+ IoBuffer HeadContent = IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData));
+ REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess());
+
+ S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt");
+ REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error);
+ CHECK(HeadRes.Status == HeadObjectResult::Found);
+ CHECK(HeadRes.Info.Size == HeadData.size());
+
+ // -- HEAD not found --
+ S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat");
+ CHECK(HeadNotFound.IsSuccess());
+ CHECK(HeadNotFound.Status == HeadObjectResult::NotFound);
+
+ // -- LIST objects --
+ for (int i = 0; i < 3; ++i)
+ {
+ std::string Key = fmt::format("listing/item-{}.txt", i);
+ std::string Payload = fmt::format("content-{}", i);
+ IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload));
+ REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess());
+ }
- S3ListObjectsResult ListRes = Client.ListObjects("listing/");
- REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error);
- REQUIRE(ListRes.Objects.size() == 3);
+ S3ListObjectsResult ListRes = Client.ListObjects("listing/");
+ REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error);
+ REQUIRE(ListRes.Objects.size() == 3);
+
+ std::vector<std::string> Keys;
+ for (const S3ObjectInfo& Obj : ListRes.Objects)
+ {
+ Keys.push_back(Obj.Key);
+ CHECK(Obj.Size > 0);
+ }
+ std::sort(Keys.begin(), Keys.end());
+ CHECK(Keys[0] == "listing/item-0.txt");
+ CHECK(Keys[1] == "listing/item-1.txt");
+ CHECK(Keys[2] == "listing/item-2.txt");
+
+ // -- LIST empty prefix --
+ S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/");
+ REQUIRE(EmptyList.IsSuccess());
+ CHECK(EmptyList.Objects.empty());
+ }
- std::vector<std::string> Keys;
- for (const S3ObjectInfo& Obj : ListRes.Objects)
+ // --- objectstore.range-requests ---
{
- Keys.push_back(Obj.Key);
- CHECK(Obj.Size > 0);
+ INFO("objectstore.range-requests");
+
+ HttpClient Client(Instance.GetBaseUri() + "/obj/");
+
+ IoBuffer Blob = CreateRandomBlob(1024);
+ MemoryView BlobView = Blob.GetView();
+ std::string ObjectPath = "bucket/bkt/range-test/data.bin";
+
+ HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob));
+ REQUIRE(PutResult);
+
+ // Full GET without Range header
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath);
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
+ }
+
+ // Single range: bytes 100-199
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 100u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100)));
+ }
+
+ // Range starting at zero: bytes 0-49
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 50u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50)));
+ }
+
+ // Range at end of file: bytes 1000-1023
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 24u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24)));
+ }
+
+ // Multiple ranges: bytes 0-49 and 100-149
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+
+ std::string_view Body(reinterpret_cast<const char*>(Result.ResponsePayload.GetData()), Result.ResponsePayload.GetSize());
+
+ // Verify multipart structure contains both range payloads
+ CHECK(Body.find("Content-Range: bytes 0-49/1024") != std::string_view::npos);
+ CHECK(Body.find("Content-Range: bytes 100-149/1024") != std::string_view::npos);
+
+ // Extract and verify actual data for first range
+ auto FindPartData = [&](std::string_view ContentRange) -> std::string_view {
+ size_t Pos = Body.find(ContentRange);
+ if (Pos == std::string_view::npos)
+ {
+ return {};
+ }
+ // Skip past the Content-Range line and the blank line separator
+ Pos = Body.find("\r\n\r\n", Pos);
+ if (Pos == std::string_view::npos)
+ {
+ return {};
+ }
+ Pos += 4;
+ size_t End = Body.find("\r\n--", Pos);
+ if (End == std::string_view::npos)
+ {
+ return {};
+ }
+ return Body.substr(Pos, End - Pos);
+ };
+
+ std::string_view Part1 = FindPartData("Content-Range: bytes 0-49/1024");
+ CHECK_EQ(Part1.size(), 50u);
+ CHECK(MemoryView(Part1.data(), Part1.size()).EqualBytes(BlobView.Mid(0, 50)));
+
+ std::string_view Part2 = FindPartData("Content-Range: bytes 100-149/1024");
+ CHECK_EQ(Part2.size(), 50u);
+ CHECK(MemoryView(Part2.data(), Part2.size()).EqualBytes(BlobView.Mid(100, 50)));
+ }
+
+ // Out-of-bounds single range
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}});
+ CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable);
+ }
+
+ // Out-of-bounds multi-range
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,2000-2099"}});
+ CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable);
+ }
}
- std::sort(Keys.begin(), Keys.end());
- CHECK(Keys[0] == "listing/item-0.txt");
- CHECK(Keys[1] == "listing/item-1.txt");
- CHECK(Keys[2] == "listing/item-2.txt");
-
- // -- LIST empty prefix --
- S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/");
- REQUIRE(EmptyList.IsSuccess());
- CHECK(EmptyList.Objects.empty());
}
-TEST_CASE("objectstore.range-requests")
+TEST_CASE("objectstore.range-requests-download")
{
ZenServerInstance Instance(TestEnv);
const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
@@ -168,55 +267,42 @@ TEST_CASE("objectstore.range-requests")
IoBuffer Blob = CreateRandomBlob(1024);
MemoryView BlobView = Blob.GetView();
- std::string ObjectPath = "bucket/bkt/range-test/data.bin";
+ std::string ObjectPath = "bucket/bkt/range-download-test/data.bin";
HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob));
REQUIRE(PutResult);
- // Full GET without Range header
- {
- HttpClient::Response Result = Client.Get(ObjectPath);
- CHECK(Result.StatusCode == HttpResponseCode::OK);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
- }
-
- // Single range: bytes 100-199
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}});
- CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 100u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100)));
- }
+ ScopedTemporaryDirectory DownloadDir;
- // Range starting at zero: bytes 0-49
+ // Single range via Download: verify Ranges is populated and GetRanges maps correctly
{
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}});
+ HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=100-199"}});
CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 50u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50)));
+ REQUIRE_EQ(Result.Ranges.size(), 1u);
+ CHECK_EQ(Result.Ranges[0].RangeOffset, 100u);
+ CHECK_EQ(Result.Ranges[0].RangeLength, 100u);
+
+ std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{100, 100}};
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges);
+ REQUIRE_EQ(PayloadRanges.size(), 1u);
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(100, 100)));
}
- // Range at end of file: bytes 1000-1023
+ // Multi-range via Download: verify Ranges is populated for both parts and GetRanges maps correctly
{
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}});
+ HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=0-49,100-149"}});
CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 24u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24)));
- }
-
- // Multiple ranges: not supported, falls back to 200 with full body per RFC 7233
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
- }
-
- // Out-of-bounds range: should return 400
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}});
- CHECK(Result.StatusCode == HttpResponseCode::BadRequest);
+ REQUIRE_EQ(Result.Ranges.size(), 2u);
+ CHECK_EQ(Result.Ranges[0].RangeOffset, 0u);
+ CHECK_EQ(Result.Ranges[0].RangeLength, 50u);
+ CHECK_EQ(Result.Ranges[1].RangeOffset, 100u);
+ CHECK_EQ(Result.Ranges[1].RangeLength, 50u);
+
+ std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{0, 50}, {100, 50}};
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges);
+ REQUIRE_EQ(PayloadRanges.size(), 2u);
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(0, 50)));
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[1].first, PayloadRanges[1].second).EqualBytes(BlobView.Mid(100, 50)));
}
}
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp
index cec453511..49d985abb 100644
--- a/src/zenserver-test/projectstore-tests.cpp
+++ b/src/zenserver-test/projectstore-tests.cpp
@@ -41,423 +41,430 @@ TEST_CASE("project.basic")
const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
- std::mt19937_64 mt;
-
- zen::StringBuilder<64> BaseUri;
- BaseUri << fmt::format("http://localhost:{}", PortNumber);
+ std::string ServerUri = fmt::format("http://localhost:{}", PortNumber);
std::filesystem::path BinPath = zen::GetRunningExecutablePath();
std::filesystem::path RootPath = BinPath.parent_path().parent_path();
BinPath = BinPath.lexically_relative(RootPath);
- SUBCASE("build store init")
+ auto CreateProjectAndOplog = [&](std::string_view ProjectName, std::string_view OplogName) -> std::string {
+ HttpClient Http{ServerUri};
+
+ zen::CbObjectWriter Body;
+ Body << "id" << ProjectName;
+ Body << "root" << RootPath.c_str();
+ Body << "project"
+ << "/zooom";
+ Body << "engine"
+ << "/zooom";
+ IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
+ auto Response = Http.Post(fmt::format("/prj/{}", ProjectName), BodyBuf);
+ REQUIRE(Response.StatusCode == HttpResponseCode::Created);
+
+ std::string OplogUri = fmt::format("{}/prj/{}/oplog/{}", ServerUri, ProjectName, OplogName);
+ HttpClient OplogHttp{OplogUri};
+ auto OplogResponse = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
+ REQUIRE(OplogResponse.StatusCode == HttpResponseCode::Created);
+
+ return OplogUri;
+ };
+
+ // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing
+ std::filesystem::path LongPathDir = RootPath / "longpathtest";
+ for (int I = 0; I < 5; ++I)
{
- {
- HttpClient Http{BaseUri};
+ LongPathDir /= std::string(50, char('a' + I));
+ }
+ std::filesystem::path LongFilePath = LongPathDir / "testfile.bin";
+ std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath);
- {
- zen::CbObjectWriter Body;
- Body << "id"
- << "test";
- Body << "root" << RootPath.c_str();
- Body << "project"
- << "/zooom";
- Body << "engine"
- << "/zooom";
-
- zen::BinaryWriter MemOut;
- IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
-
- auto Response = Http.Post("/prj/test"sv, BodyBuf);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ const uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF};
+ CreateDirectories(MakeSafeAbsolutePath(LongPathDir));
+ WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData)));
+ CHECK(LongRelPath.string().length() > 260);
- {
- auto Response = Http.Get("/prj/test"sv);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ std::string LongClientPath = "/{engine}/client";
+ for (int I = 0; I < 5; ++I)
+ {
+ LongClientPath += '/';
+ LongClientPath.append(50, char('a' + I));
+ }
+ LongClientPath += "/longfile.bin";
+ CHECK(LongClientPath.length() > 260);
- CbObject ResponseObject = Response.AsObject();
+ const std::string_view LongPathChunkId{
+ "00000000"
+ "00000000"
+ "00020000"};
+ auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId);
- CHECK(ResponseObject["id"].AsString() == "test"sv);
- CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
- }
+ // --- build store persistence ---
+ // First section also verifies project and oplog creation responses.
+ {
+ HttpClient ServerHttp{ServerUri};
+
+ {
+ zen::CbObjectWriter Body;
+ Body << "id"
+ << "test_persist";
+ Body << "root" << RootPath.c_str();
+ Body << "project"
+ << "/zooom";
+ Body << "engine"
+ << "/zooom";
+ IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
+
+ auto Response = ServerHttp.Post("/prj/test_persist"sv, BodyBuf);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
}
- BaseUri << "/prj/test/oplog/foobar";
+ {
+ auto Response = ServerHttp.Get("/prj/test_persist"sv);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+
+ CbObject ResponseObject = Response.AsObject();
+
+ CHECK(ResponseObject["id"].AsString() == "test_persist"sv);
+ CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
+ }
+
+ std::string OplogUri = fmt::format("{}/prj/test_persist/oplog/oplog_persist", ServerUri);
{
- HttpClient Http{BaseUri};
+ HttpClient OplogHttp{OplogUri};
{
- auto Response = Http.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
+ auto Response = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
CHECK(Response.StatusCode == HttpResponseCode::Created);
}
{
- auto Response = Http.Get(""sv);
+ auto Response = OplogHttp.Get(""sv);
REQUIRE(Response.StatusCode == HttpResponseCode::OK);
CbObject ResponseObject = Response.AsObject();
- CHECK(ResponseObject["id"].AsString() == "foobar"sv);
- CHECK(ResponseObject["project"].AsString() == "test"sv);
+ CHECK(ResponseObject["id"].AsString() == "oplog_persist"sv);
+ CHECK(ResponseObject["project"].AsString() == "test_persist"sv);
}
}
- // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing
- std::filesystem::path LongPathDir = RootPath / "longpathtest";
- for (int I = 0; I < 5; ++I)
- {
- LongPathDir /= std::string(50, char('a' + I));
- }
- std::filesystem::path LongFilePath = LongPathDir / "testfile.bin";
- std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath);
+ uint8_t AttachData[] = {1, 2, 3};
- const uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF};
- CreateDirectories(MakeSafeAbsolutePath(LongPathDir));
- WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData)));
- CHECK(LongRelPath.string().length() > 260);
+ zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
+ zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
- std::string LongClientPath = "/{engine}/client";
- for (int I = 0; I < 5; ++I)
- {
- LongClientPath += '/';
- LongClientPath.append(50, char('a' + I));
- }
- LongClientPath += "/longfile.bin";
- CHECK(LongClientPath.length() > 260);
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo"
+ << "attachment" << Attach;
- const std::string_view LongPathChunkId{
+ const std::string_view ChunkId{
"00000000"
"00000000"
- "00020000"};
- auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId);
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.BeginObject();
+ OpWriter << "id" << LongPathFileOid;
+ OpWriter << "clientpath" << LongClientPath;
+ OpWriter << "serverpath" << LongRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
+ OpPackage.AddAttachment(Attach);
- SUBCASE("build store persistence")
- {
- uint8_t AttachData[] = {1, 2, 3};
-
- zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
- zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
-
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo"
- << "attachment" << Attach;
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.BeginObject();
- OpWriter << "id" << LongPathFileOid;
- OpWriter << "clientpath" << LongClientPath;
- OpWriter << "serverpath" << LongRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
- OpPackage.AddAttachment(Attach);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
-
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ HttpClient Http{OplogUri};
- // Read file data
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- CHECK(Response.ResponsePayload.GetSize() == 10);
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
+ auto Response = Http.Get(ChunkGetUri);
- // Read long-path file data
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ CHECK(Response.ResponsePayload.GetSize() == 10);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData));
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- ZEN_INFO("+++++++");
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData));
}
- SUBCASE("snapshot")
- {
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo";
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.BeginObject();
- OpWriter << "id" << LongPathFileOid;
- OpWriter << "clientpath" << LongClientPath;
- OpWriter << "serverpath" << LongRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
+ ZEN_INFO("+++++++");
+ }
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ // --- snapshot ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_snap", "oplog_snap");
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo";
- // Read file data, it is raw and uncompressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ const std::string_view ChunkId{
+ "00000000"
+ "00000000"
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.BeginObject();
+ OpWriter << "id" << LongPathFileOid;
+ OpWriter << "clientpath" << LongClientPath;
+ OpWriter << "serverpath" << LongRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- IoBuffer Data = Response.ResponsePayload;
- IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
- CHECK(ReferenceData.GetSize() == Data.GetSize());
- CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
- }
+ HttpClient Http{OplogUri};
- // Read long-path file data, it is raw and uncompressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
- IoBuffer Data = Response.ResponsePayload;
- MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
- CHECK(Data.GetSize() == sizeof(LongPathFileData));
- CHECK(Data.GetView().EqualBytes(ExpectedView));
- }
+ // Read file data, it is raw and uncompressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- {
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
- auto Response = Http.Post("/rpc"sv, Payload);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read chunk data, it is now compressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ IoBuffer Data = Response.ResponsePayload;
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(ReferenceData.GetSize() == Data.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
+ }
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ // Read long-path file data, it is raw and uncompressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
- CHECK(RawSize == ReferenceData.GetSize());
- CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
- CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read compressed long-path file data after snapshot
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ IoBuffer Data = Response.ResponsePayload;
+ MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
+ CHECK(Data.GetSize() == sizeof(LongPathFileData));
+ CHECK(Data.GetView().EqualBytes(ExpectedView));
+ }
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
+ auto Response = Http.Post("/rpc"sv, Payload);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
- CHECK(RawSize == sizeof(LongPathFileData));
- CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData));
- CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView));
- }
+ // Read chunk data, it is now compressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- ZEN_INFO("+++++++");
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(RawSize == ReferenceData.GetSize());
+ CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
}
- SUBCASE("snapshot zero byte file")
+ // Read compressed long-path file data after snapshot
{
- // A zero-byte file referenced in an oplog entry must survive a
- // snapshot: the file is read, compressed, stored in CidStore, and
- // the oplog is rewritten with a BinaryAttachment reference. After
- // the snapshot the chunk must be retrievable and decompress to an
- // empty payload.
-
- std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin";
- std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath;
- CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path()));
- // Create a zero-byte file on disk.
- WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{});
- REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath)));
-
- const std::string_view EmptyChunkId{
- "00000000"
- "00000000"
- "00030000"};
- auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId);
-
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "zero_byte_test";
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << EmptyFileOid;
- OpWriter << "clientpath"
- << "/{engine}/empty_file";
- OpWriter << "serverpath" << EmptyFileRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
- zen::CbPackage OpPackage(Op);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read file data before snapshot - raw and uncompressed, 0 bytes.
- // http.sys converts a 200 OK with empty body to 204 No Content, so
- // accept either status code.
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << EmptyChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
+ CHECK(RawSize == sizeof(LongPathFileData));
+ CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData));
+ CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView));
+ }
- REQUIRE(Response);
- CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent));
- CHECK(Response.ResponsePayload.GetSize() == 0);
- }
+ ZEN_INFO("+++++++");
+ }
- // Trigger snapshot.
- {
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
- auto Response = Http.Post("/rpc"sv, Payload);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ // --- snapshot zero byte file ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_zero", "oplog_zero");
- // Read chunk after snapshot - compressed, decompresses to 0 bytes.
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << EmptyChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin";
+ std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath;
+ CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path()));
+ WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{});
+ REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath)));
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ const std::string_view EmptyChunkId{
+ "00000000"
+ "00000000"
+ "00030000"};
+ auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId);
+
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "zero_byte_test";
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << EmptyFileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/empty_file";
+ OpWriter << "serverpath" << EmptyFileRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+ zen::CbPackage OpPackage(Op);
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- CHECK(RawSize == 0);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- CHECK(DataDecompressed.GetSize() == 0);
- }
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- // Cleanup
- {
- std::error_code Ec;
- DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec);
- }
+ HttpClient Http{OplogUri};
- ZEN_INFO("+++++++");
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
}
- SUBCASE("test chunk not found error")
+ // Read file data before snapshot - raw and uncompressed, 0 bytes.
+ // http.sys converts a 200 OK with empty body to 204 No Content, so
+ // accept either status code.
{
- HttpClient Http{BaseUri};
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << EmptyChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- for (size_t I = 0; I < 65; I++)
- {
- zen::StringBuilder<128> PostUri;
- PostUri << "/f77c781846caead318084604/info";
- auto Response = Http.Get(PostUri);
+ REQUIRE(Response);
+ CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent));
+ CHECK(Response.ResponsePayload.GetSize() == 0);
+ }
- REQUIRE(!Response.Error);
- CHECK(Response.StatusCode == HttpResponseCode::NotFound);
- }
+ // Trigger snapshot.
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
+ auto Response = Http.Post("/rpc"sv, Payload);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Read chunk after snapshot - compressed, decompresses to 0 bytes.
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << EmptyChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ CHECK(RawSize == 0);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ CHECK(DataDecompressed.GetSize() == 0);
}
- // Cleanup long-path test directory
{
std::error_code Ec;
- DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec);
+ DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec);
+ }
+
+ ZEN_INFO("+++++++");
+ }
+
+ // --- test chunk not found error ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_notfound", "oplog_notfound");
+ HttpClient Http{OplogUri};
+
+ for (size_t I = 0; I < 65; I++)
+ {
+ zen::StringBuilder<128> PostUri;
+ PostUri << "/f77c781846caead318084604/info";
+ auto Response = Http.Get(PostUri);
+
+ REQUIRE(!Response.Error);
+ CHECK(Response.StatusCode == HttpResponseCode::NotFound);
}
}
+
+ // Cleanup long-path test directory
+ {
+ std::error_code Ec;
+ DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec);
+ }
}
CbPackage
@@ -753,86 +760,102 @@ TEST_CASE("project.remote")
}
};
- SUBCASE("File")
+ // --- Zen ---
+ // NOTE: Zen export must run before file-based exports from the same source
+ // oplog. A prior file export leaves server-side state that causes a
+ // subsequent zen-protocol export from the same oplog to abort.
{
+ INFO("Zen");
ScopedTemporaryDirectory TempDir;
{
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
+ std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
+ MakeProject(ExportTargetUri, "proj0_zen");
+ MakeOplog(ExportTargetUri, "proj0_zen", "oplog0_zen");
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
Writer << "params" << BeginObject;
{
Writer << "maxblocksize"sv << 3072u;
Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "chunkfilesizelimit"sv << 5u * 1024u;
Writer << "maxchunksperblock"sv << 16u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
+ Writer << "zen"sv << BeginObject;
{
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
+ Writer << "url"sv << ExportTargetUri.substr(7);
+ Writer << "project"
+ << "proj0_zen";
+ Writer << "oplog"
+ << "oplog0_zen";
}
- Writer << EndObject; // "file"
+ Writer << EndObject; // "zen"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
-
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
+ ValidateAttachments(1, "proj0_zen", "oplog0_zen");
+ ValidateOplog(1, "proj0_zen", "oplog0_zen");
+
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
+ std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
+ MakeProject(ImportTargetUri, "proj1");
+ MakeOplog(ImportTargetUri, "proj1", "oplog1");
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
{
Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
+ Writer << "zen"sv << BeginObject;
{
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
+ Writer << "url"sv << ImportSourceUri.substr(7);
+ Writer << "project"
+ << "proj0_zen";
+ Writer << "oplog"
+ << "oplog0_zen";
}
- Writer << EndObject; // "file"
+ Writer << EndObject; // "zen"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
-
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
- HttpWaitForCompletion(Servers.GetInstance(1), Response);
+ HttpClient Http{Servers.GetInstance(2).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(2), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(2, "proj1", "oplog1");
+ ValidateOplog(2, "proj1", "oplog1");
}
- SUBCASE("File disable blocks")
+ // --- File ---
{
+ INFO("File");
ScopedTemporaryDirectory TempDir;
{
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
Writer << "params" << BeginObject;
{
Writer << "maxblocksize"sv << 3072u;
Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
+ Writer << "maxchunksperblock"sv << 16u;
+ Writer << "force"sv << true;
Writer << "file"sv << BeginObject;
{
- Writer << "path"sv << TempDir.Path().string();
+ Writer << "path"sv << path;
Writer << "name"sv
<< "proj0_oplog0"sv;
- Writer << "disableblocks"sv << true;
}
Writer << EndObject; // "file"
}
@@ -845,9 +868,10 @@ TEST_CASE("project.remote")
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_file");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_file", "oplog0_file");
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
@@ -855,7 +879,7 @@ TEST_CASE("project.remote")
Writer << "force"sv << false;
Writer << "file"sv << BeginObject;
{
- Writer << "path"sv << TempDir.Path().string();
+ Writer << "path"sv << path;
Writer << "name"sv
<< "proj0_oplog0"sv;
}
@@ -866,15 +890,16 @@ TEST_CASE("project.remote")
HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_file", "oplog0_file"), Payload);
HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(1, "proj0_file", "oplog0_file");
+ ValidateOplog(1, "proj0_file", "oplog0_file");
}
- SUBCASE("File force temp blocks")
+ // --- File disable blocks ---
{
+ INFO("File disable blocks");
ScopedTemporaryDirectory TempDir;
{
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
@@ -886,26 +911,27 @@ TEST_CASE("project.remote")
Writer << "maxchunkembedsize"sv << 1296u;
Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
+ Writer << "force"sv << true;
Writer << "file"sv << BeginObject;
{
Writer << "path"sv << TempDir.Path().string();
Writer << "name"sv
<< "proj0_oplog0"sv;
- Writer << "enabletempblocks"sv << true;
+ Writer << "disableblocks"sv << true;
}
Writer << EndObject; // "file"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock", "oplog0_noblock");
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
@@ -923,23 +949,20 @@ TEST_CASE("project.remote")
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_noblock", "oplog0_noblock"), Payload);
HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(1, "proj0_noblock", "oplog0_noblock");
+ ValidateOplog(1, "proj0_noblock", "oplog0_noblock");
}
- SUBCASE("Zen")
+ // --- File force temp blocks ---
{
+ INFO("File force temp blocks");
ScopedTemporaryDirectory TempDir;
{
- std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
- std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
- MakeProject(ExportTargetUri, "proj0_copy");
- MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy");
-
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
@@ -949,14 +972,13 @@ TEST_CASE("project.remote")
Writer << "maxchunkembedsize"sv << 1296u;
Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
+ Writer << "force"sv << true;
+ Writer << "file"sv << BeginObject;
{
- Writer << "url"sv << ExportTargetUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "enabletempblocks"sv << true;
}
Writer << EndObject; // "file"
}
@@ -967,40 +989,32 @@ TEST_CASE("project.remote")
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
-
{
- std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
- std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
- MakeProject(ImportTargetUri, "proj1");
- MakeOplog(ImportTargetUri, "proj1", "oplog1");
-
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock", "oplog0_tmpblock");
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
{
Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
+ Writer << "file"sv << BeginObject;
{
- Writer << "url"sv << ImportSourceUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
}
Writer << EndObject; // "file"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(2).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload);
- HttpWaitForCompletion(Servers.GetInstance(2), Response);
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_tmpblock", "oplog0_tmpblock"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(2, "proj1", "oplog1");
- ValidateOplog(2, "proj1", "oplog1");
+ ValidateAttachments(1, "proj0_tmpblock", "oplog0_tmpblock");
+ ValidateOplog(1, "proj0_tmpblock", "oplog0_tmpblock");
}
}
@@ -1379,7 +1393,7 @@ TEST_CASE("project.file.data.transitions")
return Package;
};
- SUBCASE("path-referenced file is retrievable")
+ // --- path-referenced file is retrievable ---
{
MakeProject("proj_path"sv);
MakeOplog("proj_path"sv, "oplog"sv);
@@ -1397,7 +1411,7 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced file is retrievable")
+ // --- hash-referenced file is retrievable ---
{
MakeProject("proj_hash"sv);
MakeOplog("proj_hash"sv, "oplog"sv);
@@ -1416,34 +1430,35 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced to path-referenced transition with different content")
+ struct TransitionVariant
{
- MakeProject("proj_hash_to_path_diff"sv);
- MakeOplog("proj_hash_to_path_diff"sv, "oplog"sv);
+ std::string_view Suffix;
+ bool SameOpKey;
+ bool RunGc;
+ };
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ static constexpr TransitionVariant Variants[] = {
+ {"_nk", false, false},
+ {"_sk", true, false},
+ {"_nk_gc", false, true},
+ {"_sk_gc", true, true},
+ };
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ // --- hash-referenced to path-referenced transition with different content ---
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_h2pd{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
+
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- // First op: file with CAS hash (content differs from the on-disk file)
{
CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, CompressedBlob);
- PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
if (Response.IsSuccess())
{
@@ -1453,19 +1468,17 @@ TEST_CASE("project.file.data.transitions")
}
}
- // Second op: same FileId transitions to serverpath (different data)
{
CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
- PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
}
- if (RunGcAfterTransition)
+ if (V.RunGc)
{
TriggerGcAndWait();
}
- // Must serve the on-disk file content, not the old CAS blob
- HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
@@ -1475,95 +1488,68 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced to path-referenced transition with identical content")
+ // --- hash-referenced to path-referenced transition with identical content ---
{
- // Compress the same on-disk file content as a CAS blob so both references yield identical data
CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView()));
- MakeProject("proj_hash_to_path_same"sv);
- MakeOplog("proj_hash_to_path_same"sv, "oplog"sv);
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_h2ps{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ {
+ CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- // First op: file with CAS hash (content matches the on-disk file)
- {
- CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob);
- PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op);
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ if (Response.IsSuccess())
+ {
+ IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
+ }
+ }
- HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ {
+ CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
+ }
+
+ if (V.RunGc)
+ {
+ TriggerGcAndWait();
+ }
+
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
}
}
-
- // Second op: same FileId transitions to serverpath (same data)
- {
- CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
- PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op);
- }
-
- if (RunGcAfterTransition)
- {
- TriggerGcAndWait();
- }
-
- // Must still resolve successfully after the transition
- HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
- if (Response.IsSuccess())
- {
- IoBuffer Payload = GetDecompressedPayload(Response);
- CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
- CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
- }
}
- SUBCASE("path-referenced to hash-referenced transition with different content")
+ // --- path-referenced to hash-referenced transition with different content ---
+ for (const TransitionVariant& V : Variants)
{
- MakeProject("proj_path_to_hash_diff"sv);
- MakeOplog("proj_path_to_hash_diff"sv, "oplog"sv);
-
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ std::string ProjName = fmt::format("proj_p2hd{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- // First op: file with serverpath
{
CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
- PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
if (Response.IsSuccess())
{
@@ -1572,19 +1558,17 @@ TEST_CASE("project.file.data.transitions")
}
}
- // Second op: same FileId transitions to CAS hash (different data)
{
CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, CompressedBlob);
- PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
}
- if (RunGcAfterTransition)
+ if (V.RunGc)
{
TriggerGcAndWait();
}
- // Must serve the CAS blob content, not the old on-disk file
- HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
@@ -1595,65 +1579,51 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("path-referenced to hash-referenced transition with identical content")
+ // --- path-referenced to hash-referenced transition with identical content ---
{
- // Compress the same on-disk file content as a CAS blob so both references yield identical data
CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView()));
- MakeProject("proj_path_to_hash_same"sv);
- MakeOplog("proj_path_to_hash_same"sv, "oplog"sv);
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_p2hs{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ {
+ CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- // First op: file with serverpath
- {
- CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
- PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op);
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ if (Response.IsSuccess())
+ {
+ IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
+ }
+ }
- HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ {
+ CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
+ }
+
+ if (V.RunGc)
+ {
+ TriggerGcAndWait();
+ }
+
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
}
}
-
- // Second op: same FileId transitions to CAS hash (same data)
- {
- CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob);
- PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op);
- }
-
- if (RunGcAfterTransition)
- {
- TriggerGcAndWait();
- }
-
- // Must still resolve successfully after the transition
- HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
- if (Response.IsSuccess())
- {
- IoBuffer Payload = GetDecompressedPayload(Response);
- CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
- CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
- }
}
}
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index cf7ffe4e4..d713f693f 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -199,7 +199,7 @@ TEST_CASE("default.single")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 100; ++i)
+ for (int i = 0; i < 20; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
@@ -238,7 +238,6 @@ TEST_CASE("default.loopback")
ZEN_INFO("Running loopback server test...");
- SUBCASE("ipv4 endpoint connectivity")
{
HttpClient Http{fmt::format("http://127.0.0.1:{}", PortNumber)};
@@ -247,7 +246,6 @@ TEST_CASE("default.loopback")
CHECK(res);
}
- SUBCASE("ipv6 endpoint connectivity")
{
HttpClient Http{fmt::format("http://[::1]:{}", PortNumber)};
@@ -287,7 +285,7 @@ TEST_CASE("multi.basic")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 100; ++i)
+ for (int i = 0; i < 20; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
@@ -401,13 +399,11 @@ TEST_CASE("http.unixsocket")
Settings.UnixSocketPath = SocketPath;
HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}};
- SUBCASE("GET over unix socket")
{
HttpClient::Response Res = Http.Get("/testing/hello");
CHECK(Res.IsSuccess());
}
- SUBCASE("POST echo over unix socket")
{
IoBuffer Body{IoBuffer::Wrap, "unix-test", 9};
HttpClient::Response Res = Http.Post("/testing/echo", Body);
@@ -431,13 +427,11 @@ TEST_CASE("http.nonetwork")
Settings.UnixSocketPath = SocketPath;
HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}};
- SUBCASE("GET over unix socket succeeds")
{
HttpClient::Response Res = Http.Get("/testing/hello");
CHECK(Res.IsSuccess());
}
- SUBCASE("TCP connection is refused")
{
asio::io_context IoContext;
asio::ip::tcp::socket Socket(IoContext);