aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient_test.cpp12
-rw-r--r--src/zenhttp/httpserver.cpp95
-rw-r--r--src/zenhttp/include/zenhttp/httpcommon.h14
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h4
-rw-r--r--src/zenhttp/servers/httpasio.cpp28
-rw-r--r--src/zenhttp/servers/httpplugin.cpp32
-rw-r--r--src/zenhttp/servers/httpsys.cpp30
-rw-r--r--src/zenhttp/servers/wstest.cpp26
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp5
-rw-r--r--src/zenserver-test/buildstore-tests.cpp155
-rw-r--r--src/zenserver-test/cache-tests.cpp328
-rw-r--r--src/zenserver-test/compute-tests.cpp638
-rw-r--r--src/zenserver-test/hub-tests.cpp33
-rw-r--r--src/zenserver-test/logging-tests.cpp22
-rw-r--r--src/zenserver-test/objectstore-tests.cpp344
-rw-r--r--src/zenserver-test/projectstore-tests.cpp1090
-rw-r--r--src/zenserver-test/zenserver-test.cpp10
-rw-r--r--src/zenserver/frontend/html/pages/cache.js5
-rw-r--r--src/zenserver/frontend/html/pages/hub.js66
-rw-r--r--src/zenserver/frontend/html/pages/projects.js5
-rw-r--r--src/zenserver/frontend/html/pages/start.js10
-rw-r--r--src/zenserver/frontend/html/util/widgets.js64
-rw-r--r--src/zenserver/frontend/html/zen.css47
-rw-r--r--src/zenserver/hub/httphubservice.cpp80
-rw-r--r--src/zenserver/hub/httphubservice.h1
-rw-r--r--src/zenserver/hub/hydration.cpp2
-rw-r--r--src/zenserver/main.cpp2
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.cpp144
-rw-r--r--src/zenserver/storage/objectstore/objectstore.cpp50
-rw-r--r--src/zenstore/compactcas.cpp13
-rw-r--r--src/zenutil/consul/consul.cpp6
-rw-r--r--src/zenutil/filesystemutils.cpp2
-rw-r--r--src/zenutil/process/subprocessmanager.cpp171
33 files changed, 2020 insertions, 1514 deletions
diff --git a/src/zenhttp/httpclient_test.cpp b/src/zenhttp/httpclient_test.cpp
index af653cbb2..9b3148a4a 100644
--- a/src/zenhttp/httpclient_test.cpp
+++ b/src/zenhttp/httpclient_test.cpp
@@ -194,7 +194,7 @@ public:
"slow",
[](HttpRouterRequest& Req) {
Req.ServerRequest().WriteResponseAsync([](HttpServerRequest& Request) {
- Sleep(2000);
+ Sleep(100);
Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "slow response");
});
},
@@ -750,7 +750,9 @@ TEST_CASE("httpclient.error-handling")
{
SUBCASE("Connection refused")
{
- HttpClient Client("127.0.0.1:19999", HttpClientSettings{}, /*CheckIfAbortFunction*/ {});
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(200);
+ HttpClient Client("127.0.0.1:19999", Settings, /*CheckIfAbortFunction*/ {});
HttpClient::Response Resp = Client.Get("/api/test/hello");
CHECK(!Resp.IsSuccess());
CHECK(Resp.Error.has_value());
@@ -760,7 +762,7 @@ TEST_CASE("httpclient.error-handling")
{
TestServerFixture Fixture;
HttpClientSettings Settings;
- Settings.Timeout = std::chrono::milliseconds(500);
+ Settings.Timeout = std::chrono::milliseconds(50);
HttpClient Client = Fixture.MakeClient(Settings);
HttpClient::Response Resp = Client.Get("/api/test/slow");
@@ -970,7 +972,9 @@ TEST_CASE("httpclient.measurelatency")
SUBCASE("Failed measurement against unreachable port")
{
- HttpClient Client("127.0.0.1:19999", HttpClientSettings{}, /*CheckIfAbortFunction*/ {});
+ HttpClientSettings Settings;
+ Settings.ConnectTimeout = std::chrono::milliseconds(200);
+ HttpClient Client("127.0.0.1:19999", Settings, /*CheckIfAbortFunction*/ {});
LatencyTestResult Result = MeasureLatency(Client, "/api/test/hello");
CHECK(!Result.Success);
CHECK(!Result.FailureReason.empty());
diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp
index 38021be16..3668e1e8f 100644
--- a/src/zenhttp/httpserver.cpp
+++ b/src/zenhttp/httpserver.cpp
@@ -266,10 +266,10 @@ TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges)
return false;
}
- const auto Start = ParseInt<uint32_t>(Token.substr(0, Delim));
- const auto End = ParseInt<uint32_t>(Token.substr(Delim + 1));
+ const auto Start = ParseInt<uint64_t>(Token.substr(0, Delim));
+ const auto End = ParseInt<uint64_t>(Token.substr(Delim + 1));
- if (Start.has_value() && End.has_value() && End.value() > Start.value())
+ if (Start.has_value() && End.has_value() && End.value() >= Start.value())
{
Ranges.push_back({.Start = Start.value(), .End = End.value()});
}
@@ -286,6 +286,45 @@ TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges)
return Count != Ranges.size();
}
+MultipartByteRangesResult
+BuildMultipartByteRanges(const IoBuffer& Data, const HttpRanges& Ranges)
+{
+ Oid::String_t BoundaryStr;
+ Oid::NewOid().ToString(BoundaryStr);
+ std::string_view Boundary(BoundaryStr, Oid::StringLength);
+
+ const uint64_t TotalSize = Data.GetSize();
+
+ std::vector<IoBuffer> Parts;
+ Parts.reserve(Ranges.size() * 2 + 1);
+
+ for (const HttpRange& Range : Ranges)
+ {
+ uint64_t RangeEnd = (Range.End != ~uint64_t(0)) ? Range.End : TotalSize - 1;
+ if (RangeEnd >= TotalSize || Range.Start > RangeEnd)
+ {
+ return {};
+ }
+
+ uint64_t RangeSize = 1 + (RangeEnd - Range.Start);
+
+ std::string PartHeader = fmt::format("\r\n--{}\r\nContent-Type: application/octet-stream\r\nContent-Range: bytes {}-{}/{}\r\n\r\n",
+ Boundary,
+ Range.Start,
+ RangeEnd,
+ TotalSize);
+ Parts.push_back(IoBufferBuilder::MakeCloneFromMemory(PartHeader.data(), PartHeader.size()));
+
+ IoBuffer RangeData(Data, Range.Start, RangeSize);
+ Parts.push_back(RangeData);
+ }
+
+ std::string ClosingBoundary = fmt::format("\r\n--{}--", Boundary);
+ Parts.push_back(IoBufferBuilder::MakeCloneFromMemory(ClosingBoundary.data(), ClosingBoundary.size()));
+
+ return {.Parts = std::move(Parts), .ContentType = fmt::format("multipart/byteranges; boundary={}", Boundary)};
+}
+
//////////////////////////////////////////////////////////////////////////
const std::string_view
@@ -564,6 +603,56 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType
}
void
+HttpServerRequest::WriteResponse(HttpContentType ContentType, const IoBuffer& Data, const HttpRanges& Ranges)
+{
+ if (Ranges.empty())
+ {
+ WriteResponse(HttpResponseCode::OK, ContentType, IoBuffer(Data));
+ return;
+ }
+
+ if (Ranges.size() == 1)
+ {
+ const HttpRange& Range = Ranges[0];
+ const uint64_t TotalSize = Data.GetSize();
+ // ~uint64_t(0) is the sentinel meaning "end of file" (suffix range).
+ const uint64_t RangeEnd = (Range.End != ~uint64_t(0)) ? Range.End : TotalSize - 1;
+
+ if (RangeEnd >= TotalSize || Range.Start > RangeEnd)
+ {
+ m_ContentRangeHeader = fmt::format("bytes */{}", TotalSize);
+ WriteResponse(HttpResponseCode::RangeNotSatisfiable);
+ return;
+ }
+
+ const uint64_t RangeSize = 1 + (RangeEnd - Range.Start);
+ IoBuffer RangeBuf(Data, Range.Start, RangeSize);
+
+ m_ContentRangeHeader = fmt::format("bytes {}-{}/{}", Range.Start, RangeEnd, TotalSize);
+ WriteResponse(HttpResponseCode::PartialContent, ContentType, std::move(RangeBuf));
+ return;
+ }
+
+ // Multi-range
+ MultipartByteRangesResult MultipartResult = BuildMultipartByteRanges(Data, Ranges);
+ if (MultipartResult.Parts.empty())
+ {
+ m_ContentRangeHeader = fmt::format("bytes */{}", Data.GetSize());
+ WriteResponse(HttpResponseCode::RangeNotSatisfiable);
+ return;
+ }
+ WriteResponse(HttpResponseCode::PartialContent, std::move(MultipartResult.ContentType), std::span<IoBuffer>(MultipartResult.Parts));
+}
+
+void
+HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, const std::string& CustomContentType, std::span<IoBuffer> Blobs)
+{
+ ZEN_ASSERT(ParseContentType(CustomContentType) == HttpContentType::kUnknownContentType);
+ m_ContentTypeOverride = CustomContentType;
+ WriteResponse(ResponseCode, HttpContentType::kBinary, Blobs);
+}
+
+void
HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, CompositeBuffer& Payload)
{
std::span<const SharedBuffer> Segments = Payload.GetSegments();
diff --git a/src/zenhttp/include/zenhttp/httpcommon.h b/src/zenhttp/include/zenhttp/httpcommon.h
index f9a99f3cc..1d921600d 100644
--- a/src/zenhttp/include/zenhttp/httpcommon.h
+++ b/src/zenhttp/include/zenhttp/httpcommon.h
@@ -19,8 +19,8 @@ class StringBuilderBase;
struct HttpRange
{
- uint32_t Start = ~uint32_t(0);
- uint32_t End = ~uint32_t(0);
+ uint64_t Start = ~uint64_t(0);
+ uint64_t End = ~uint64_t(0);
};
using HttpRanges = std::vector<HttpRange>;
@@ -30,6 +30,16 @@ extern HttpContentType (*ParseContentType)(const std::string_view& ContentTypeSt
std::string_view ReasonStringForHttpResultCode(int HttpCode);
bool TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges);
+struct MultipartByteRangesResult
+{
+ std::vector<IoBuffer> Parts;
+ std::string ContentType;
+};
+
+// Build a multipart/byteranges response body from the given data and ranges.
+// Generates a unique boundary per call. Returns empty Parts if any range is out of bounds.
+MultipartByteRangesResult BuildMultipartByteRanges(const IoBuffer& Data, const HttpRanges& Ranges);
+
enum class HttpVerb : uint8_t
{
kGet = 1 << 0,
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index e8bfcfd4d..136304426 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -122,11 +122,13 @@ public:
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) = 0;
virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, CompositeBuffer& Payload);
+ void WriteResponse(HttpResponseCode ResponseCode, const std::string& CustomContentType, std::span<IoBuffer> Blobs);
void WriteResponse(HttpResponseCode ResponseCode, CbObject Data);
void WriteResponse(HttpResponseCode ResponseCode, CbArray Array);
void WriteResponse(HttpResponseCode ResponseCode, CbPackage Package);
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString);
void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob);
+ void WriteResponse(HttpContentType ContentType, const IoBuffer& Data, const HttpRanges& Ranges);
virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0;
@@ -152,6 +154,8 @@ protected:
std::string_view m_QueryString;
mutable uint32_t m_RequestId = ~uint32_t(0);
mutable Oid m_SessionId = Oid::Zero;
+ std::string m_ContentTypeOverride;
+ std::string m_ContentRangeHeader;
inline void SetIsHandled() { m_Flags |= kIsHandled; }
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index 6cda84875..cfba3c95f 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -625,6 +625,8 @@ public:
void SetAllowZeroCopyFileSend(bool Allow) { m_AllowZeroCopyFileSend = Allow; }
void SetKeepAlive(bool KeepAlive) { m_IsKeepAlive = KeepAlive; }
+ void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); }
+ void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); }
/**
* Initialize the response for sending a payload made up of multiple blobs
@@ -768,10 +770,18 @@ public:
{
ZEN_MEMSCOPE(GetHttpasioTag());
+ std::string_view ContentTypeStr =
+ m_ContentTypeOverride.empty() ? MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride);
+
m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n"
- << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n"
+ << "Content-Type: " << ContentTypeStr << "\r\n"
<< "Content-Length: " << ContentLength() << "\r\n"sv;
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Headers << "Content-Range: " << m_ContentRangeHeader << "\r\n"sv;
+ }
+
if (!m_IsKeepAlive)
{
m_Headers << "Connection: close\r\n"sv;
@@ -898,7 +908,9 @@ private:
bool m_AllowZeroCopyFileSend = true;
State m_State = State::kUninitialized;
HttpContentType m_ContentType = HttpContentType::kBinary;
- uint64_t m_ContentLength = 0;
+ std::string m_ContentTypeOverride;
+ std::string m_ContentRangeHeader;
+ uint64_t m_ContentLength = 0;
eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive
ExtendableStringBuilder<160> m_Headers;
@@ -2131,6 +2143,10 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode)
m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber));
m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend);
m_Response->SetKeepAlive(m_Request.IsKeepAlive());
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
std::array<IoBuffer, 0> Empty;
m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
@@ -2146,6 +2162,14 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT
m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend);
m_Response->SetKeepAlive(m_Request.IsKeepAlive());
+ if (!m_ContentTypeOverride.empty())
+ {
+ m_Response->SetContentTypeOverride(std::move(m_ContentTypeOverride));
+ }
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}
diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp
index 31b0315d4..b0fb020e0 100644
--- a/src/zenhttp/servers/httpplugin.cpp
+++ b/src/zenhttp/servers/httpplugin.cpp
@@ -185,13 +185,17 @@ public:
const std::vector<IoBuffer>& ResponseBuffers() const { return m_ResponseBuffers; }
void SuppressPayload() { m_ResponseBuffers.resize(1); }
+ void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); }
+ void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); }
std::string_view GetHeaders();
private:
- uint16_t m_ResponseCode = 0;
- bool m_IsKeepAlive = true;
- HttpContentType m_ContentType = HttpContentType::kBinary;
+ uint16_t m_ResponseCode = 0;
+ bool m_IsKeepAlive = true;
+ HttpContentType m_ContentType = HttpContentType::kBinary;
+ std::string m_ContentTypeOverride;
+ std::string m_ContentRangeHeader;
uint64_t m_ContentLength = 0;
std::vector<IoBuffer> m_ResponseBuffers;
ExtendableStringBuilder<160> m_Headers;
@@ -246,10 +250,18 @@ HttpPluginResponse::GetHeaders()
if (m_Headers.Size() == 0)
{
+ std::string_view ContentTypeStr =
+ m_ContentTypeOverride.empty() ? MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride);
+
m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n"
- << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n"
+ << "Content-Type: " << ContentTypeStr << "\r\n"
<< "Content-Length: " << ContentLength() << "\r\n"sv;
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Headers << "Content-Range: " << m_ContentRangeHeader << "\r\n"sv;
+ }
+
if (!m_IsKeepAlive)
{
m_Headers << "Connection: close\r\n"sv;
@@ -669,6 +681,10 @@ HttpPluginServerRequest::WriteResponse(HttpResponseCode ResponseCode)
ZEN_MEMSCOPE(GetHttppluginTag());
m_Response.reset(new HttpPluginResponse(HttpContentType::kBinary));
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
std::array<IoBuffer, 0> Empty;
m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
@@ -681,6 +697,14 @@ HttpPluginServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpConten
ZEN_MEMSCOPE(GetHttppluginTag());
m_Response.reset(new HttpPluginResponse(ContentType));
+ if (!m_ContentTypeOverride.empty())
+ {
+ m_Response->SetContentTypeOverride(std::move(m_ContentTypeOverride));
+ }
+ if (!m_ContentRangeHeader.empty())
+ {
+ m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 1b722940d..d45804c50 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -464,6 +464,8 @@ public:
inline int64_t GetResponseBodySize() const { return m_TotalDataSize; }
void SetLocationHeader(std::string_view Location) { m_LocationHeader = Location; }
+ void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); }
+ void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); }
private:
eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks;
@@ -473,6 +475,8 @@ private:
uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends
bool m_IsInitialResponse = true;
HttpContentType m_ContentType = HttpContentType::kBinary;
+ std::string m_ContentTypeOverride;
+ std::string m_ContentRangeHeader;
eastl::fixed_vector<IoBuffer, 16> m_DataBuffers;
std::string m_LocationHeader;
@@ -725,7 +729,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
PHTTP_KNOWN_HEADER ContentTypeHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderContentType];
- std::string_view ContentTypeString = MapContentTypeToString(m_ContentType);
+ std::string_view ContentTypeString =
+ m_ContentTypeOverride.empty() ? MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride);
ContentTypeHeader->pRawValue = ContentTypeString.data();
ContentTypeHeader->RawValueLength = (USHORT)ContentTypeString.size();
@@ -739,6 +744,15 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
LocationHeader->RawValueLength = (USHORT)m_LocationHeader.size();
}
+ // Content-Range header (for 206 Partial Content single-range responses)
+
+ if (!m_ContentRangeHeader.empty())
+ {
+ PHTTP_KNOWN_HEADER ContentRangeHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderContentRange];
+ ContentRangeHeader->pRawValue = m_ContentRangeHeader.data();
+ ContentRangeHeader->RawValueLength = (USHORT)m_ContentRangeHeader.size();
+ }
+
std::string_view ReasonString = ReasonStringForHttpResultCode(m_ResponseCode);
HttpResponse.StatusCode = m_ResponseCode;
@@ -2279,6 +2293,11 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode)
HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode);
+ if (!m_ContentRangeHeader.empty())
+ {
+ Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
+
if (SuppressBody())
{
Response->SuppressResponseBody();
@@ -2307,6 +2326,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy
HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs);
+ if (!m_ContentTypeOverride.empty())
+ {
+ Response->SetContentTypeOverride(std::move(m_ContentTypeOverride));
+ }
+ if (!m_ContentRangeHeader.empty())
+ {
+ Response->SetContentRangeHeader(std::move(m_ContentRangeHeader));
+ }
+
if (SuppressBody())
{
Response->SuppressResponseBody();
diff --git a/src/zenhttp/servers/wstest.cpp b/src/zenhttp/servers/wstest.cpp
index 363c478ae..872698ee5 100644
--- a/src/zenhttp/servers/wstest.cpp
+++ b/src/zenhttp/servers/wstest.cpp
@@ -5,6 +5,7 @@
# include <zencore/scopeguard.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/timer.h>
# include <zenhttp/httpserver.h>
# include <zenhttp/httpwsclient.h>
@@ -477,6 +478,23 @@ namespace {
return Result;
}
+ static void WaitForServerListening(int Port)
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ asio::io_context IoCtx;
+ asio::ip::tcp::socket Probe(IoCtx);
+ asio::error_code Ec;
+ Probe.connect(asio::ip::tcp::endpoint(asio::ip::make_address("127.0.0.1"), static_cast<uint16_t>(Port)), Ec);
+ if (!Ec)
+ {
+ return;
+ }
+ Sleep(10);
+ }
+ }
+
} // anonymous namespace
TEST_CASE("websocket.integration")
@@ -502,8 +520,8 @@ TEST_CASE("websocket.integration")
Server->Close();
});
- // Give server a moment to start accepting
- Sleep(100);
+ // Wait for server to start accepting
+ WaitForServerListening(Port);
SUBCASE("handshake succeeds with 101")
{
@@ -814,7 +832,7 @@ TEST_CASE("websocket.client")
Server->Close();
});
- Sleep(100);
+ WaitForServerListening(Port);
SUBCASE("connect, echo, close")
{
@@ -938,7 +956,7 @@ TEST_CASE("websocket.client.unixsocket")
Server->Close();
});
- Sleep(100);
+ WaitForServerListening(Port);
SUBCASE("connect, echo, close over unix socket")
{
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index c8cf3212c..1f8b96cc4 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -3036,7 +3036,10 @@ BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map<std::st
if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u))
{
throw std::runtime_error(
- fmt::format("Not enough free space for target path '{}', {} of free space is needed", m_Path, RequiredSpace));
+ fmt::format("Not enough free space for target path '{}', {} of free space is needed but only {} is available",
+ m_Path,
+ NiceBytes(RequiredSpace),
+ NiceBytes(Space.Free)));
}
}
diff --git a/src/zenserver-test/buildstore-tests.cpp b/src/zenserver-test/buildstore-tests.cpp
index cf9b10896..1f2157993 100644
--- a/src/zenserver-test/buildstore-tests.cpp
+++ b/src/zenserver-test/buildstore-tests.cpp
@@ -148,8 +148,6 @@ TEST_CASE("buildstore.blobs")
}
{
- // Single-range Get
-
ZenServerInstance Instance(TestEnv);
const uint16_t PortNumber =
@@ -158,6 +156,7 @@ TEST_CASE("buildstore.blobs")
HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ // Single-range Get
{
const IoHash& RawHash = CompressedBlobsHashes.front();
uint64_t BlobSize = CompressedBlobsSizes.front();
@@ -183,20 +182,63 @@ TEST_CASE("buildstore.blobs")
MemoryView RangeView = Payload.GetView();
CHECK(ActualRange.EqualBytes(RangeView));
}
- }
- {
- // Single-range Post
+ {
+ // GET blob not found
+ IoHash FakeHash = IoHash::HashBuffer("nonexistent", 11);
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, FakeHash));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+ }
- ZenServerInstance Instance(TestEnv);
+ {
+ // GET with out-of-bounds range
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
- const uint16_t PortNumber =
- Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
- CHECK(PortNumber != 0);
+ HttpClient::KeyValueMap Headers;
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", BlobSize + 100, BlobSize + 200)});
- HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), Headers);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::RangeNotSatisfiable);
+ }
{
+ // GET with multi-range header (uses Download for multipart boundary parsing)
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ uint64_t Range1Start = 0;
+ uint64_t Range1End = BlobSize / 4 - 1;
+ uint64_t Range2Start = BlobSize / 2;
+ uint64_t Range2End = BlobSize / 2 + BlobSize / 4 - 1;
+
+ HttpClient::KeyValueMap Headers;
+ Headers.Entries.insert({"Range", fmt::format("bytes={}-{},{}-{}", Range1Start, Range1End, Range2Start, Range2End)});
+
+ HttpClient::Response Result =
+ Client.Download(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), SystemRootPath, Headers);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::PartialContent);
+ REQUIRE_EQ(Result.Ranges.size(), 2);
+
+ HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ REQUIRE(FullBlobResult);
+
+ uint64_t Range1Len = Range1End - Range1Start + 1;
+ uint64_t Range2Len = Range2End - Range2Start + 1;
+
+ MemoryView ExpectedRange1 = FullBlobResult.ResponsePayload.GetView().Mid(Range1Start, Range1Len);
+ MemoryView ExpectedRange2 = FullBlobResult.ResponsePayload.GetView().Mid(Range2Start, Range2Len);
+
+ MemoryView ActualRange1 = Result.ResponsePayload.GetView().Mid(Result.Ranges[0].OffsetInPayload, Range1Len);
+ MemoryView ActualRange2 = Result.ResponsePayload.GetView().Mid(Result.Ranges[1].OffsetInPayload, Range2Len);
+
+ CHECK(ExpectedRange1.EqualBytes(ActualRange1));
+ CHECK(ExpectedRange2.EqualBytes(ActualRange2));
+ }
+
+ // Single-range Post
+ {
uint64_t RangeSizeSum = 0;
const IoHash& RawHash = CompressedBlobsHashes.front();
@@ -259,19 +301,96 @@ TEST_CASE("buildstore.blobs")
Offset += Range.second;
}
}
- }
- {
- // Multi-range
+ {
+ // POST with wrong accept type
+ const IoHash& RawHash = CompressedBlobsHashes.front();
- ZenServerInstance Instance(TestEnv);
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, uint64_t(0));
+ Writer.AddInteger("length"sv, uint64_t(10));
+ Writer.EndObject();
+ Writer.EndArray();
- const uint16_t PortNumber =
- Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
- CHECK(PortNumber != 0);
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kBinary));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
- HttpClient Client(Instance.GetBaseUri() + "/builds/");
+ {
+ // POST with missing payload
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with empty ranges array
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with range count exceeding maximum
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ for (uint32_t I = 0; I < 257; I++)
+ {
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, uint64_t(0));
+ Writer.AddInteger("length"sv, uint64_t(1));
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ }
+
+ {
+ // POST with out-of-bounds range returns length=0
+ const IoHash& RawHash = CompressedBlobsHashes.front();
+ uint64_t BlobSize = CompressedBlobsSizes.front();
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("ranges"sv);
+ Writer.BeginObject();
+ Writer.AddInteger("offset"sv, BlobSize + 100);
+ Writer.AddInteger("length"sv, uint64_t(50));
+ Writer.EndObject();
+ Writer.EndArray();
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ Writer.Save(),
+ HttpClient::Accept(ZenContentType::kCbPackage));
+ REQUIRE(Result);
+ CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload);
+ CbObjectView ResponseObject = ResponsePackage.GetObject();
+ CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView();
+ REQUIRE_EQ(RangeArray.Num(), uint64_t(1));
+ CbObjectView Range = (*begin(RangeArray)).AsObjectView();
+ CHECK_EQ(Range["offset"sv].AsUInt64(), BlobSize + 100);
+ CHECK_EQ(Range["length"sv].AsUInt64(), uint64_t(0));
+ }
+
+ // Multi-range
{
uint64_t RangeSizeSum = 0;
diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp
index 986dc67e0..e54e7060d 100644
--- a/src/zenserver-test/cache-tests.cpp
+++ b/src/zenserver-test/cache-tests.cpp
@@ -172,143 +172,85 @@ TEST_CASE("zcache.cbpackage")
return true;
};
- SUBCASE("PUT/GET returns correct package")
- {
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetDataDir(TestDir);
- const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
- const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+ std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
+ std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
- HttpClient Http{BaseUri};
+ ZenServerInstance RemoteInstance(TestEnv);
+ RemoteInstance.SetDataDir(RemoteDataDir);
+ const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ ZenServerInstance LocalInstance(TestEnv);
+ LocalInstance.SetDataDir(LocalDataDir);
+ LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
+ fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
+ const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
+ CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
- // PUT
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body);
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
+ const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
+ const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
- // GET
- {
- HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
+ HttpClient LocalHttp{LocalBaseUri};
+ HttpClient RemoteHttp{RemoteBaseUri};
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
- }
+ const std::string_view Bucket = "mosdef"sv;
- SUBCASE("PUT propagates upstream")
+ // Phase 1: PUT/GET returns correct package (via local)
{
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
-
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetDataDir(RemoteDataDir);
- const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
-
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetDataDir(LocalDataDir);
- LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
- CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
-
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+ zen::IoHash Key1;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key1);
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key1), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- HttpClient LocalHttp{LocalBaseUri};
- HttpClient RemoteHttp{RemoteBaseUri};
+ HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key1), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(GetResult.StatusCode == HttpResponseCode::OK);
- // Store the cache record package in the local instance
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
-
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
-
- // The cache record can be retrieved as a package from the local instance
- {
- HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
-
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
-
- // The cache record can be retrieved as a package from the remote instance
- {
- HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
-
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
+ zen::CbPackage Package;
+ const bool Ok = Package.TryLoad(GetResult.ResponsePayload);
+ CHECK(Ok);
+ CHECK(IsEqual(Package, ExpectedPackage));
}
- SUBCASE("GET finds upstream when missing in local")
+ // Phase 2: PUT propagates upstream
{
- // Setup local and remote server
- std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir();
- std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();
+ zen::IoHash Key2;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key2);
- ZenServerInstance RemoteInstance(TestEnv);
- RemoteInstance.SetDataDir(RemoteDataDir);
- const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key2), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- ZenServerInstance LocalInstance(TestEnv);
- LocalInstance.SetDataDir(LocalDataDir);
- LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
- fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
- const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
- CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());
+ HttpClient::Response LocalGetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(LocalGetResult.StatusCode == HttpResponseCode::OK);
- const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber);
- const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);
+ zen::CbPackage LocalPackage;
+ CHECK(LocalPackage.TryLoad(LocalGetResult.ResponsePayload));
+ CHECK(IsEqual(LocalPackage, ExpectedPackage));
- HttpClient LocalHttp{LocalBaseUri};
- HttpClient RemoteHttp{RemoteBaseUri};
+ HttpClient::Response RemoteGetResult = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(RemoteGetResult.StatusCode == HttpResponseCode::OK);
- const std::string_view Bucket = "mosdef"sv;
- zen::IoHash Key;
- zen::CbPackage ExpectedPackage = CreateTestPackage(Key);
+ zen::CbPackage RemotePackage;
+ CHECK(RemotePackage.TryLoad(RemoteGetResult.ResponsePayload));
+ CHECK(IsEqual(RemotePackage, ExpectedPackage));
+ }
- // Store the cache record package in upstream cache
- {
- zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
- HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body);
+ // Phase 3: GET finds upstream when missing in local
+ {
+ zen::IoHash Key3;
+ zen::CbPackage ExpectedPackage = CreateTestPackage(Key3);
- CHECK(Result.StatusCode == HttpResponseCode::Created);
- }
+ zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage);
+ HttpClient::Response PutResult = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key3), Body);
+ CHECK(PutResult.StatusCode == HttpResponseCode::Created);
- // The cache record can be retrieved as a package from the local cache
- {
- HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
+ HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key3), {{"Accept", "application/x-ue-cbpkg"}});
+ CHECK(GetResult.StatusCode == HttpResponseCode::OK);
- zen::CbPackage Package;
- const bool Ok = Package.TryLoad(Result.ResponsePayload);
- CHECK(Ok);
- CHECK(IsEqual(Package, ExpectedPackage));
- }
+ zen::CbPackage Package;
+ CHECK(Package.TryLoad(GetResult.ResponsePayload));
+ CHECK(IsEqual(Package, ExpectedPackage));
}
}
@@ -348,25 +290,25 @@ TEST_CASE("zcache.policy")
return Package;
};
- SUBCASE("query - 'local' does not query upstream (binary)")
- {
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
- const uint16_t UpstreamPort = UpstreamCfg.Port;
+ ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
+ ZenServerInstance UpstreamInst(TestEnv);
+ UpstreamCfg.Spawn(UpstreamInst);
+
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
+ ZenServerInstance LocalInst(TestEnv);
+ LocalCfg.Spawn(LocalInst);
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
+ HttpClient LocalHttp{LocalCfg.BaseUri};
+ HttpClient RemoteHttp{UpstreamCfg.BaseUri};
- const std::string_view Bucket = "legacy"sv;
+ // query - 'local' does not query upstream (binary)
+ // Uses size 1024 for unique key
+ {
+ const auto Bucket = "legacy"sv;
zen::IoHash Key;
IoBuffer BinaryValue = GenerateData(1024, Key);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
{
HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -385,26 +327,14 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local' does not store upstream (binary)")
+ // store - 'local' does not store upstream (binary)
+ // Uses size 2048 for unique key
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
- const uint16_t UpstreamPort = UpstreamCfg.Port;
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
const auto Bucket = "legacy"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
-
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
+ IoBuffer BinaryValue = GenerateData(2048, Key);
- // Store binary cache value locally
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key),
BinaryValue,
@@ -423,25 +353,14 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local/remote' stores local and upstream (binary)")
+ // store - 'local/remote' stores local and upstream (binary)
+ // Uses size 4096 for unique key
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
const auto Bucket = "legacy"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
+ IoBuffer BinaryValue = GenerateData(4096, Key);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store binary cache value locally and upstream
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key),
BinaryValue,
@@ -460,27 +379,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("query - 'local' does not query upstream (cbpackage)")
+ // query - 'local' does not query upstream (cbpackage)
+ // Uses bucket "policy4" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy4"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package upstream
{
HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -499,27 +407,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local' does not store upstream (cbpackage)")
+ // store - 'local' does not store upstream (cbpackage)
+ // Uses bucket "policy5" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy5"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package locally
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -536,27 +433,16 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
+ // store - 'local/remote' stores local and upstream (cbpackage)
+ // Uses bucket "policy6" to isolate from other cbpackage scenarios (deterministic key)
{
- ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance UpstreamInst(TestEnv);
- UpstreamCfg.Spawn(UpstreamInst);
-
- ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
- ZenServerInstance LocalInst(TestEnv);
- LocalCfg.Spawn(LocalInst);
-
- const auto Bucket = "legacy"sv;
+ const auto Bucket = "policy6"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient LocalHttp{LocalCfg.BaseUri};
- HttpClient RemoteHttp{UpstreamCfg.BaseUri};
-
- // Store package locally and upstream
{
HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
@@ -573,78 +459,62 @@ TEST_CASE("zcache.policy")
}
}
- SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
+ // skip - 'data' returns cache record without attachments/empty payload
+ // Uses bucket "skiptest7" to isolate from other cbpackage scenarios
{
- ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance Instance(TestEnv);
- Cfg.Spawn(Instance);
-
- const auto Bucket = "test"sv;
+ const auto Bucket = "skiptest7"sv;
zen::IoHash Key;
zen::IoHash PayloadId;
zen::CbPackage Package = GeneratePackage(Key, PayloadId);
IoBuffer Buf = SerializeToBuffer(Package);
- HttpClient Http{Cfg.BaseUri};
-
- // Store package
{
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf);
CHECK(Result.StatusCode == HttpResponseCode::Created);
}
- // Get package
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}});
CHECK(Result);
CbPackage ResponsePackage;
CHECK(ResponsePackage.TryLoad(Result.ResponsePayload));
CHECK(ResponsePackage.GetAttachments().size() == 0);
}
- // Get record
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}});
CHECK(Result);
CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload);
CHECK(ResponseObject);
}
- // Get payload
{
- HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}});
+ HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId),
+ {{"Accept", "application/x-ue-comp"}});
CHECK(Result);
CHECK(Result.ResponsePayload.GetSize() == 0);
}
}
- SUBCASE("skip - 'data' returns empty binary value")
+ // skip - 'data' returns empty binary value
+ // Uses size 8192 for unique key (avoids collision with size 1024/2048/4096 above)
{
- ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
- ZenServerInstance Instance(TestEnv);
- Cfg.Spawn(Instance);
-
- const auto Bucket = "test"sv;
+ const auto Bucket = "skiptest8"sv;
zen::IoHash Key;
- IoBuffer BinaryValue = GenerateData(1024, Key);
-
- HttpClient Http{Cfg.BaseUri};
+ IoBuffer BinaryValue = GenerateData(8192, Key);
- // Store binary cache value
{
- HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
+ HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue);
CHECK(Result.StatusCode == HttpResponseCode::Created);
}
- // Get package
{
HttpClient::Response Result =
- Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}});
+ LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}});
CHECK(Result);
CHECK(Result.ResponsePayload.GetSize() == 0);
}
diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp
index 835d72713..ce2db366a 100644
--- a/src/zenserver-test/compute-tests.cpp
+++ b/src/zenserver-test/compute-tests.cpp
@@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L
return false;
}
+static void
+WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ if (Session.GetActionCounts().Running > 0)
+ {
+ return;
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for action to reach Running state");
+}
+
+static void
+WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < TimeoutMs)
+ {
+ HttpClient::Response Resp = Client.Get("/jobs/running"sv);
+ if (Resp)
+ {
+ CbObject Obj = Resp.AsObject();
+ if (Obj["running"sv].AsArrayView().Num() > 0)
+ {
+ return;
+ }
+ }
+ Sleep(50);
+ }
+ FAIL("Timed out waiting for any action to reach Running state");
+}
+
static std::string
GetRot13Output(const CbPackage& ResultPackage)
{
@@ -906,10 +941,10 @@ TEST_CASE("function.queues.cancel_running")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue, which should interrupt the running Sleep job
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -961,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission");
// Wait for the worker process to start executing before cancelling
- Sleep(1'000);
+ const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
+ WaitForAnyActionRunningHttp(Client);
// Cancel the queue via its OID token
- const std::string QueueUrl = fmt::format("/queues/{}", QueueToken);
HttpClient::Response CancelResp = Client.Delete(QueueUrl);
REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent,
fmt::format("Remote queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText()));
@@ -1207,7 +1242,7 @@ TEST_CASE("function.priority")
// that exit with non-zero exit codes, including retry behaviour and
// final failure reporting.
-TEST_CASE("function.exit_code.failed_action")
+TEST_CASE("function.exit_code")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1219,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so the action fails immediately
- // without being rescheduled.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 1: failed_action - immediate failure with max_retries=0
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit a Fail action with exit code 42
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
- }
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-
- // GET /jobs/{lsn} for a failed action should return OK but with an empty result package
- const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response ResultResp = Client.Get(ResultUrl);
- CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
-}
-
-TEST_CASE("function.exit_code.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=2 so the action is retried twice before
- // being reported as failed (3 total attempts: initial + 2 retries).
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 2;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response ResultResp = Client.Get(ResultUrl);
+ CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK);
+ }
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ // Scenario 2: auto_retry - retried twice before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(2);
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Submit a Fail action — the worker process will exit with code 1 on
- // every attempt, eventually exhausting retries.
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Poll for the LSN to appear in the completed list — this only
- // happens after all retries are exhausted.
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
-
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.exit_code.reschedule_failed")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=1 so we have room for one manual reschedule
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
-
- // Wait for the action to exhaust its auto-retry and land in completed
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
-
- // Try to manually reschedule — should fail because retry limit is reached
- const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
- HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
- CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
-}
-
-TEST_CASE("function.exit_code.mixed_success_and_failure")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode));
- // Create a queue with max_retries=0 for fast failure
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput()));
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn);
+ HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl);
+ CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- // Submit one Rot13 (success) and one Fail (failure)
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
- REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
- const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv));
+ REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed");
+ const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32();
- HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
- REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
- const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
+ HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1));
+ REQUIRE_MESSAGE(FailResp, "Fail job submission failed");
+ const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32();
- // Wait for both to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
- fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
- fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess),
+ fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput()));
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail),
+ fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput()));
- // Verify queue counters
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0);
+ }
}
-TEST_CASE("function.crash.abort")
+TEST_CASE("function.crash")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
Instance.SetDataDir(TestEnv.CreateNewTestDir());
@@ -1456,174 +1413,125 @@ TEST_CASE("function.crash.abort")
const IoHash WorkerId = RegisterWorker(Client, TestEnv);
- // Create a queue with max_retries=0 so we don't wait through retries
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
+ auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> {
+ CbObjectWriter ConfigWriter;
+ ConfigWriter << "max_retries"sv << MaxRetries;
+ CbObjectWriter BodyWriter;
+ BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
+ REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
+ return {QueueId, fmt::format("/queues/{}", QueueId)};
+ };
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ // Scenario 1: abort - worker process calls std::abort(), no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Submit a Crash action that calls std::abort()
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
-
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-
- // Verify action history records the failure
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- bool FoundInHistory = false;
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ bool FoundInHistory = false;
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- FoundInHistory = true;
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ FoundInHistory = true;
+ break;
+ }
}
+ CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
}
- CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn));
-}
-
-TEST_CASE("function.crash.nullptr")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
-
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
-
- // Create a queue with max_retries=0
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 0;
-
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
-
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
-
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
-
- // Submit a Crash action that dereferences a null pointer
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
-
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
-
- // Poll for the LSN to appear in the completed list
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
- fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
-
- // Verify queue status reflects the failure
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
-}
-
-TEST_CASE("function.crash.auto_retry")
-{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer);
- Instance.SetDataDir(TestEnv.CreateNewTestDir());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady();
- REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput());
+ // Scenario 2: nullptr - worker process dereferences null, no retries
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(0);
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port);
- HttpClient Client(ComputeBaseUri);
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv));
+ REQUIRE_MESSAGE(SubmitResp,
+ fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText()));
- const IoHash WorkerId = RegisterWorker(Client, TestEnv);
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission");
- // Create a queue with max_retries=1 — the crash should be retried once
- // before being reported as permanently failed.
- CbObjectWriter ConfigWriter;
- ConfigWriter << "max_retries"sv << 1;
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn),
+ fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- CbObjectWriter BodyWriter;
- BodyWriter << "config"sv << ConfigWriter.Save();
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save());
- REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode));
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
- const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32();
- const std::string QueueUrl = fmt::format("/queues/{}", QueueId);
+ // Scenario 3: auto_retry - crash retried once before permanent failure
+ {
+ auto [QueueId, QueueUrl] = CreateQueue(1);
- // Submit a Crash action — will crash on every attempt
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
- HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
- REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
+ const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString());
+ HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv));
+ REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode));
- const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
+ const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
- // Poll for the LSN to appear in the completed list after retries exhaust
- const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
- REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
- fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
- Lsn,
- QueueId,
- Instance.GetLogOutput()));
+ const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId);
+ REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000),
+ fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}",
+ Lsn,
+ QueueId,
+ Instance.GetLogOutput()));
- // Verify the action history records the retry count
- const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
- HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
- REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
+ const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId);
+ HttpClient::Response HistoryResp = Client.Get(HistoryUrl);
+ REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history");
- for (auto& Item : HistoryResp.AsObject()["history"sv])
- {
- if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ for (auto& Item : HistoryResp.AsObject()["history"sv])
{
- CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
- CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
- break;
+ if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn)
+ {
+ CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false);
+ CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1);
+ break;
+ }
}
- }
- // Queue should show 1 failed, 0 completed
- HttpClient::Response StatusResp = Client.Get(QueueUrl);
- REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
+ HttpClient::Response StatusResp = Client.Get(QueueUrl);
+ REQUIRE_MESSAGE(StatusResp, "Failed to get queue status");
- CbObject QueueStatus = StatusResp.AsObject();
- CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
- CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ CbObject QueueStatus = StatusResp.AsObject();
+ CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1);
+ CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0);
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -1662,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery")
// Trigger immediate orchestrator re-query and wait for runner setup
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Submit Rot13 action via session
CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver);
@@ -1721,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W1 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit Rot13 action and verify it completes on W1
{
@@ -1763,23 +1669,33 @@ TEST_CASE("function.remote.late_runner_discovery")
// Wait for W2 discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
- // Verify W2 received the worker by querying its /compute/workers endpoint directly
+ // Poll W2 until the worker has been synced (SyncWorkersToRunner is async)
{
- const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
- HttpClient Client(ComputeBaseUri);
- HttpClient::Response ListResp = Client.Get("/workers"sv);
- REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2");
+ const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2);
+ HttpClient Client(ComputeBaseUri);
+ bool WorkerFound = false;
+ Stopwatch Timer;
- bool WorkerFound = false;
- for (auto& Item : ListResp.AsObject()["workers"sv])
+ while (Timer.GetElapsedTimeMs() < 10'000)
{
- if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ HttpClient::Response ListResp = Client.Get("/workers"sv);
+ if (ListResp)
+ {
+ for (auto& Item : ListResp.AsObject()["workers"sv])
+ {
+ if (Item.AsHash() == WorkerPackage.GetObjectHash())
+ {
+ WorkerFound = true;
+ break;
+ }
+ }
+ }
+ if (WorkerFound)
{
- WorkerFound = true;
break;
}
+ Sleep(50);
}
REQUIRE_MESSAGE(WorkerFound,
@@ -1844,7 +1760,6 @@ TEST_CASE("function.remote.queue_association")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit action to it
auto QueueResult = Session.CreateQueue();
@@ -1922,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -1935,7 +1849,7 @@ TEST_CASE("function.remote.queue_cancel_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Cancel the local queue — this should propagate to the remote
Session.CancelQueue(QueueId);
@@ -2008,7 +1922,7 @@ TEST_CASE("function.abandon_running_http")
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN");
// Wait for the process to start running
- Sleep(1'000);
+ WaitForAnyActionRunningHttp(Client);
// Verify the ready endpoint returns OK before abandon
{
@@ -2139,7 +2053,6 @@ TEST_CASE("function.session.abandon_running")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2152,7 +2065,7 @@ TEST_CASE("function.session.abandon_running")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action
bool Transitioned = Session.Abandon();
@@ -2210,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation")
// Wait for scheduler to discover the runner
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a local queue and submit a long-running Sleep action
auto QueueResult = Session.CreateQueue();
@@ -2223,7 +2135,7 @@ TEST_CASE("function.remote.abandon_propagation")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Transition to Abandoned — should abandon the running action and propagate
bool Transitioned = Session.Abandon();
@@ -2283,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
Session.RegisterWorker(WorkerPackage);
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Create a queue and submit a long-running action so the remote queue is established
auto QueueResult = Session.CreateQueue();
@@ -2296,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues")
REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed");
// Wait for the action to start running on the remote
- Sleep(2'000);
+ WaitForActionRunning(Session);
// Verify the remote has a non-implicit queue before shutdown
HttpClient RemoteClient(Instance.GetBaseUri() + "/compute");
@@ -2358,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work")
// Wait for runner discovery
Session.NotifyOrchestratorChanged();
- Sleep(2'000);
// Baseline: submit an action and verify it completes
{
@@ -2415,7 +2325,7 @@ TEST_CASE("function.session.retract_pending")
REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action");
// Let the scheduler process the pending action
- Sleep(500);
+ Sleep(50);
// Retract the pending action
auto Result = Session.RetractAction(Enqueued.Lsn);
@@ -2424,7 +2334,7 @@ TEST_CASE("function.session.retract_pending")
// The action should be re-enqueued as pending (still no runners, so stays pending).
// Let the scheduler process the retracted action back to pending.
- Sleep(500);
+ Sleep(50);
// Queue should still show 1 active (the action was rescheduled, not completed)
auto Status = Session.GetQueueStatus(QueueResult.QueueId);
@@ -2509,8 +2419,8 @@ TEST_CASE("function.retract_http")
const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32();
REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission");
- // Wait for the scheduler to process the pending action into m_PendingActions
- Sleep(1'000);
+ // Wait for the blocker action to start running (occupying the single slot)
+ WaitForAnyActionRunningHttp(Client);
// Retract the pending action via POST /jobs/{lsn}/retract
const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn);
@@ -2529,7 +2439,7 @@ TEST_CASE("function.retract_http")
}
// A second retract should also succeed (action is back to pending)
- Sleep(500);
+ Sleep(50);
HttpClient::Response RetractResp2 = Client.Post(RetractUrl);
CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK,
fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText()));
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 487e22b4b..35a840e5d 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -329,17 +329,36 @@ TEST_CASE("hub.lifecycle.children")
CHECK_EQ(Result.AsText(), "GhijklmNop"sv);
}
- Result = Client.Post("modules/abc/deprovision");
+ // Deprovision all modules at once
+ Result = Client.Post("deprovision");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ {
+ CbObject Body = Result.AsObject();
+ CbArrayView AcceptedArr = Body["Accepted"].AsArrayView();
+ CHECK_EQ(AcceptedArr.Num(), 2u);
+ bool FoundAbc = false;
+ bool FoundDef = false;
+ for (CbFieldView F : AcceptedArr)
+ {
+ if (F.AsString() == "abc"sv)
+ {
+ FoundAbc = true;
+ }
+ else if (F.AsString() == "def"sv)
+ {
+ FoundDef = true;
+ }
+ }
+ CHECK(FoundAbc);
+ CHECK(FoundDef);
+ }
REQUIRE(WaitForModuleGone(Client, "abc"));
+ REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
CHECK(WaitForPortUnreachable(ModClient));
}
-
- Result = Client.Post("modules/def/deprovision");
- REQUIRE(Result);
- REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
CHECK(WaitForPortUnreachable(ModClient));
@@ -349,6 +368,10 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Get("status");
REQUIRE(Result);
CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
+
+ // Deprovision-all with no modules
+ Result = Client.Post("deprovision");
+ CHECK(Result);
}
static bool
diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp
index 2e530ff92..549d2b322 100644
--- a/src/zenserver-test/logging-tests.cpp
+++ b/src/zenserver-test/logging-tests.cpp
@@ -146,28 +146,6 @@ TEST_CASE("logging.file.json")
CHECK_MESSAGE(LogContains(FileLog, "\"message\""), FileLog);
CHECK_MESSAGE(LogContains(FileLog, "\"source\": \"zenserver\""), FileLog);
CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog);
-}
-
-// --log-id <id> is automatically set to the server instance name in test mode.
-// The JSON formatter emits this value as the "id" field, so every entry in a
-// .json log file must carry a non-empty "id".
-TEST_CASE("logging.log_id")
-{
- const std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
- const std::filesystem::path LogFile = TestDir / "test.json";
-
- ZenServerInstance Instance(TestEnv);
- Instance.SetDataDir(TestDir);
-
- const std::string LogArg = fmt::format("--abslog {}", LogFile.string());
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg);
- CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- Instance.Shutdown();
-
- CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created");
- const std::string FileLog = ReadFileToString(LogFile);
- // The JSON formatter writes the log-id as: "id": "<value>",
CHECK_MESSAGE(LogContains(FileLog, "\"id\": \""), FileLog);
}
diff --git a/src/zenserver-test/objectstore-tests.cpp b/src/zenserver-test/objectstore-tests.cpp
index ff2314089..99c92e15f 100644
--- a/src/zenserver-test/objectstore-tests.cpp
+++ b/src/zenserver-test/objectstore-tests.cpp
@@ -19,18 +19,22 @@ using namespace std::literals;
TEST_SUITE_BEGIN("server.objectstore");
-TEST_CASE("objectstore.blobs")
+TEST_CASE("objectstore")
{
- std::string_view Bucket = "bkt"sv;
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
+ CHECK(Port != 0);
- std::vector<IoHash> CompressedBlobsHashes;
- std::vector<uint64_t> BlobsSizes;
- std::vector<uint64_t> CompressedBlobsSizes;
+ // --- objectstore.blobs ---
{
- ZenServerInstance Instance(TestEnv);
+ INFO("objectstore.blobs");
- const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--objectstore-enabled"));
- CHECK(PortNumber != 0);
+ std::string_view Bucket = "bkt"sv;
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ std::vector<uint64_t> BlobsSizes;
+ std::vector<uint64_t> CompressedBlobsSizes;
HttpClient Client(Instance.GetBaseUri() + "/obj/");
@@ -68,97 +72,192 @@ TEST_CASE("objectstore.blobs")
CHECK_EQ(RawSize, BlobsSizes[I]);
}
}
-}
-TEST_CASE("objectstore.s3client")
-{
- ZenServerInstance Instance(TestEnv);
- const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
- CHECK_MESSAGE(Port != 0, Instance.GetLogOutput());
-
- // S3Client in path-style builds paths as /{bucket}/{key}.
- // The objectstore routes objects at bucket/{bucket}/{key} relative to its base.
- // Point the S3Client endpoint at {server}/obj/bucket so the paths line up.
- S3ClientOptions Opts;
- Opts.BucketName = "s3test";
- Opts.Region = "us-east-1";
- Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port);
- Opts.PathStyle = true;
- Opts.Credentials.AccessKeyId = "testkey";
- Opts.Credentials.SecretAccessKey = "testsecret";
-
- S3Client Client(Opts);
-
- // -- PUT + GET roundtrip --
- std::string_view TestData = "hello from s3client via objectstore"sv;
- IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData));
- S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content));
- REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error);
-
- S3GetObjectResult GetRes = Client.GetObject("test/hello.txt");
- REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error);
- CHECK(GetRes.AsText() == TestData);
-
- // -- PUT overwrites --
- IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv));
- IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv));
- REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Original)).IsSuccess());
- REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess());
-
- S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt");
- REQUIRE(OverwriteGet.IsSuccess());
- CHECK(OverwriteGet.AsText() == "overwritten"sv);
-
- // -- GET not found --
- S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat");
- CHECK_FALSE(NotFoundGet.IsSuccess());
-
- // -- HEAD found --
- std::string_view HeadData = "head test data"sv;
- IoBuffer HeadContent = IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData));
- REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess());
-
- S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt");
- REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error);
- CHECK(HeadRes.Status == HeadObjectResult::Found);
- CHECK(HeadRes.Info.Size == HeadData.size());
-
- // -- HEAD not found --
- S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat");
- CHECK(HeadNotFound.IsSuccess());
- CHECK(HeadNotFound.Status == HeadObjectResult::NotFound);
-
- // -- LIST objects --
- for (int i = 0; i < 3; ++i)
+ // --- objectstore.s3client ---
{
- std::string Key = fmt::format("listing/item-{}.txt", i);
- std::string Payload = fmt::format("content-{}", i);
- IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload));
- REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess());
- }
+ INFO("objectstore.s3client");
+
+ S3ClientOptions Opts;
+ Opts.BucketName = "s3test";
+ Opts.Region = "us-east-1";
+ Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port);
+ Opts.PathStyle = true;
+ Opts.Credentials.AccessKeyId = "testkey";
+ Opts.Credentials.SecretAccessKey = "testsecret";
+
+ S3Client Client(Opts);
+
+ // -- PUT + GET roundtrip --
+ std::string_view TestData = "hello from s3client via objectstore"sv;
+ IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData));
+ S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content));
+ REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error);
+
+ S3GetObjectResult GetRes = Client.GetObject("test/hello.txt");
+ REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error);
+ CHECK(GetRes.AsText() == TestData);
+
+ // -- PUT overwrites --
+ IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv));
+ IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv));
+ REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Original)).IsSuccess());
+ REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess());
+
+ S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt");
+ REQUIRE(OverwriteGet.IsSuccess());
+ CHECK(OverwriteGet.AsText() == "overwritten"sv);
+
+ // -- GET not found --
+ S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat");
+ CHECK_FALSE(NotFoundGet.IsSuccess());
+
+ // -- HEAD found --
+ std::string_view HeadData = "head test data"sv;
+ IoBuffer HeadContent = IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData));
+ REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess());
+
+ S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt");
+ REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error);
+ CHECK(HeadRes.Status == HeadObjectResult::Found);
+ CHECK(HeadRes.Info.Size == HeadData.size());
+
+ // -- HEAD not found --
+ S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat");
+ CHECK(HeadNotFound.IsSuccess());
+ CHECK(HeadNotFound.Status == HeadObjectResult::NotFound);
+
+ // -- LIST objects --
+ for (int i = 0; i < 3; ++i)
+ {
+ std::string Key = fmt::format("listing/item-{}.txt", i);
+ std::string Payload = fmt::format("content-{}", i);
+ IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload));
+ REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess());
+ }
- S3ListObjectsResult ListRes = Client.ListObjects("listing/");
- REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error);
- REQUIRE(ListRes.Objects.size() == 3);
+ S3ListObjectsResult ListRes = Client.ListObjects("listing/");
+ REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error);
+ REQUIRE(ListRes.Objects.size() == 3);
+
+ std::vector<std::string> Keys;
+ for (const S3ObjectInfo& Obj : ListRes.Objects)
+ {
+ Keys.push_back(Obj.Key);
+ CHECK(Obj.Size > 0);
+ }
+ std::sort(Keys.begin(), Keys.end());
+ CHECK(Keys[0] == "listing/item-0.txt");
+ CHECK(Keys[1] == "listing/item-1.txt");
+ CHECK(Keys[2] == "listing/item-2.txt");
+
+ // -- LIST empty prefix --
+ S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/");
+ REQUIRE(EmptyList.IsSuccess());
+ CHECK(EmptyList.Objects.empty());
+ }
- std::vector<std::string> Keys;
- for (const S3ObjectInfo& Obj : ListRes.Objects)
+ // --- objectstore.range-requests ---
{
- Keys.push_back(Obj.Key);
- CHECK(Obj.Size > 0);
+ INFO("objectstore.range-requests");
+
+ HttpClient Client(Instance.GetBaseUri() + "/obj/");
+
+ IoBuffer Blob = CreateRandomBlob(1024);
+ MemoryView BlobView = Blob.GetView();
+ std::string ObjectPath = "bucket/bkt/range-test/data.bin";
+
+ HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob));
+ REQUIRE(PutResult);
+
+ // Full GET without Range header
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath);
+ CHECK(Result.StatusCode == HttpResponseCode::OK);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
+ }
+
+ // Single range: bytes 100-199
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 100u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100)));
+ }
+
+ // Range starting at zero: bytes 0-49
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 50u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50)));
+ }
+
+ // Range at end of file: bytes 1000-1023
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+ CHECK_EQ(Result.ResponsePayload.GetSize(), 24u);
+ CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24)));
+ }
+
+ // Multiple ranges: bytes 0-49 and 100-149
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}});
+ CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
+
+ std::string_view Body(reinterpret_cast<const char*>(Result.ResponsePayload.GetData()), Result.ResponsePayload.GetSize());
+
+ // Verify multipart structure contains both range payloads
+ CHECK(Body.find("Content-Range: bytes 0-49/1024") != std::string_view::npos);
+ CHECK(Body.find("Content-Range: bytes 100-149/1024") != std::string_view::npos);
+
+ // Extract and verify actual data for first range
+ auto FindPartData = [&](std::string_view ContentRange) -> std::string_view {
+ size_t Pos = Body.find(ContentRange);
+ if (Pos == std::string_view::npos)
+ {
+ return {};
+ }
+ // Skip past the Content-Range line and the blank line separator
+ Pos = Body.find("\r\n\r\n", Pos);
+ if (Pos == std::string_view::npos)
+ {
+ return {};
+ }
+ Pos += 4;
+ size_t End = Body.find("\r\n--", Pos);
+ if (End == std::string_view::npos)
+ {
+ return {};
+ }
+ return Body.substr(Pos, End - Pos);
+ };
+
+ std::string_view Part1 = FindPartData("Content-Range: bytes 0-49/1024");
+ CHECK_EQ(Part1.size(), 50u);
+ CHECK(MemoryView(Part1.data(), Part1.size()).EqualBytes(BlobView.Mid(0, 50)));
+
+ std::string_view Part2 = FindPartData("Content-Range: bytes 100-149/1024");
+ CHECK_EQ(Part2.size(), 50u);
+ CHECK(MemoryView(Part2.data(), Part2.size()).EqualBytes(BlobView.Mid(100, 50)));
+ }
+
+ // Out-of-bounds single range
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}});
+ CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable);
+ }
+
+ // Out-of-bounds multi-range
+ {
+ HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,2000-2099"}});
+ CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable);
+ }
}
- std::sort(Keys.begin(), Keys.end());
- CHECK(Keys[0] == "listing/item-0.txt");
- CHECK(Keys[1] == "listing/item-1.txt");
- CHECK(Keys[2] == "listing/item-2.txt");
-
- // -- LIST empty prefix --
- S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/");
- REQUIRE(EmptyList.IsSuccess());
- CHECK(EmptyList.Objects.empty());
}
-TEST_CASE("objectstore.range-requests")
+TEST_CASE("objectstore.range-requests-download")
{
ZenServerInstance Instance(TestEnv);
const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled");
@@ -168,55 +267,42 @@ TEST_CASE("objectstore.range-requests")
IoBuffer Blob = CreateRandomBlob(1024);
MemoryView BlobView = Blob.GetView();
- std::string ObjectPath = "bucket/bkt/range-test/data.bin";
+ std::string ObjectPath = "bucket/bkt/range-download-test/data.bin";
HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob));
REQUIRE(PutResult);
- // Full GET without Range header
- {
- HttpClient::Response Result = Client.Get(ObjectPath);
- CHECK(Result.StatusCode == HttpResponseCode::OK);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
- }
-
- // Single range: bytes 100-199
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}});
- CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 100u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100)));
- }
+ ScopedTemporaryDirectory DownloadDir;
- // Range starting at zero: bytes 0-49
+ // Single range via Download: verify Ranges is populated and GetRanges maps correctly
{
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}});
+ HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=100-199"}});
CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 50u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50)));
+ REQUIRE_EQ(Result.Ranges.size(), 1u);
+ CHECK_EQ(Result.Ranges[0].RangeOffset, 100u);
+ CHECK_EQ(Result.Ranges[0].RangeLength, 100u);
+
+ std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{100, 100}};
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges);
+ REQUIRE_EQ(PayloadRanges.size(), 1u);
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(100, 100)));
}
- // Range at end of file: bytes 1000-1023
+ // Multi-range via Download: verify Ranges is populated for both parts and GetRanges maps correctly
{
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}});
+ HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=0-49,100-149"}});
CHECK(Result.StatusCode == HttpResponseCode::PartialContent);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 24u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24)));
- }
-
- // Multiple ranges: not supported, falls back to 200 with full body per RFC 7233
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}});
- CHECK(Result.StatusCode == HttpResponseCode::OK);
- CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u);
- CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView));
- }
-
- // Out-of-bounds range: should return 400
- {
- HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}});
- CHECK(Result.StatusCode == HttpResponseCode::BadRequest);
+ REQUIRE_EQ(Result.Ranges.size(), 2u);
+ CHECK_EQ(Result.Ranges[0].RangeOffset, 0u);
+ CHECK_EQ(Result.Ranges[0].RangeLength, 50u);
+ CHECK_EQ(Result.Ranges[1].RangeOffset, 100u);
+ CHECK_EQ(Result.Ranges[1].RangeLength, 50u);
+
+ std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{0, 50}, {100, 50}};
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges);
+ REQUIRE_EQ(PayloadRanges.size(), 2u);
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(0, 50)));
+ CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[1].first, PayloadRanges[1].second).EqualBytes(BlobView.Mid(100, 50)));
}
}
diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp
index cec453511..49d985abb 100644
--- a/src/zenserver-test/projectstore-tests.cpp
+++ b/src/zenserver-test/projectstore-tests.cpp
@@ -41,423 +41,430 @@ TEST_CASE("project.basic")
const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();
- std::mt19937_64 mt;
-
- zen::StringBuilder<64> BaseUri;
- BaseUri << fmt::format("http://localhost:{}", PortNumber);
+ std::string ServerUri = fmt::format("http://localhost:{}", PortNumber);
std::filesystem::path BinPath = zen::GetRunningExecutablePath();
std::filesystem::path RootPath = BinPath.parent_path().parent_path();
BinPath = BinPath.lexically_relative(RootPath);
- SUBCASE("build store init")
+ auto CreateProjectAndOplog = [&](std::string_view ProjectName, std::string_view OplogName) -> std::string {
+ HttpClient Http{ServerUri};
+
+ zen::CbObjectWriter Body;
+ Body << "id" << ProjectName;
+ Body << "root" << RootPath.c_str();
+ Body << "project"
+ << "/zooom";
+ Body << "engine"
+ << "/zooom";
+ IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
+ auto Response = Http.Post(fmt::format("/prj/{}", ProjectName), BodyBuf);
+ REQUIRE(Response.StatusCode == HttpResponseCode::Created);
+
+ std::string OplogUri = fmt::format("{}/prj/{}/oplog/{}", ServerUri, ProjectName, OplogName);
+ HttpClient OplogHttp{OplogUri};
+ auto OplogResponse = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
+ REQUIRE(OplogResponse.StatusCode == HttpResponseCode::Created);
+
+ return OplogUri;
+ };
+
+ // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing
+ std::filesystem::path LongPathDir = RootPath / "longpathtest";
+ for (int I = 0; I < 5; ++I)
{
- {
- HttpClient Http{BaseUri};
+ LongPathDir /= std::string(50, char('a' + I));
+ }
+ std::filesystem::path LongFilePath = LongPathDir / "testfile.bin";
+ std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath);
- {
- zen::CbObjectWriter Body;
- Body << "id"
- << "test";
- Body << "root" << RootPath.c_str();
- Body << "project"
- << "/zooom";
- Body << "engine"
- << "/zooom";
-
- zen::BinaryWriter MemOut;
- IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
-
- auto Response = Http.Post("/prj/test"sv, BodyBuf);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ const uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF};
+ CreateDirectories(MakeSafeAbsolutePath(LongPathDir));
+ WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData)));
+ CHECK(LongRelPath.string().length() > 260);
- {
- auto Response = Http.Get("/prj/test"sv);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ std::string LongClientPath = "/{engine}/client";
+ for (int I = 0; I < 5; ++I)
+ {
+ LongClientPath += '/';
+ LongClientPath.append(50, char('a' + I));
+ }
+ LongClientPath += "/longfile.bin";
+ CHECK(LongClientPath.length() > 260);
- CbObject ResponseObject = Response.AsObject();
+ const std::string_view LongPathChunkId{
+ "00000000"
+ "00000000"
+ "00020000"};
+ auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId);
- CHECK(ResponseObject["id"].AsString() == "test"sv);
- CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
- }
+ // --- build store persistence ---
+ // First section also verifies project and oplog creation responses.
+ {
+ HttpClient ServerHttp{ServerUri};
+
+ {
+ zen::CbObjectWriter Body;
+ Body << "id"
+ << "test_persist";
+ Body << "root" << RootPath.c_str();
+ Body << "project"
+ << "/zooom";
+ Body << "engine"
+ << "/zooom";
+ IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer();
+
+ auto Response = ServerHttp.Post("/prj/test_persist"sv, BodyBuf);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
}
- BaseUri << "/prj/test/oplog/foobar";
+ {
+ auto Response = ServerHttp.Get("/prj/test_persist"sv);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+
+ CbObject ResponseObject = Response.AsObject();
+
+ CHECK(ResponseObject["id"].AsString() == "test_persist"sv);
+ CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
+ }
+
+ std::string OplogUri = fmt::format("{}/prj/test_persist/oplog/oplog_persist", ServerUri);
{
- HttpClient Http{BaseUri};
+ HttpClient OplogHttp{OplogUri};
{
- auto Response = Http.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
+ auto Response = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject);
CHECK(Response.StatusCode == HttpResponseCode::Created);
}
{
- auto Response = Http.Get(""sv);
+ auto Response = OplogHttp.Get(""sv);
REQUIRE(Response.StatusCode == HttpResponseCode::OK);
CbObject ResponseObject = Response.AsObject();
- CHECK(ResponseObject["id"].AsString() == "foobar"sv);
- CHECK(ResponseObject["project"].AsString() == "test"sv);
+ CHECK(ResponseObject["id"].AsString() == "oplog_persist"sv);
+ CHECK(ResponseObject["project"].AsString() == "test_persist"sv);
}
}
- // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing
- std::filesystem::path LongPathDir = RootPath / "longpathtest";
- for (int I = 0; I < 5; ++I)
- {
- LongPathDir /= std::string(50, char('a' + I));
- }
- std::filesystem::path LongFilePath = LongPathDir / "testfile.bin";
- std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath);
+ uint8_t AttachData[] = {1, 2, 3};
- const uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF};
- CreateDirectories(MakeSafeAbsolutePath(LongPathDir));
- WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData)));
- CHECK(LongRelPath.string().length() > 260);
+ zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
+ zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
- std::string LongClientPath = "/{engine}/client";
- for (int I = 0; I < 5; ++I)
- {
- LongClientPath += '/';
- LongClientPath.append(50, char('a' + I));
- }
- LongClientPath += "/longfile.bin";
- CHECK(LongClientPath.length() > 260);
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo"
+ << "attachment" << Attach;
- const std::string_view LongPathChunkId{
+ const std::string_view ChunkId{
"00000000"
"00000000"
- "00020000"};
- auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId);
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.BeginObject();
+ OpWriter << "id" << LongPathFileOid;
+ OpWriter << "clientpath" << LongClientPath;
+ OpWriter << "serverpath" << LongRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
+ OpPackage.AddAttachment(Attach);
- SUBCASE("build store persistence")
- {
- uint8_t AttachData[] = {1, 2, 3};
-
- zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
- zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()};
-
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo"
- << "attachment" << Attach;
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.BeginObject();
- OpWriter << "id" << LongPathFileOid;
- OpWriter << "clientpath" << LongClientPath;
- OpWriter << "serverpath" << LongRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
- OpPackage.AddAttachment(Attach);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
-
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ HttpClient Http{OplogUri};
- // Read file data
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- CHECK(Response.ResponsePayload.GetSize() == 10);
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId << "?offset=1&size=10";
+ auto Response = Http.Get(ChunkGetUri);
- // Read long-path file data
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ CHECK(Response.ResponsePayload.GetSize() == 10);
+ }
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData));
- }
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- ZEN_INFO("+++++++");
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData));
}
- SUBCASE("snapshot")
- {
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "foo";
-
- const std::string_view ChunkId{
- "00000000"
- "00000000"
- "00010000"};
- auto FileOid = zen::Oid::FromHexString(ChunkId);
-
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << FileOid;
- OpWriter << "clientpath"
- << "/{engine}/client/side/path";
- OpWriter << "serverpath" << BinPath.c_str();
- OpWriter.EndObject();
- OpWriter.BeginObject();
- OpWriter << "id" << LongPathFileOid;
- OpWriter << "clientpath" << LongClientPath;
- OpWriter << "serverpath" << LongRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
-
- zen::CbPackage OpPackage(Op);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
+ ZEN_INFO("+++++++");
+ }
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ // --- snapshot ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_snap", "oplog_snap");
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "foo";
- // Read file data, it is raw and uncompressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ const std::string_view ChunkId{
+ "00000000"
+ "00000000"
+ "00010000"};
+ auto FileOid = zen::Oid::FromHexString(ChunkId);
+
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << FileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/client/side/path";
+ OpWriter << "serverpath" << BinPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.BeginObject();
+ OpWriter << "id" << LongPathFileOid;
+ OpWriter << "clientpath" << LongClientPath;
+ OpWriter << "serverpath" << LongRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+
+ zen::CbPackage OpPackage(Op);
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- IoBuffer Data = Response.ResponsePayload;
- IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
- CHECK(ReferenceData.GetSize() == Data.GetSize());
- CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
- }
+ HttpClient Http{OplogUri};
- // Read long-path file data, it is raw and uncompressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
+ }
- IoBuffer Data = Response.ResponsePayload;
- MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
- CHECK(Data.GetSize() == sizeof(LongPathFileData));
- CHECK(Data.GetView().EqualBytes(ExpectedView));
- }
+ // Read file data, it is raw and uncompressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- {
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
- auto Response = Http.Post("/rpc"sv, Payload);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read chunk data, it is now compressed
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << ChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ IoBuffer Data = Response.ResponsePayload;
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(ReferenceData.GetSize() == Data.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(Data.GetView()));
+ }
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ // Read long-path file data, it is raw and uncompressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
- CHECK(RawSize == ReferenceData.GetSize());
- CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
- CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read compressed long-path file data after snapshot
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << LongPathChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ IoBuffer Data = Response.ResponsePayload;
+ MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
+ CHECK(Data.GetSize() == sizeof(LongPathFileData));
+ CHECK(Data.GetView().EqualBytes(ExpectedView));
+ }
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
+ auto Response = Http.Post("/rpc"sv, Payload);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
- CHECK(RawSize == sizeof(LongPathFileData));
- CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData));
- CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView));
- }
+ // Read chunk data, it is now compressed
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << ChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- ZEN_INFO("+++++++");
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath);
+ CHECK(RawSize == ReferenceData.GetSize());
+ CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize());
+ CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView()));
}
- SUBCASE("snapshot zero byte file")
+ // Read compressed long-path file data after snapshot
{
- // A zero-byte file referenced in an oplog entry must survive a
- // snapshot: the file is read, compressed, stored in CidStore, and
- // the oplog is rewritten with a BinaryAttachment reference. After
- // the snapshot the chunk must be retrievable and decompress to an
- // empty payload.
-
- std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin";
- std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath;
- CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path()));
- // Create a zero-byte file on disk.
- WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{});
- REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath)));
-
- const std::string_view EmptyChunkId{
- "00000000"
- "00000000"
- "00030000"};
- auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId);
-
- zen::CbObjectWriter OpWriter;
- OpWriter << "key"
- << "zero_byte_test";
- OpWriter.BeginArray("files");
- OpWriter.BeginObject();
- OpWriter << "id" << EmptyFileOid;
- OpWriter << "clientpath"
- << "/{engine}/empty_file";
- OpWriter << "serverpath" << EmptyFileRelPath.c_str();
- OpWriter.EndObject();
- OpWriter.EndArray();
-
- zen::CbObject Op = OpWriter.Save();
- zen::CbPackage OpPackage(Op);
-
- zen::BinaryWriter MemOut;
- legacy::SaveCbPackage(OpPackage, MemOut);
-
- HttpClient Http{BaseUri};
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << LongPathChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
- {
- auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::Created);
- }
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
- // Read file data before snapshot - raw and uncompressed, 0 bytes.
- // http.sys converts a 200 OK with empty body to 204 No Content, so
- // accept either status code.
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << EmptyChunkId;
- auto Response = Http.Get(ChunkGetUri);
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)};
+ CHECK(RawSize == sizeof(LongPathFileData));
+ CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData));
+ CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView));
+ }
- REQUIRE(Response);
- CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent));
- CHECK(Response.ResponsePayload.GetSize() == 0);
- }
+ ZEN_INFO("+++++++");
+ }
- // Trigger snapshot.
- {
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
- auto Response = Http.Post("/rpc"sv, Payload);
- REQUIRE(Response);
- CHECK(Response.StatusCode == HttpResponseCode::OK);
- }
+ // --- snapshot zero byte file ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_zero", "oplog_zero");
- // Read chunk after snapshot - compressed, decompresses to 0 bytes.
- {
- zen::StringBuilder<128> ChunkGetUri;
- ChunkGetUri << "/" << EmptyChunkId;
- auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+ std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin";
+ std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath;
+ CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path()));
+ WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{});
+ REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath)));
- REQUIRE(Response);
- REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+ const std::string_view EmptyChunkId{
+ "00000000"
+ "00000000"
+ "00030000"};
+ auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId);
+
+ zen::CbObjectWriter OpWriter;
+ OpWriter << "key"
+ << "zero_byte_test";
+ OpWriter.BeginArray("files");
+ OpWriter.BeginObject();
+ OpWriter << "id" << EmptyFileOid;
+ OpWriter << "clientpath"
+ << "/{engine}/empty_file";
+ OpWriter << "serverpath" << EmptyFileRelPath.c_str();
+ OpWriter.EndObject();
+ OpWriter.EndArray();
+
+ zen::CbObject Op = OpWriter.Save();
+ zen::CbPackage OpPackage(Op);
- IoBuffer Data = Response.ResponsePayload;
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
- REQUIRE(Compressed);
- CHECK(RawSize == 0);
- IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
- CHECK(DataDecompressed.GetSize() == 0);
- }
+ zen::BinaryWriter MemOut;
+ legacy::SaveCbPackage(OpPackage, MemOut);
- // Cleanup
- {
- std::error_code Ec;
- DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec);
- }
+ HttpClient Http{OplogUri};
- ZEN_INFO("+++++++");
+ {
+ auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView()));
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::Created);
}
- SUBCASE("test chunk not found error")
+ // Read file data before snapshot - raw and uncompressed, 0 bytes.
+ // http.sys converts a 200 OK with empty body to 204 No Content, so
+ // accept either status code.
{
- HttpClient Http{BaseUri};
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << EmptyChunkId;
+ auto Response = Http.Get(ChunkGetUri);
- for (size_t I = 0; I < 65; I++)
- {
- zen::StringBuilder<128> PostUri;
- PostUri << "/f77c781846caead318084604/info";
- auto Response = Http.Get(PostUri);
+ REQUIRE(Response);
+ CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent));
+ CHECK(Response.ResponsePayload.GetSize() == 0);
+ }
- REQUIRE(!Response.Error);
- CHECK(Response.StatusCode == HttpResponseCode::NotFound);
- }
+ // Trigger snapshot.
+ {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); });
+ auto Response = Http.Post("/rpc"sv, Payload);
+ REQUIRE(Response);
+ CHECK(Response.StatusCode == HttpResponseCode::OK);
+ }
+
+ // Read chunk after snapshot - compressed, decompresses to 0 bytes.
+ {
+ zen::StringBuilder<128> ChunkGetUri;
+ ChunkGetUri << "/" << EmptyChunkId;
+ auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}});
+
+ REQUIRE(Response);
+ REQUIRE(Response.StatusCode == HttpResponseCode::OK);
+
+ IoBuffer Data = Response.ResponsePayload;
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize);
+ REQUIRE(Compressed);
+ CHECK(RawSize == 0);
+ IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer();
+ CHECK(DataDecompressed.GetSize() == 0);
}
- // Cleanup long-path test directory
{
std::error_code Ec;
- DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec);
+ DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec);
+ }
+
+ ZEN_INFO("+++++++");
+ }
+
+ // --- test chunk not found error ---
+ {
+ std::string OplogUri = CreateProjectAndOplog("test_notfound", "oplog_notfound");
+ HttpClient Http{OplogUri};
+
+ for (size_t I = 0; I < 65; I++)
+ {
+ zen::StringBuilder<128> PostUri;
+ PostUri << "/f77c781846caead318084604/info";
+ auto Response = Http.Get(PostUri);
+
+ REQUIRE(!Response.Error);
+ CHECK(Response.StatusCode == HttpResponseCode::NotFound);
}
}
+
+ // Cleanup long-path test directory
+ {
+ std::error_code Ec;
+ DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec);
+ }
}
CbPackage
@@ -753,86 +760,102 @@ TEST_CASE("project.remote")
}
};
- SUBCASE("File")
+ // --- Zen ---
+ // NOTE: Zen export must run before file-based exports from the same source
+ // oplog. A prior file export leaves server-side state that causes a
+ // subsequent zen-protocol export from the same oplog to abort.
{
+ INFO("Zen");
ScopedTemporaryDirectory TempDir;
{
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
+ std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
+ MakeProject(ExportTargetUri, "proj0_zen");
+ MakeOplog(ExportTargetUri, "proj0_zen", "oplog0_zen");
+
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
Writer << "params" << BeginObject;
{
Writer << "maxblocksize"sv << 3072u;
Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "chunkfilesizelimit"sv << 5u * 1024u;
Writer << "maxchunksperblock"sv << 16u;
+ Writer << "chunkfilesizelimit"sv << 5u * 1024u;
Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
+ Writer << "zen"sv << BeginObject;
{
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
+ Writer << "url"sv << ExportTargetUri.substr(7);
+ Writer << "project"
+ << "proj0_zen";
+ Writer << "oplog"
+ << "oplog0_zen";
}
- Writer << EndObject; // "file"
+ Writer << EndObject; // "zen"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
-
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
+ ValidateAttachments(1, "proj0_zen", "oplog0_zen");
+ ValidateOplog(1, "proj0_zen", "oplog0_zen");
+
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
+ std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
+ MakeProject(ImportTargetUri, "proj1");
+ MakeOplog(ImportTargetUri, "proj1", "oplog1");
- IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
+ IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
{
Writer << "force"sv << false;
- Writer << "file"sv << BeginObject;
+ Writer << "zen"sv << BeginObject;
{
- Writer << "path"sv << path;
- Writer << "name"sv
- << "proj0_oplog0"sv;
+ Writer << "url"sv << ImportSourceUri.substr(7);
+ Writer << "project"
+ << "proj0_zen";
+ Writer << "oplog"
+ << "oplog0_zen";
}
- Writer << EndObject; // "file"
+ Writer << EndObject; // "zen"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
-
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
- HttpWaitForCompletion(Servers.GetInstance(1), Response);
+ HttpClient Http{Servers.GetInstance(2).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(2), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(2, "proj1", "oplog1");
+ ValidateOplog(2, "proj1", "oplog1");
}
- SUBCASE("File disable blocks")
+ // --- File ---
{
+ INFO("File");
ScopedTemporaryDirectory TempDir;
{
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
Writer << "params" << BeginObject;
{
Writer << "maxblocksize"sv << 3072u;
Writer << "maxchunkembedsize"sv << 1296u;
- Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
+ Writer << "maxchunksperblock"sv << 16u;
+ Writer << "force"sv << true;
Writer << "file"sv << BeginObject;
{
- Writer << "path"sv << TempDir.Path().string();
+ Writer << "path"sv << path;
Writer << "name"sv
<< "proj0_oplog0"sv;
- Writer << "disableblocks"sv << true;
}
Writer << EndObject; // "file"
}
@@ -845,9 +868,10 @@ TEST_CASE("project.remote")
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
- IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_file");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_file", "oplog0_file");
+
+ IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
@@ -855,7 +879,7 @@ TEST_CASE("project.remote")
Writer << "force"sv << false;
Writer << "file"sv << BeginObject;
{
- Writer << "path"sv << TempDir.Path().string();
+ Writer << "path"sv << path;
Writer << "name"sv
<< "proj0_oplog0"sv;
}
@@ -866,15 +890,16 @@ TEST_CASE("project.remote")
HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_file", "oplog0_file"), Payload);
HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(1, "proj0_file", "oplog0_file");
+ ValidateOplog(1, "proj0_file", "oplog0_file");
}
- SUBCASE("File force temp blocks")
+ // --- File disable blocks ---
{
+ INFO("File disable blocks");
ScopedTemporaryDirectory TempDir;
{
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
@@ -886,26 +911,27 @@ TEST_CASE("project.remote")
Writer << "maxchunkembedsize"sv << 1296u;
Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
+ Writer << "force"sv << true;
Writer << "file"sv << BeginObject;
{
Writer << "path"sv << TempDir.Path().string();
Writer << "name"sv
<< "proj0_oplog0"sv;
- Writer << "enabletempblocks"sv << true;
+ Writer << "disableblocks"sv << true;
}
Writer << EndObject; // "file"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+ HttpClient Http{Servers.GetInstance(0).GetBaseUri()};
+
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
{
- MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
- MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock", "oplog0_noblock");
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
@@ -923,23 +949,20 @@ TEST_CASE("project.remote")
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload);
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_noblock", "oplog0_noblock"), Payload);
HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
+ ValidateAttachments(1, "proj0_noblock", "oplog0_noblock");
+ ValidateOplog(1, "proj0_noblock", "oplog0_noblock");
}
- SUBCASE("Zen")
+ // --- File force temp blocks ---
{
+ INFO("File force temp blocks");
ScopedTemporaryDirectory TempDir;
{
- std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
- std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
- MakeProject(ExportTargetUri, "proj0_copy");
- MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy");
-
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "export"sv;
@@ -949,14 +972,13 @@ TEST_CASE("project.remote")
Writer << "maxchunkembedsize"sv << 1296u;
Writer << "maxchunksperblock"sv << 16u;
Writer << "chunkfilesizelimit"sv << 5u * 1024u;
- Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
+ Writer << "force"sv << true;
+ Writer << "file"sv << BeginObject;
{
- Writer << "url"sv << ExportTargetUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
+ Writer << "enabletempblocks"sv << true;
}
Writer << EndObject; // "file"
}
@@ -967,40 +989,32 @@ TEST_CASE("project.remote")
HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload);
HttpWaitForCompletion(Servers.GetInstance(0), Response);
}
- ValidateAttachments(1, "proj0_copy", "oplog0_copy");
- ValidateOplog(1, "proj0_copy", "oplog0_copy");
-
{
- std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
- std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
- MakeProject(ImportTargetUri, "proj1");
- MakeOplog(ImportTargetUri, "proj1", "oplog1");
-
+ MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock");
+ MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock", "oplog0_tmpblock");
IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
Writer << "method"sv
<< "import"sv;
Writer << "params" << BeginObject;
{
Writer << "force"sv << false;
- Writer << "zen"sv << BeginObject;
+ Writer << "file"sv << BeginObject;
{
- Writer << "url"sv << ImportSourceUri.substr(7);
- Writer << "project"
- << "proj0_copy";
- Writer << "oplog"
- << "oplog0_copy";
+ Writer << "path"sv << TempDir.Path().string();
+ Writer << "name"sv
+ << "proj0_oplog0"sv;
}
Writer << EndObject; // "file"
}
Writer << EndObject; // "params"
});
- HttpClient Http{Servers.GetInstance(2).GetBaseUri()};
- HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload);
- HttpWaitForCompletion(Servers.GetInstance(2), Response);
+ HttpClient Http{Servers.GetInstance(1).GetBaseUri()};
+ HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_tmpblock", "oplog0_tmpblock"), Payload);
+ HttpWaitForCompletion(Servers.GetInstance(1), Response);
}
- ValidateAttachments(2, "proj1", "oplog1");
- ValidateOplog(2, "proj1", "oplog1");
+ ValidateAttachments(1, "proj0_tmpblock", "oplog0_tmpblock");
+ ValidateOplog(1, "proj0_tmpblock", "oplog0_tmpblock");
}
}
@@ -1379,7 +1393,7 @@ TEST_CASE("project.file.data.transitions")
return Package;
};
- SUBCASE("path-referenced file is retrievable")
+ // --- path-referenced file is retrievable ---
{
MakeProject("proj_path"sv);
MakeOplog("proj_path"sv, "oplog"sv);
@@ -1397,7 +1411,7 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced file is retrievable")
+ // --- hash-referenced file is retrievable ---
{
MakeProject("proj_hash"sv);
MakeOplog("proj_hash"sv, "oplog"sv);
@@ -1416,34 +1430,35 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced to path-referenced transition with different content")
+ struct TransitionVariant
{
- MakeProject("proj_hash_to_path_diff"sv);
- MakeOplog("proj_hash_to_path_diff"sv, "oplog"sv);
+ std::string_view Suffix;
+ bool SameOpKey;
+ bool RunGc;
+ };
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ static constexpr TransitionVariant Variants[] = {
+ {"_nk", false, false},
+ {"_sk", true, false},
+ {"_nk_gc", false, true},
+ {"_sk_gc", true, true},
+ };
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ // --- hash-referenced to path-referenced transition with different content ---
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_h2pd{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
+
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- // First op: file with CAS hash (content differs from the on-disk file)
{
CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, CompressedBlob);
- PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
if (Response.IsSuccess())
{
@@ -1453,19 +1468,17 @@ TEST_CASE("project.file.data.transitions")
}
}
- // Second op: same FileId transitions to serverpath (different data)
{
CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
- PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
}
- if (RunGcAfterTransition)
+ if (V.RunGc)
{
TriggerGcAndWait();
}
- // Must serve the on-disk file content, not the old CAS blob
- HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
@@ -1475,95 +1488,68 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("hash-referenced to path-referenced transition with identical content")
+ // --- hash-referenced to path-referenced transition with identical content ---
{
- // Compress the same on-disk file content as a CAS blob so both references yield identical data
CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView()));
- MakeProject("proj_hash_to_path_same"sv);
- MakeOplog("proj_hash_to_path_same"sv, "oplog"sv);
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_h2ps{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ {
+ CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- // First op: file with CAS hash (content matches the on-disk file)
- {
- CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob);
- PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op);
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ if (Response.IsSuccess())
+ {
+ IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
+ }
+ }
- HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ {
+ CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
+ }
+
+ if (V.RunGc)
+ {
+ TriggerGcAndWait();
+ }
+
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
}
}
-
- // Second op: same FileId transitions to serverpath (same data)
- {
- CbPackage Op = BuildPathReferencedFileOp(SecondOpKey);
- PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op);
- }
-
- if (RunGcAfterTransition)
- {
- TriggerGcAndWait();
- }
-
- // Must still resolve successfully after the transition
- HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
- if (Response.IsSuccess())
- {
- IoBuffer Payload = GetDecompressedPayload(Response);
- CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
- CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
- }
}
- SUBCASE("path-referenced to hash-referenced transition with different content")
+ // --- path-referenced to hash-referenced transition with different content ---
+ for (const TransitionVariant& V : Variants)
{
- MakeProject("proj_path_to_hash_diff"sv);
- MakeOplog("proj_path_to_hash_diff"sv, "oplog"sv);
-
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ std::string ProjName = fmt::format("proj_p2hd{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- // First op: file with serverpath
{
CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
- PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
if (Response.IsSuccess())
{
@@ -1572,19 +1558,17 @@ TEST_CASE("project.file.data.transitions")
}
}
- // Second op: same FileId transitions to CAS hash (different data)
{
CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, CompressedBlob);
- PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
}
- if (RunGcAfterTransition)
+ if (V.RunGc)
{
TriggerGcAndWait();
}
- // Must serve the CAS blob content, not the old on-disk file
- HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv);
+ HttpClient::Response Response = GetChunk(ProjName);
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
@@ -1595,65 +1579,51 @@ TEST_CASE("project.file.data.transitions")
}
}
- SUBCASE("path-referenced to hash-referenced transition with identical content")
+ // --- path-referenced to hash-referenced transition with identical content ---
{
- // Compress the same on-disk file content as a CAS blob so both references yield identical data
CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView()));
- MakeProject("proj_path_to_hash_same"sv);
- MakeOplog("proj_path_to_hash_same"sv, "oplog"sv);
+ for (const TransitionVariant& V : Variants)
+ {
+ std::string ProjName = fmt::format("proj_p2hs{}", V.Suffix);
+ MakeProject(ProjName);
+ MakeOplog(ProjName, "oplog"sv);
- Oid FirstOpKey = Oid::NewOid();
- Oid SecondOpKey;
- bool RunGcAfterTransition = false;
+ Oid FirstOpKey = Oid::NewOid();
+ Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid();
- SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); }
- SUBCASE("same op key") { SecondOpKey = FirstOpKey; }
- SUBCASE("new op key with gc")
- {
- SecondOpKey = Oid::NewOid();
- RunGcAfterTransition = true;
- }
- SUBCASE("same op key with gc")
- {
- SecondOpKey = FirstOpKey;
- RunGcAfterTransition = true;
- }
+ {
+ CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
- // First op: file with serverpath
- {
- CbPackage Op = BuildPathReferencedFileOp(FirstOpKey);
- PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op);
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ if (Response.IsSuccess())
+ {
+ IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
+ }
+ }
- HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op"));
+ {
+ CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob);
+ PostOplogEntry(ProjName, "oplog"sv, Op);
+ }
+
+ if (V.RunGc)
+ {
+ TriggerGcAndWait();
+ }
+
+ HttpClient::Response Response = GetChunk(ProjName);
+ CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
if (Response.IsSuccess())
{
IoBuffer Payload = GetDecompressedPayload(Response);
+ CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
}
}
-
- // Second op: same FileId transitions to CAS hash (same data)
- {
- CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob);
- PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op);
- }
-
- if (RunGcAfterTransition)
- {
- TriggerGcAndWait();
- }
-
- // Must still resolve successfully after the transition
- HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv);
- CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition"));
- if (Response.IsSuccess())
- {
- IoBuffer Payload = GetDecompressedPayload(Response);
- CHECK_EQ(Payload.GetSize(), FileBlob.GetSize());
- CHECK(Payload.GetView().EqualBytes(FileBlob.GetView()));
- }
}
}
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index cf7ffe4e4..d713f693f 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -199,7 +199,7 @@ TEST_CASE("default.single")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 100; ++i)
+ for (int i = 0; i < 20; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
@@ -238,7 +238,6 @@ TEST_CASE("default.loopback")
ZEN_INFO("Running loopback server test...");
- SUBCASE("ipv4 endpoint connectivity")
{
HttpClient Http{fmt::format("http://127.0.0.1:{}", PortNumber)};
@@ -247,7 +246,6 @@ TEST_CASE("default.loopback")
CHECK(res);
}
- SUBCASE("ipv6 endpoint connectivity")
{
HttpClient Http{fmt::format("http://[::1]:{}", PortNumber)};
@@ -287,7 +285,7 @@ TEST_CASE("multi.basic")
HttpClient Http{fmt::format("http://localhost:{}", PortNumber)};
- for (int i = 0; i < 100; ++i)
+ for (int i = 0; i < 20; ++i)
{
auto res = Http.Get("/test/hello"sv);
++RequestCount;
@@ -401,13 +399,11 @@ TEST_CASE("http.unixsocket")
Settings.UnixSocketPath = SocketPath;
HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}};
- SUBCASE("GET over unix socket")
{
HttpClient::Response Res = Http.Get("/testing/hello");
CHECK(Res.IsSuccess());
}
- SUBCASE("POST echo over unix socket")
{
IoBuffer Body{IoBuffer::Wrap, "unix-test", 9};
HttpClient::Response Res = Http.Post("/testing/echo", Body);
@@ -431,13 +427,11 @@ TEST_CASE("http.nonetwork")
Settings.UnixSocketPath = SocketPath;
HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}};
- SUBCASE("GET over unix socket succeeds")
{
HttpClient::Response Res = Http.Get("/testing/hello");
CHECK(Res.IsSuccess());
}
- SUBCASE("TCP connection is refused")
{
asio::io_context IoContext;
asio::ip::tcp::socket Socket(IoContext);
diff --git a/src/zenserver/frontend/html/pages/cache.js b/src/zenserver/frontend/html/pages/cache.js
index 93059b81c..c6567f0be 100644
--- a/src/zenserver/frontend/html/pages/cache.js
+++ b/src/zenserver/frontend/html/pages/cache.js
@@ -56,7 +56,8 @@ export class Page extends ZenPage
this._cache_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric);
- this._cache_pager = new Pager(section, 25, () => this._render_cache_page());
+ this._cache_pager = new Pager(section, 25, () => this._render_cache_page(),
+ Pager.make_search_fn(() => this._cache_data, item => item.namespace));
const cache_drop_link = document.createElement("span");
cache_drop_link.className = "dropall zen_action";
cache_drop_link.style.position = "static";
@@ -64,6 +65,7 @@ export class Page extends ZenPage
cache_drop_link.addEventListener("click", () => this.drop_all());
this._cache_pager.prepend(cache_drop_link);
+ const loading = Pager.loading(section);
const zcache_info = await new Fetcher().resource("/z$/").json();
const namespaces = zcache_info["Namespaces"] || [];
const results = await Promise.allSettled(
@@ -75,6 +77,7 @@ export class Page extends ZenPage
.sort((a, b) => a.namespace.localeCompare(b.namespace));
this._cache_pager.set_total(this._cache_data.length);
this._render_cache_page();
+ loading.remove();
// Namespace detail area (inside namespaces section so it collapses together)
this._namespace_host = section;
diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js
index 7ae1deb5c..3cbfe6092 100644
--- a/src/zenserver/frontend/html/pages/hub.js
+++ b/src/zenserver/frontend/html/pages/hub.js
@@ -6,6 +6,7 @@ import { ZenPage } from "./page.js"
import { Fetcher } from "../util/fetcher.js"
import { Friendly } from "../util/friendly.js"
import { Modal } from "../util/modal.js"
+import { flash_highlight } from "../util/widgets.js"
////////////////////////////////////////////////////////////////////////////////
const STABLE_STATES = new Set(["provisioned", "hibernated", "crashed"]);
@@ -159,8 +160,36 @@ export class Page extends ZenPage
this._btn_next.addEventListener("click", () => this._go_page(this._page + 1));
this._btn_provision = _make_bulk_btn("+", "Provision", () => this._show_provision_modal());
this._btn_obliterate = _make_bulk_btn("\uD83D\uDD25", "Obliterate", () => this._show_obliterate_modal());
+ this._search_input = document.createElement("input");
+ this._search_input.type = "text";
+ this._search_input.className = "module-pager-search";
+ this._search_input.placeholder = "Search module\u2026";
+ this._search_input.addEventListener("keydown", (e) =>
+ {
+ if (e.key === "Enter")
+ {
+ const term = this._search_input.value.trim().toLowerCase();
+ if (!term) { return; }
+ const idx = this._modules_data.findIndex(m =>
+ (m.moduleId || "").toLowerCase().includes(term)
+ );
+ if (idx >= 0)
+ {
+ const id = this._modules_data[idx].moduleId;
+ this._navigate_to_module(id);
+ this._flash_module(id);
+ }
+ else
+ {
+ this._search_input.style.outline = "2px solid var(--theme_fail)";
+ setTimeout(() => { this._search_input.style.outline = ""; }, 1000);
+ }
+ }
+ });
+
pager.appendChild(this._btn_provision);
pager.appendChild(this._btn_obliterate);
+ pager.appendChild(this._search_input);
pager.appendChild(this._btn_prev);
pager.appendChild(this._pager_label);
pager.appendChild(this._btn_next);
@@ -173,8 +202,11 @@ export class Page extends ZenPage
this._row_cache = new Map(); // moduleId → row refs, for in-place DOM updates
this._updating = false;
this._page = 0;
- this._page_size = 50;
+ this._page_size = 25;
this._expanded = new Set(); // moduleIds with open metrics panel
+ this._pending_highlight = null; // moduleId to navigate+flash after next poll
+ this._pending_highlight_timer = null;
+ this._loading = mod_section.tag().classify("pager-loading").text("Loading\u2026").inner();
await this._update();
this._poll_timer = setInterval(() => this._update(), 2000);
@@ -193,6 +225,15 @@ export class Page extends ZenPage
this._render_capacity(stats);
this._render_modules(status);
+ if (this._loading) { this._loading.remove(); this._loading = null; }
+ if (this._pending_highlight && this._module_map.has(this._pending_highlight))
+ {
+ const id = this._pending_highlight;
+ this._pending_highlight = null;
+ clearTimeout(this._pending_highlight_timer);
+ this._navigate_to_module(id);
+ this._flash_module(id);
+ }
}
catch (e) { /* service unavailable */ }
finally { this._updating = false; }
@@ -844,14 +885,19 @@ export class Page extends ZenPage
submit_label: "Provision",
on_submit: async (moduleId) => {
const resp = await fetch(`/hub/modules/${encodeURIComponent(moduleId)}/provision`, { method: "POST" });
- if (resp.ok)
+ if (!resp.ok)
{
- this._navigate_to_module(moduleId);
- return true;
+ const msg = await resp.text();
+ error_div.textContent = msg || ("HTTP " + resp.status);
+ return false;
}
- const msg = await resp.text();
- error_div.textContent = msg || ("HTTP " + resp.status);
- return false;
+ // Endpoint returns compact binary (CbObjectWriter), not text
+ if (resp.status === 200 || resp.status === 202)
+ {
+ this._pending_highlight = moduleId;
+ this._pending_highlight_timer = setTimeout(() => { this._pending_highlight = null; }, 5000);
+ }
+ return true;
}
});
}
@@ -885,4 +931,10 @@ export class Page extends ZenPage
}
}
+ _flash_module(id)
+ {
+ const cached = this._row_cache.get(id);
+ if (cached) { flash_highlight(cached.tr); }
+ }
+
}
diff --git a/src/zenserver/frontend/html/pages/projects.js b/src/zenserver/frontend/html/pages/projects.js
index 52d5dbb88..e613086a9 100644
--- a/src/zenserver/frontend/html/pages/projects.js
+++ b/src/zenserver/frontend/html/pages/projects.js
@@ -49,7 +49,8 @@ export class Page extends ZenPage
this._project_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_Sortable|Table.Flag_AlignNumeric);
- this._project_pager = new Pager(section, 25, () => this._render_projects_page());
+ this._project_pager = new Pager(section, 25, () => this._render_projects_page(),
+ Pager.make_search_fn(() => this._projects_data, p => p.Id));
const drop_link = document.createElement("span");
drop_link.className = "dropall zen_action";
drop_link.style.position = "static";
@@ -57,10 +58,12 @@ export class Page extends ZenPage
drop_link.addEventListener("click", () => this.drop_all());
this._project_pager.prepend(drop_link);
+ const loading = Pager.loading(section);
this._projects_data = await new Fetcher().resource("/prj/list").json();
this._projects_data.sort((a, b) => a.Id.localeCompare(b.Id));
this._project_pager.set_total(this._projects_data.length);
this._render_projects_page();
+ loading.remove();
// Project detail area (inside projects section so it collapses together)
this._project_host = section;
diff --git a/src/zenserver/frontend/html/pages/start.js b/src/zenserver/frontend/html/pages/start.js
index 14ec4bd4a..9a3eb6de3 100644
--- a/src/zenserver/frontend/html/pages/start.js
+++ b/src/zenserver/frontend/html/pages/start.js
@@ -62,7 +62,8 @@ export class Page extends ZenPage
];
this._project_table = section.add_widget(Table, columns);
- this._project_pager = new Pager(section, 25, () => this._render_projects_page());
+ this._project_pager = new Pager(section, 25, () => this._render_projects_page(),
+ Pager.make_search_fn(() => this._projects_data, p => p.Id));
const drop_link = document.createElement("span");
drop_link.className = "dropall zen_action";
drop_link.style.position = "static";
@@ -70,10 +71,12 @@ export class Page extends ZenPage
drop_link.addEventListener("click", () => this.drop_all("projects"));
this._project_pager.prepend(drop_link);
+ const prj_loading = Pager.loading(section);
this._projects_data = await new Fetcher().resource("/prj/list").json();
this._projects_data.sort((a, b) => a.Id.localeCompare(b.Id));
this._project_pager.set_total(this._projects_data.length);
this._render_projects_page();
+ prj_loading.remove();
}
// cache
@@ -92,7 +95,8 @@ export class Page extends ZenPage
];
this._cache_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight);
- this._cache_pager = new Pager(section, 25, () => this._render_cache_page());
+ this._cache_pager = new Pager(section, 25, () => this._render_cache_page(),
+ Pager.make_search_fn(() => this._cache_data, item => item.namespace));
const cache_drop_link = document.createElement("span");
cache_drop_link.className = "dropall zen_action";
cache_drop_link.style.position = "static";
@@ -100,6 +104,7 @@ export class Page extends ZenPage
cache_drop_link.addEventListener("click", () => this.drop_all("z$"));
this._cache_pager.prepend(cache_drop_link);
+ const cache_loading = Pager.loading(section);
const zcache_info = await new Fetcher().resource("/z$/").json();
const namespaces = zcache_info["Namespaces"] || [];
const results = await Promise.allSettled(
@@ -111,6 +116,7 @@ export class Page extends ZenPage
.sort((a, b) => a.namespace.localeCompare(b.namespace));
this._cache_pager.set_total(this._cache_data.length);
this._render_cache_page();
+ cache_loading.remove();
}
// version
diff --git a/src/zenserver/frontend/html/util/widgets.js b/src/zenserver/frontend/html/util/widgets.js
index 33d6755ac..b8fc720c1 100644
--- a/src/zenserver/frontend/html/util/widgets.js
+++ b/src/zenserver/frontend/html/util/widgets.js
@@ -6,6 +6,14 @@ import { Component } from "./component.js"
import { Friendly } from "../util/friendly.js"
////////////////////////////////////////////////////////////////////////////////
+export function flash_highlight(element)
+{
+ if (!element) { return; }
+ element.classList.add("pager-search-highlight");
+ setTimeout(() => { element.classList.remove("pager-search-highlight"); }, 1500);
+}
+
+////////////////////////////////////////////////////////////////////////////////
class Widget extends Component
{
}
@@ -404,12 +412,14 @@ export class ProgressBar extends Widget
////////////////////////////////////////////////////////////////////////////////
export class Pager
{
- constructor(section, page_size, on_change)
+ constructor(section, page_size, on_change, search_fn)
{
this._page = 0;
this._page_size = page_size;
this._total = 0;
this._on_change = on_change;
+ this._search_fn = search_fn || null;
+ this._search_input = null;
const pager = section.tag().classify("module-pager").inner();
this._btn_prev = document.createElement("button");
@@ -422,6 +432,23 @@ export class Pager
this._btn_next.className = "module-pager-btn";
this._btn_next.textContent = "Next \u2192";
this._btn_next.addEventListener("click", () => this._go_page(this._page + 1));
+
+ if (this._search_fn)
+ {
+ this._search_input = document.createElement("input");
+ this._search_input.type = "text";
+ this._search_input.className = "module-pager-search";
+ this._search_input.placeholder = "Search\u2026";
+ this._search_input.addEventListener("keydown", (e) =>
+ {
+ if (e.key === "Enter")
+ {
+ this._do_search(this._search_input.value.trim());
+ }
+ });
+ pager.appendChild(this._search_input);
+ }
+
pager.appendChild(this._btn_prev);
pager.appendChild(this._label);
pager.appendChild(this._btn_next);
@@ -432,7 +459,8 @@ export class Pager
prepend(element)
{
- this._pager.insertBefore(element, this._btn_prev);
+ const ref = this._search_input || this._btn_prev;
+ this._pager.insertBefore(element, ref);
}
set_total(n)
@@ -461,6 +489,23 @@ export class Pager
this._on_change();
}
+ _do_search(term)
+ {
+ if (!term || !this._search_fn)
+ {
+ return;
+ }
+ const result = this._search_fn(term);
+ if (!result)
+ {
+ this._search_input.style.outline = "2px solid var(--theme_fail)";
+ setTimeout(() => { this._search_input.style.outline = ""; }, 1000);
+ return;
+ }
+ this._go_page(Math.floor(result.index / this._page_size));
+ flash_highlight(this._pager.parentNode.querySelector(`[zs_name="${CSS.escape(result.name)}"]`));
+ }
+
_update_ui()
{
const total = this._total;
@@ -474,6 +519,21 @@ export class Pager
? "No items"
: `${start}\u2013${end} of ${total}`;
}
+
+ static make_search_fn(get_data, get_key)
+ {
+ return (term) => {
+ const t = term.toLowerCase();
+ const data = get_data();
+ const i = data.findIndex(item => get_key(item).toLowerCase().includes(t));
+ return i < 0 ? null : { index: i, name: get_key(data[i]) };
+ };
+ }
+
+ static loading(section)
+ {
+ return section.tag().classify("pager-loading").text("Loading\u2026").inner();
+ }
}
diff --git a/src/zenserver/frontend/html/zen.css b/src/zenserver/frontend/html/zen.css
index ca577675b..8d4e60472 100644
--- a/src/zenserver/frontend/html/zen.css
+++ b/src/zenserver/frontend/html/zen.css
@@ -1749,6 +1749,53 @@ tr:last-child td {
text-align: center;
}
+.module-pager-search {
+ font-size: 12px;
+ padding: 4px 8px;
+ width: 14em;
+ border: 1px solid var(--theme_g2);
+ border-radius: 4px;
+ background: var(--theme_g4);
+ color: var(--theme_g0);
+ outline: none;
+ transition: border-color 0.15s, outline 0.3s;
+}
+
+.module-pager-search:focus {
+ border-color: var(--theme_p0);
+}
+
+.module-pager-search::placeholder {
+ color: var(--theme_g1);
+}
+
+@keyframes pager-search-flash {
+ from { box-shadow: inset 0 0 0 100px var(--theme_p2); }
+ to { box-shadow: inset 0 0 0 100px transparent; }
+}
+
+.zen_table > .pager-search-highlight > div {
+ animation: pager-search-flash 1s linear forwards;
+}
+
+.module-table .pager-search-highlight td {
+ animation: pager-search-flash 1s linear forwards;
+}
+
+@keyframes pager-loading-pulse {
+ 0%, 100% { opacity: 0.6; }
+ 50% { opacity: 0.2; }
+}
+
+.pager-loading {
+ color: var(--theme_g1);
+ font-style: italic;
+ font-size: 14px;
+ font-weight: 600;
+ padding: 12px 0;
+ animation: pager-loading-pulse 1.5s ease-in-out infinite;
+}
+
.module-table td, .module-table th {
padding-top: 4px;
padding-bottom: 4px;
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index e6a900066..e4b0c28d0 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -121,6 +121,11 @@ HttpHubService::HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsServi
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "deprovision",
+ [this](HttpRouterRequest& Req) { HandleDeprovisionAll(Req.ServerRequest()); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"modules/{moduleid}",
[this](HttpRouterRequest& Req) {
std::string_view ModuleId = Req.GetCapture(1);
@@ -371,6 +376,81 @@ HttpHubService::GetActivityCounter()
}
void
+HttpHubService::HandleDeprovisionAll(HttpServerRequest& Request)
+{
+ std::vector<std::string> ModulesToDeprovision;
+ m_Hub.EnumerateModules([&ModulesToDeprovision](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
+ if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated)
+ {
+ ModulesToDeprovision.push_back(std::string(ModuleId));
+ }
+ });
+
+ if (ModulesToDeprovision.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ std::vector<std::string> Rejected;
+ std::vector<std::string> Accepted;
+ std::vector<std::string> Completed;
+ for (const std::string& ModuleId : ModulesToDeprovision)
+ {
+ Hub::Response Response = m_Hub.Deprovision(ModuleId);
+ switch (Response.ResponseCode)
+ {
+ case Hub::EResponseCode::NotFound:
+ // Ignore
+ break;
+ case Hub::EResponseCode::Rejected:
+ Rejected.push_back(ModuleId);
+ break;
+ case Hub::EResponseCode::Accepted:
+ Accepted.push_back(ModuleId);
+ break;
+ case Hub::EResponseCode::Completed:
+ Completed.push_back(ModuleId);
+ break;
+ }
+ }
+ if (Rejected.empty() && Accepted.empty() && Completed.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ HttpResponseCode Response = HttpResponseCode::OK;
+ CbObjectWriter Writer;
+ if (!Completed.empty())
+ {
+ Writer.BeginArray("Completed");
+ for (const std::string& ModuleId : Completed)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Completed
+ }
+ if (!Accepted.empty())
+ {
+ Writer.BeginArray("Accepted");
+ for (const std::string& ModuleId : Accepted)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Accepted
+ Response = HttpResponseCode::Accepted;
+ }
+ if (!Rejected.empty())
+ {
+ Writer.BeginArray("Rejected");
+ for (const std::string& ModuleId : Rejected)
+ {
+ Writer.AddString(ModuleId);
+ }
+ Writer.EndArray(); // Rejected
+ Response = HttpResponseCode::Conflict;
+ }
+ Request.WriteResponse(Response, Writer.Save());
+}
+
+void
HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId)
{
Hub::InstanceInfo InstanceInfo;
diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h
index ff2cb0029..f4d1b0b89 100644
--- a/src/zenserver/hub/httphubservice.h
+++ b/src/zenserver/hub/httphubservice.h
@@ -53,6 +53,7 @@ private:
HttpStatsService& m_StatsService;
HttpStatusService& m_StatusService;
+ void HandleDeprovisionAll(HttpServerRequest& Request);
void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 673306cde..b356064f9 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -1572,7 +1572,7 @@ TEST_CASE("hydration.s3.concurrent")
ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
- constexpr int kModuleCount = 16;
+ constexpr int kModuleCount = 6;
constexpr int kThreadCount = 4;
TestThreading Threading(kThreadCount);
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 00b7a67d7..c5f8724ca 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -250,7 +250,7 @@ test_main(int argc, char** argv)
zen::MaximizeOpenFileCount();
zen::testing::TestRunner Runner;
- Runner.ApplyCommandLine(argc, argv);
+ Runner.ApplyCommandLine(argc, argv, "server.*");
return Runner.Run();
}
#endif
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp
index bbbb0c37b..f935e2c6b 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp
@@ -162,96 +162,81 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
fmt::format("Invalid blob hash '{}'", Hash));
}
- std::vector<std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs;
+ m_BuildStoreStats.BlobReadCount++;
+ IoBuffer Blob = m_BuildStore.GetBlob(BlobHash);
+ if (!Blob)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Blob {} not found", Hash));
+ }
+ m_BuildStoreStats.BlobHitCount++;
+
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
+ if (ServerRequest.AcceptContentType() != HttpContentType::kCbPackage)
+ {
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Accept type '{}' is not supported for blob {}, expected '{}'",
+ ToString(ServerRequest.AcceptContentType()),
+ Hash,
+ ToString(HttpContentType::kCbPackage)));
+ }
+
CbObject RangePayload = ServerRequest.ReadPayloadObject();
- if (RangePayload)
+ if (!RangePayload)
{
- CbArrayView RangesArray = RangePayload["ranges"sv].AsArrayView();
- OffsetAndLengthPairs.reserve(RangesArray.Num());
- for (CbFieldView FieldView : RangesArray)
- {
- CbObjectView RangeView = FieldView.AsObjectView();
- uint64_t RangeOffset = RangeView["offset"sv].AsUInt64();
- uint64_t RangeLength = RangeView["length"sv].AsUInt64();
- OffsetAndLengthPairs.push_back(std::make_pair(RangeOffset, RangeLength));
- }
- if (OffsetAndLengthPairs.size() > MaxRangeCountPerRequestSupported)
- {
- return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Number of ranges ({}) for blob request exceeds maximum range count {}",
- OffsetAndLengthPairs.size(),
- MaxRangeCountPerRequestSupported));
- }
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Missing payload for range request on blob {}", BlobHash));
}
- if (OffsetAndLengthPairs.empty())
+
+ CbArrayView RangesArray = RangePayload["ranges"sv].AsArrayView();
+ const uint64_t RangeCount = RangesArray.Num();
+ if (RangeCount == 0)
{
m_BuildStoreStats.BadRequestCount++;
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
- "Fetching blob without ranges must be done with the GET verb");
+ "POST request must include a non-empty 'ranges' array");
}
- }
- else
- {
- HttpRanges Ranges;
- bool HasRange = ServerRequest.TryGetRanges(Ranges);
- if (HasRange)
+ if (RangeCount > MaxRangeCountPerRequestSupported)
{
- if (Ranges.size() > 1)
- {
- // Only a single http range is supported, we have limited support for http multirange responses
- m_BuildStoreStats.BadRequestCount++;
- return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Multiple ranges in blob request is only supported for {} accept type",
- ToString(HttpContentType::kCbPackage)));
- }
- const HttpRange& FirstRange = Ranges.front();
- OffsetAndLengthPairs.push_back(std::make_pair<uint64_t, uint64_t>(FirstRange.Start, FirstRange.End - FirstRange.Start + 1));
+ m_BuildStoreStats.BadRequestCount++;
+ return ServerRequest.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Range count {} exceeds maximum of {}", RangeCount, MaxRangeCountPerRequestSupported));
}
- }
-
- m_BuildStoreStats.BlobReadCount++;
- IoBuffer Blob = m_BuildStore.GetBlob(BlobHash);
- if (!Blob)
- {
- return ServerRequest.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Blob with hash '{}' could not be found", Hash));
- }
- m_BuildStoreStats.BlobHitCount++;
- if (OffsetAndLengthPairs.empty())
- {
- return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob);
- }
+ const uint64_t BlobSize = Blob.GetSize();
+ std::vector<IoBuffer> RangeBuffers;
+ RangeBuffers.reserve(RangeCount);
- if (ServerRequest.AcceptContentType() == HttpContentType::kCbPackage)
- {
- const uint64_t BlobSize = Blob.GetSize();
+ CbPackage ResponsePackage;
+ CbObjectWriter Writer;
- CbPackage ResponsePackage;
- std::vector<IoBuffer> RangeBuffers;
- CbObjectWriter Writer;
Writer.BeginArray("ranges"sv);
- for (const std::pair<uint64_t, uint64_t>& Range : OffsetAndLengthPairs)
+ for (CbFieldView FieldView : RangesArray)
{
- const uint64_t MaxBlobSize = Range.first < BlobSize ? BlobSize - Range.first : 0;
- const uint64_t RangeSize = Min(Range.second, MaxBlobSize);
+ CbObjectView RangeView = FieldView.AsObjectView();
+ uint64_t RangeOffset = RangeView["offset"sv].AsUInt64();
+ uint64_t RangeLength = RangeView["length"sv].AsUInt64();
+
+ const uint64_t MaxBlobSize = RangeOffset < BlobSize ? BlobSize - RangeOffset : 0;
+ const uint64_t RangeSize = Min(RangeLength, MaxBlobSize);
Writer.BeginObject();
{
- if (Range.first + RangeSize <= BlobSize)
+ if (RangeOffset + RangeSize <= BlobSize)
{
- RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize));
- Writer.AddInteger("offset"sv, Range.first);
+ RangeBuffers.push_back(IoBuffer(Blob, RangeOffset, RangeSize));
+ Writer.AddInteger("offset"sv, RangeOffset);
Writer.AddInteger("length"sv, RangeSize);
}
else
{
- Writer.AddInteger("offset"sv, Range.first);
+ Writer.AddInteger("offset"sv, RangeOffset);
Writer.AddInteger("length"sv, 0);
}
}
@@ -259,7 +244,7 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
}
Writer.EndArray();
- CompositeBuffer Ranges(RangeBuffers);
+ CompositeBuffer Ranges(std::move(RangeBuffers));
CbAttachment PayloadAttachment(std::move(Ranges), BlobHash);
Writer.AddAttachment("payload", PayloadAttachment);
@@ -269,32 +254,21 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
ResponsePackage.SetObject(HeaderObject);
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
- uint64_t ResponseSize = RpcResponseBuffer.GetSize();
- ZEN_UNUSED(ResponseSize);
return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
{
- if (OffsetAndLengthPairs.size() != 1)
+ HttpRanges RequestedRangeHeader;
+ bool HasRange = ServerRequest.TryGetRanges(RequestedRangeHeader);
+ if (HasRange)
{
- // Only a single http range is supported, we have limited support for http multirange responses
- m_BuildStoreStats.BadRequestCount++;
- return ServerRequest.WriteResponse(
- HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Multiple ranges in blob request is only supported for {} accept type", ToString(HttpContentType::kCbPackage)));
+ // Standard HTTP GET with Range header: framework handles 206, Content-Range, and 416 on OOB.
+ return ServerRequest.WriteResponse(HttpContentType::kBinary, Blob, RequestedRangeHeader);
}
-
- const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengthPairs.front();
- const uint64_t BlobSize = Blob.GetSize();
- const uint64_t MaxBlobSize = OffsetAndLength.first < BlobSize ? BlobSize - OffsetAndLength.first : 0;
- const uint64_t RangeSize = Min(OffsetAndLength.second, MaxBlobSize);
- if (OffsetAndLength.first + RangeSize > BlobSize)
+ else
{
- return ServerRequest.WriteResponse(HttpResponseCode::NoContent);
+ return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob);
}
- Blob = IoBuffer(Blob, OffsetAndLength.first, RangeSize);
- return ServerRequest.WriteResponse(HttpResponseCode::OK, ZenContentType::kBinary, Blob);
}
}
diff --git a/src/zenserver/storage/objectstore/objectstore.cpp b/src/zenserver/storage/objectstore/objectstore.cpp
index e34cd0445..bab9df06d 100644
--- a/src/zenserver/storage/objectstore/objectstore.cpp
+++ b/src/zenserver/storage/objectstore/objectstore.cpp
@@ -637,11 +637,7 @@ HttpObjectStoreService::GetObject(HttpRouterRequest& Request, const std::string_
}
HttpRanges Ranges;
- if (Request.ServerRequest().TryGetRanges(Ranges); Ranges.size() > 1)
- {
- // Multi-range is not supported, fall back to full response per RFC 7233
- Ranges.clear();
- }
+ Request.ServerRequest().TryGetRanges(Ranges);
FileContents File;
{
@@ -665,42 +661,34 @@ HttpObjectStoreService::GetObject(HttpRouterRequest& Request, const std::string_
if (Ranges.empty())
{
- const uint64_t TotalServed = m_TotalBytesServed.fetch_add(FileBuf.Size()) + FileBuf.Size();
-
+ const uint64_t TotalServed = m_TotalBytesServed.fetch_add(FileBuf.GetSize()) + FileBuf.GetSize();
ZEN_LOG_DEBUG(LogObj,
"GET - '{}/{}' ({}) [OK] (Served: {})",
BucketName,
RelativeBucketPath,
- NiceBytes(FileBuf.Size()),
+ NiceBytes(FileBuf.GetSize()),
NiceBytes(TotalServed));
-
- Request.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, FileBuf);
}
- else
+ else if (Ranges.size() == 1)
{
- const auto Range = Ranges[0];
- const uint64_t RangeSize = 1 + (Range.End - Range.Start);
- const uint64_t TotalServed = m_TotalBytesServed.fetch_add(RangeSize) + RangeSize;
-
- ZEN_LOG_DEBUG(LogObj,
- "GET - '{}/{}' (Range: {}-{}) ({}/{}) [OK] (Served: {})",
- BucketName,
- RelativeBucketPath,
- Range.Start,
- Range.End,
- NiceBytes(RangeSize),
- NiceBytes(FileBuf.Size()),
- NiceBytes(TotalServed));
-
- MemoryView RangeView = FileBuf.GetView().Mid(Range.Start, RangeSize);
- if (RangeView.GetSize() != RangeSize)
+ const uint64_t TotalSize = FileBuf.GetSize();
+ const uint64_t RangeEnd = (Ranges[0].End != ~uint64_t(0)) ? Ranges[0].End : TotalSize - 1;
+ if (RangeEnd < TotalSize && Ranges[0].Start <= RangeEnd)
{
- return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ const uint64_t RangeSize = 1 + (RangeEnd - Ranges[0].Start);
+ const uint64_t TotalServed = m_TotalBytesServed.fetch_add(RangeSize) + RangeSize;
+ ZEN_LOG_DEBUG(LogObj,
+ "GET - '{}/{}' (Range: {}-{}) ({}/{}) [OK] (Served: {})",
+ BucketName,
+ RelativeBucketPath,
+ Ranges[0].Start,
+ RangeEnd,
+ NiceBytes(RangeSize),
+ NiceBytes(TotalSize),
+ NiceBytes(TotalServed));
}
-
- IoBuffer RangeBuf = IoBuffer(IoBuffer::Wrap, RangeView.GetData(), RangeView.GetSize());
- Request.ServerRequest().WriteResponse(HttpResponseCode::PartialContent, HttpContentType::kBinary, RangeBuf);
}
+ Request.ServerRequest().WriteResponse(HttpContentType::kBinary, FileBuf, Ranges);
}
void
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 43dc389e2..815762e3b 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -1391,7 +1391,7 @@ TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
- const int kIterationCount = 1000;
+ const int kIterationCount = 200;
std::vector<IoHash> Keys(kIterationCount);
@@ -1504,7 +1504,7 @@ TEST_CASE("compactcas.threadedinsert")
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 2048;
+ const int32_t kChunkCount = 512;
uint64_t ExpectedSize = 0;
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
@@ -1803,7 +1803,7 @@ TEST_CASE("compactcas.restart")
}
const uint64_t kChunkSize = 1048 + 395;
- const size_t kChunkCount = 7167;
+ const size_t kChunkCount = 2000;
std::vector<IoHash> Hashes;
Hashes.reserve(kChunkCount);
@@ -1984,9 +1984,8 @@ TEST_CASE("compactcas.iteratechunks")
WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put");
const uint64_t kChunkSize = 1048 + 395;
- const size_t kChunkCount = 63840;
+ const size_t kChunkCount = 10000;
- for (uint32_t N = 0; N < 2; N++)
{
GcManager Gc;
CasContainerStrategy Cas(Gc);
@@ -2017,7 +2016,7 @@ TEST_CASE("compactcas.iteratechunks")
size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u);
WorkLatch.AddCount(1);
ThreadPool.ScheduleWork(
- [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
std::vector<IoBuffer> BatchBlobs;
@@ -2028,7 +2027,7 @@ TEST_CASE("compactcas.iteratechunks")
while (BatchBlobs.size() < BatchCount)
{
IoBuffer Chunk = CreateRandomBlob(
- N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
IoHash Hash = IoHash::HashBuffer(Chunk);
{
RwLock::ExclusiveLockScope __(InsertLock);
diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp
index c9884ea61..430ad0e32 100644
--- a/src/zenutil/consul/consul.cpp
+++ b/src/zenutil/consul/consul.cpp
@@ -689,7 +689,7 @@ TEST_CASE("util.consul.service_lifecycle")
REQUIRE(Client.RegisterService(Info));
REQUIRE(Client.HasService(ServiceId));
- REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000));
+ REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1);
CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
@@ -709,13 +709,13 @@ TEST_CASE("util.consul.service_lifecycle")
CHECK_EQ(HealthServer.Mock.HealthCheckCount.load(), 0);
CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
- REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000));
+ REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50));
CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing");
HealthServer.Mock.FailHealth.store(true);
// Wait for Consul to observe the failing check
- REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000));
+ REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000, 50));
CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical");
// Phase 4: Explicit deregister while critical
diff --git a/src/zenutil/filesystemutils.cpp b/src/zenutil/filesystemutils.cpp
index 9b7953f95..ccc42a838 100644
--- a/src/zenutil/filesystemutils.cpp
+++ b/src/zenutil/filesystemutils.cpp
@@ -657,7 +657,7 @@ namespace {
void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); }
} // namespace
-TEST_SUITE_BEGIN("zenutil.filesystemutils");
+TEST_SUITE_BEGIN("util.filesystemutils");
TEST_CASE("filesystemutils.CleanDirectory")
{
diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp
index e908dd63a..597df3c15 100644
--- a/src/zenutil/process/subprocessmanager.cpp
+++ b/src/zenutil/process/subprocessmanager.cpp
@@ -1209,7 +1209,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectExit")
CallbackFired = true;
});
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
CHECK(ReceivedExitCode == 42);
@@ -1234,7 +1244,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit")
CallbackFired = true;
});
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
CHECK(ReceivedExitCode == 0);
@@ -1259,7 +1279,17 @@ TEST_CASE("SubprocessManager.StdoutCapture")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
std::string Captured = Proc->GetCapturedStdout();
@@ -1288,7 +1318,17 @@ TEST_CASE("SubprocessManager.StderrCapture")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
std::string CapturedErr = Proc->GetCapturedStderr();
@@ -1320,7 +1360,17 @@ TEST_CASE("SubprocessManager.StdoutCallback")
[&](ManagedProcess&, int) { Exited = true; },
[&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
CHECK(ReceivedData.find("callback_test") != std::string::npos);
@@ -1343,8 +1393,18 @@ TEST_CASE("SubprocessManager.MetricsSampling")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; });
- // Run for enough time to get metrics samples
- IoContext.run_for(1s);
+ // Poll until metrics are available
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Proc->GetLatestMetrics().WorkingSetSize > 0)
+ {
+ break;
+ }
+ }
+ }
ProcessMetrics Metrics = Proc->GetLatestMetrics();
CHECK(Metrics.WorkingSetSize > 0);
@@ -1353,7 +1413,17 @@ TEST_CASE("SubprocessManager.MetricsSampling")
CHECK(Snapshot.size() == 1);
// Let it finish
- IoContext.run_for(3s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 10'000)
+ {
+ IoContext.run_for(10ms);
+ if (Exited)
+ {
+ break;
+ }
+ }
+ }
CHECK(Exited);
}
@@ -1402,12 +1472,31 @@ TEST_CASE("SubprocessManager.KillAndWaitForExit")
ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; });
// Let it start
- IoContext.run_for(200ms);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Proc->IsRunning())
+ {
+ break;
+ }
+ }
+ }
Proc->Kill();
- IoContext.run_for(2s);
-
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (CallbackFired)
+ {
+ break;
+ }
+ }
+ }
CHECK(CallbackFired);
}
@@ -1428,7 +1517,17 @@ TEST_CASE("SubprocessManager.AdoptProcess")
Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; });
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (ReceivedExitCode != -1)
+ {
+ break;
+ }
+ }
+ }
CHECK(ReceivedExitCode == 7);
}
@@ -1451,7 +1550,17 @@ TEST_CASE("SubprocessManager.UserTag")
Proc->SetTag("my-worker-1");
CHECK(Proc->GetTag() == "my-worker-1");
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (!ReceivedTag.empty())
+ {
+ break;
+ }
+ }
+ }
CHECK(ReceivedTag == "my-worker-1");
}
@@ -1481,7 +1590,17 @@ TEST_CASE("ProcessGroup.SpawnAndMembership")
CHECK(Group->GetProcessCount() == 2);
CHECK(Manager.GetProcessCount() == 2);
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (ExitCount == 2)
+ {
+ break;
+ }
+ }
+ }
CHECK(ExitCount == 2);
}
@@ -1531,7 +1650,17 @@ TEST_CASE("ProcessGroup.AggregateMetrics")
Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {});
// Wait for metrics sampling
- IoContext.run_for(1s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (Group->GetAggregateMetrics().TotalWorkingSetSize > 0)
+ {
+ break;
+ }
+ }
+ }
AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics();
CHECK(GroupAgg.ProcessCount == 2);
@@ -1597,7 +1726,17 @@ TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped")
CHECK(Group->GetProcessCount() == 2);
CHECK(Manager.GetProcessCount() == 3);
- IoContext.run_for(5s);
+ {
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < 5'000)
+ {
+ IoContext.run_for(10ms);
+ if (GroupExitCount == 2 && UngroupedExitCode != -1)
+ {
+ break;
+ }
+ }
+ }
CHECK(GroupExitCount == 2);
CHECK(UngroupedExitCode == 0);