diff options
38 files changed, 2260 insertions, 1605 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e9cfa1cd..01f340f59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ ## - +- Improvement: Replaced archived `http_parser` library with `llhttp` for HTTP request parsing +- Improvement: HTTP range responses (RFC 7233) are now fully compliant across the object store and build store + - 206 Partial Content responses now include a `Content-Range` header; previously absent for single-range requests, which broke `HttpClient::GetRanges()` + - 416 Range Not Satisfiable responses now include `Content-Range: bytes */N` as required by RFC 7233 + - Out-of-bounds range requests return 416 Range Not Satisfiable (was 400 Bad Request) + - Single-byte ranges (`bytes=N-N`) are now correctly accepted (were previously rejected) + - Range byte positions widened from 32-bit to 64-bit; RFC 7233 imposes no size limit on byte range values + - Build store binary GET requests with a Range header now return 206 Partial Content with `Content-Range` (previously returned 200 OK without it) +- Improvement: Updated rpmalloc to develop branch commit 262c698d7019 (2026-04-10), which fixes memory ordering on weak architectures and avoids assert on mmap failure with callback +- Improvement: Increased rpmalloc page decommit thresholds to reduce commit/decommit churn under high allocation turnover +- Improvement: Disk full error message for `builds download` now shows human-readable sizes and available free space +- Improvement: Dashboard paginated lists now include a search input that jumps to the page containing the first match and highlights the row +- Improvement: Dashboard paginated lists show a loading indicator while fetching data +- Improvement: Hub dashboard navigates to and highlights newly provisioned instances +- Feature: Hub bulk deprovision endpoint (`POST /hub/deprovision`) tears down all provisioned and hibernated modules in a single request - Bugfix: Added logic to shared memory instance state management to ensure unclean 
shutdown followed by restart with identical pid doesn't lead to errors. Particularly likely to happen when running on k8s ## 5.8.3 @@ -22,12 +36,11 @@ - Improvement: Cache and Projects pages now paginate and sort lists consistently with the front page - Improvement: Hub dashboard adds obliterate button for individual, bulk, and by-name module deletion - Improvement: Consul service registration for fully provisioned hub instances now sets initial check status to passing -- Improvement: Replaced archived `http_parser` library with `llhttp` for HTTP request parsing - Bugfix: Fixed hub Consul health check registering with `/hub/health` endpoint which does not exist; now uses `/health` - Bugfix: Fixed oplog export crash when a temp attachment file already exists in the pending directory - Bugfix: Hub dashboard Resources tile was missing total disk space - Bugfix: Fixed oplog export losing small attachments when block reuse is active; reused blocks were omitted from the container -- Bugfix: Object store now falls back to sending full payloads and 200 when receiving multi-range requests instead of responding with bad request +- Bugfix: Object store now falls back to sending full payloads and 200 when receiving multi-range requests instead of responding with bad request ## 5.8.2 - Feature: Hub dashboard proxy - instance dashboards are accessible through the hub server at `/hub/proxy/{port}/` without requiring direct port access diff --git a/src/zenhttp/httpclient_test.cpp b/src/zenhttp/httpclient_test.cpp index af653cbb2..9b3148a4a 100644 --- a/src/zenhttp/httpclient_test.cpp +++ b/src/zenhttp/httpclient_test.cpp @@ -194,7 +194,7 @@ public: "slow", [](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([](HttpServerRequest& Request) { - Sleep(2000); + Sleep(100); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "slow response"); }); }, @@ -750,7 +750,9 @@ TEST_CASE("httpclient.error-handling") { SUBCASE("Connection refused") { - HttpClient 
Client("127.0.0.1:19999", HttpClientSettings{}, /*CheckIfAbortFunction*/ {}); + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(200); + HttpClient Client("127.0.0.1:19999", Settings, /*CheckIfAbortFunction*/ {}); HttpClient::Response Resp = Client.Get("/api/test/hello"); CHECK(!Resp.IsSuccess()); CHECK(Resp.Error.has_value()); @@ -760,7 +762,7 @@ TEST_CASE("httpclient.error-handling") { TestServerFixture Fixture; HttpClientSettings Settings; - Settings.Timeout = std::chrono::milliseconds(500); + Settings.Timeout = std::chrono::milliseconds(50); HttpClient Client = Fixture.MakeClient(Settings); HttpClient::Response Resp = Client.Get("/api/test/slow"); @@ -970,7 +972,9 @@ TEST_CASE("httpclient.measurelatency") SUBCASE("Failed measurement against unreachable port") { - HttpClient Client("127.0.0.1:19999", HttpClientSettings{}, /*CheckIfAbortFunction*/ {}); + HttpClientSettings Settings; + Settings.ConnectTimeout = std::chrono::milliseconds(200); + HttpClient Client("127.0.0.1:19999", Settings, /*CheckIfAbortFunction*/ {}); LatencyTestResult Result = MeasureLatency(Client, "/api/test/hello"); CHECK(!Result.Success); CHECK(!Result.FailureReason.empty()); diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index 38021be16..3668e1e8f 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -266,10 +266,10 @@ TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges) return false; } - const auto Start = ParseInt<uint32_t>(Token.substr(0, Delim)); - const auto End = ParseInt<uint32_t>(Token.substr(Delim + 1)); + const auto Start = ParseInt<uint64_t>(Token.substr(0, Delim)); + const auto End = ParseInt<uint64_t>(Token.substr(Delim + 1)); - if (Start.has_value() && End.has_value() && End.value() > Start.value()) + if (Start.has_value() && End.has_value() && End.value() >= Start.value()) { Ranges.push_back({.Start = Start.value(), .End = End.value()}); } @@ -286,6 +286,45 @@ 
TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges) return Count != Ranges.size(); } +MultipartByteRangesResult +BuildMultipartByteRanges(const IoBuffer& Data, const HttpRanges& Ranges) +{ + Oid::String_t BoundaryStr; + Oid::NewOid().ToString(BoundaryStr); + std::string_view Boundary(BoundaryStr, Oid::StringLength); + + const uint64_t TotalSize = Data.GetSize(); + + std::vector<IoBuffer> Parts; + Parts.reserve(Ranges.size() * 2 + 1); + + for (const HttpRange& Range : Ranges) + { + uint64_t RangeEnd = (Range.End != ~uint64_t(0)) ? Range.End : TotalSize - 1; + if (RangeEnd >= TotalSize || Range.Start > RangeEnd) + { + return {}; + } + + uint64_t RangeSize = 1 + (RangeEnd - Range.Start); + + std::string PartHeader = fmt::format("\r\n--{}\r\nContent-Type: application/octet-stream\r\nContent-Range: bytes {}-{}/{}\r\n\r\n", + Boundary, + Range.Start, + RangeEnd, + TotalSize); + Parts.push_back(IoBufferBuilder::MakeCloneFromMemory(PartHeader.data(), PartHeader.size())); + + IoBuffer RangeData(Data, Range.Start, RangeSize); + Parts.push_back(RangeData); + } + + std::string ClosingBoundary = fmt::format("\r\n--{}--", Boundary); + Parts.push_back(IoBufferBuilder::MakeCloneFromMemory(ClosingBoundary.data(), ClosingBoundary.size())); + + return {.Parts = std::move(Parts), .ContentType = fmt::format("multipart/byteranges; boundary={}", Boundary)}; +} + ////////////////////////////////////////////////////////////////////////// const std::string_view @@ -564,6 +603,56 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType } void +HttpServerRequest::WriteResponse(HttpContentType ContentType, const IoBuffer& Data, const HttpRanges& Ranges) +{ + if (Ranges.empty()) + { + WriteResponse(HttpResponseCode::OK, ContentType, IoBuffer(Data)); + return; + } + + if (Ranges.size() == 1) + { + const HttpRange& Range = Ranges[0]; + const uint64_t TotalSize = Data.GetSize(); + // ~uint64_t(0) is the sentinel meaning "end of file" (suffix range). 
+ const uint64_t RangeEnd = (Range.End != ~uint64_t(0)) ? Range.End : TotalSize - 1; + + if (RangeEnd >= TotalSize || Range.Start > RangeEnd) + { + m_ContentRangeHeader = fmt::format("bytes */{}", TotalSize); + WriteResponse(HttpResponseCode::RangeNotSatisfiable); + return; + } + + const uint64_t RangeSize = 1 + (RangeEnd - Range.Start); + IoBuffer RangeBuf(Data, Range.Start, RangeSize); + + m_ContentRangeHeader = fmt::format("bytes {}-{}/{}", Range.Start, RangeEnd, TotalSize); + WriteResponse(HttpResponseCode::PartialContent, ContentType, std::move(RangeBuf)); + return; + } + + // Multi-range + MultipartByteRangesResult MultipartResult = BuildMultipartByteRanges(Data, Ranges); + if (MultipartResult.Parts.empty()) + { + m_ContentRangeHeader = fmt::format("bytes */{}", Data.GetSize()); + WriteResponse(HttpResponseCode::RangeNotSatisfiable); + return; + } + WriteResponse(HttpResponseCode::PartialContent, std::move(MultipartResult.ContentType), std::span<IoBuffer>(MultipartResult.Parts)); +} + +void +HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, const std::string& CustomContentType, std::span<IoBuffer> Blobs) +{ + ZEN_ASSERT(ParseContentType(CustomContentType) == HttpContentType::kUnknownContentType); + m_ContentTypeOverride = CustomContentType; + WriteResponse(ResponseCode, HttpContentType::kBinary, Blobs); +} + +void HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, CompositeBuffer& Payload) { std::span<const SharedBuffer> Segments = Payload.GetSegments(); diff --git a/src/zenhttp/include/zenhttp/httpcommon.h b/src/zenhttp/include/zenhttp/httpcommon.h index f9a99f3cc..1d921600d 100644 --- a/src/zenhttp/include/zenhttp/httpcommon.h +++ b/src/zenhttp/include/zenhttp/httpcommon.h @@ -19,8 +19,8 @@ class StringBuilderBase; struct HttpRange { - uint32_t Start = ~uint32_t(0); - uint32_t End = ~uint32_t(0); + uint64_t Start = ~uint64_t(0); + uint64_t End = ~uint64_t(0); }; using HttpRanges = 
std::vector<HttpRange>; @@ -30,6 +30,16 @@ extern HttpContentType (*ParseContentType)(const std::string_view& ContentTypeSt std::string_view ReasonStringForHttpResultCode(int HttpCode); bool TryParseHttpRangeHeader(std::string_view RangeHeader, HttpRanges& Ranges); +struct MultipartByteRangesResult +{ + std::vector<IoBuffer> Parts; + std::string ContentType; +}; + +// Build a multipart/byteranges response body from the given data and ranges. +// Generates a unique boundary per call. Returns empty Parts if any range is out of bounds. +MultipartByteRangesResult BuildMultipartByteRanges(const IoBuffer& Data, const HttpRanges& Ranges); + enum class HttpVerb : uint8_t { kGet = 1 << 0, diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index e8bfcfd4d..136304426 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -122,11 +122,13 @@ public: virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) = 0; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, CompositeBuffer& Payload); + void WriteResponse(HttpResponseCode ResponseCode, const std::string& CustomContentType, std::span<IoBuffer> Blobs); void WriteResponse(HttpResponseCode ResponseCode, CbObject Data); void WriteResponse(HttpResponseCode ResponseCode, CbArray Array); void WriteResponse(HttpResponseCode ResponseCode, CbPackage Package); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob); + void WriteResponse(HttpContentType ContentType, const IoBuffer& Data, const HttpRanges& Ranges); virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0; @@ -152,6 +154,8 @@ protected: std::string_view m_QueryString; mutable uint32_t 
m_RequestId = ~uint32_t(0); mutable Oid m_SessionId = Oid::Zero; + std::string m_ContentTypeOverride; + std::string m_ContentRangeHeader; inline void SetIsHandled() { m_Flags |= kIsHandled; } diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index 6cda84875..cfba3c95f 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -625,6 +625,8 @@ public: void SetAllowZeroCopyFileSend(bool Allow) { m_AllowZeroCopyFileSend = Allow; } void SetKeepAlive(bool KeepAlive) { m_IsKeepAlive = KeepAlive; } + void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); } + void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); } /** * Initialize the response for sending a payload made up of multiple blobs @@ -768,10 +770,18 @@ public: { ZEN_MEMSCOPE(GetHttpasioTag()); + std::string_view ContentTypeStr = + m_ContentTypeOverride.empty() ? MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride); + m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" - << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" + << "Content-Type: " << ContentTypeStr << "\r\n" << "Content-Length: " << ContentLength() << "\r\n"sv; + if (!m_ContentRangeHeader.empty()) + { + m_Headers << "Content-Range: " << m_ContentRangeHeader << "\r\n"sv; + } + if (!m_IsKeepAlive) { m_Headers << "Connection: close\r\n"sv; @@ -898,7 +908,9 @@ private: bool m_AllowZeroCopyFileSend = true; State m_State = State::kUninitialized; HttpContentType m_ContentType = HttpContentType::kBinary; - uint64_t m_ContentLength = 0; + std::string m_ContentTypeOverride; + std::string m_ContentRangeHeader; + uint64_t m_ContentLength = 0; eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive ExtendableStringBuilder<160> m_Headers; @@ -2131,6 +2143,10 @@ 
HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber)); m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend); m_Response->SetKeepAlive(m_Request.IsKeepAlive()); + if (!m_ContentRangeHeader.empty()) + { + m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -2146,6 +2162,14 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); m_Response->SetAllowZeroCopyFileSend(m_AllowZeroCopyFileSend); m_Response->SetKeepAlive(m_Request.IsKeepAlive()); + if (!m_ContentTypeOverride.empty()) + { + m_Response->SetContentTypeOverride(std::move(m_ContentTypeOverride)); + } + if (!m_ContentRangeHeader.empty()) + { + m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 31b0315d4..b0fb020e0 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -185,13 +185,17 @@ public: const std::vector<IoBuffer>& ResponseBuffers() const { return m_ResponseBuffers; } void SuppressPayload() { m_ResponseBuffers.resize(1); } + void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); } + void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); } std::string_view GetHeaders(); private: - uint16_t m_ResponseCode = 0; - bool m_IsKeepAlive = true; - HttpContentType m_ContentType = HttpContentType::kBinary; + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + std::string m_ContentTypeOverride; + std::string m_ContentRangeHeader; uint64_t m_ContentLength = 0; 
std::vector<IoBuffer> m_ResponseBuffers; ExtendableStringBuilder<160> m_Headers; @@ -246,10 +250,18 @@ HttpPluginResponse::GetHeaders() if (m_Headers.Size() == 0) { + std::string_view ContentTypeStr = + m_ContentTypeOverride.empty() ? MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride); + m_Headers << "HTTP/1.1 " << ResponseCode() << " " << ReasonStringForHttpResultCode(ResponseCode()) << "\r\n" - << "Content-Type: " << MapContentTypeToString(m_ContentType) << "\r\n" + << "Content-Type: " << ContentTypeStr << "\r\n" << "Content-Length: " << ContentLength() << "\r\n"sv; + if (!m_ContentRangeHeader.empty()) + { + m_Headers << "Content-Range: " << m_ContentRangeHeader << "\r\n"sv; + } + if (!m_IsKeepAlive) { m_Headers << "Connection: close\r\n"sv; @@ -669,6 +681,10 @@ HttpPluginServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_MEMSCOPE(GetHttppluginTag()); m_Response.reset(new HttpPluginResponse(HttpContentType::kBinary)); + if (!m_ContentRangeHeader.empty()) + { + m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -681,6 +697,14 @@ HttpPluginServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpConten ZEN_MEMSCOPE(GetHttppluginTag()); m_Response.reset(new HttpPluginResponse(ContentType)); + if (!m_ContentTypeOverride.empty()) + { + m_Response->SetContentTypeOverride(std::move(m_ContentTypeOverride)); + } + if (!m_ContentRangeHeader.empty()) + { + m_Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 1b722940d..d45804c50 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -464,6 +464,8 @@ public: inline int64_t GetResponseBodySize() const { return m_TotalDataSize; } void 
SetLocationHeader(std::string_view Location) { m_LocationHeader = Location; } + void SetContentTypeOverride(std::string Override) { m_ContentTypeOverride = std::move(Override); } + void SetContentRangeHeader(std::string V) { m_ContentRangeHeader = std::move(V); } private: eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; @@ -473,6 +475,8 @@ private: uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends bool m_IsInitialResponse = true; HttpContentType m_ContentType = HttpContentType::kBinary; + std::string m_ContentTypeOverride; + std::string m_ContentRangeHeader; eastl::fixed_vector<IoBuffer, 16> m_DataBuffers; std::string m_LocationHeader; @@ -725,7 +729,8 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) PHTTP_KNOWN_HEADER ContentTypeHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderContentType]; - std::string_view ContentTypeString = MapContentTypeToString(m_ContentType); + std::string_view ContentTypeString = + m_ContentTypeOverride.empty() ? 
MapContentTypeToString(m_ContentType) : std::string_view(m_ContentTypeOverride); ContentTypeHeader->pRawValue = ContentTypeString.data(); ContentTypeHeader->RawValueLength = (USHORT)ContentTypeString.size(); @@ -739,6 +744,15 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) LocationHeader->RawValueLength = (USHORT)m_LocationHeader.size(); } + // Content-Range header (for 206 Partial Content single-range responses) + + if (!m_ContentRangeHeader.empty()) + { + PHTTP_KNOWN_HEADER ContentRangeHeader = &HttpResponse.Headers.KnownHeaders[HttpHeaderContentRange]; + ContentRangeHeader->pRawValue = m_ContentRangeHeader.data(); + ContentRangeHeader->RawValueLength = (USHORT)m_ContentRangeHeader.size(); + } + std::string_view ReasonString = ReasonStringForHttpResultCode(m_ResponseCode); HttpResponse.StatusCode = m_ResponseCode; @@ -2279,6 +2293,11 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + if (!m_ContentRangeHeader.empty()) + { + Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } + if (SuppressBody()) { Response->SuppressResponseBody(); @@ -2307,6 +2326,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy HttpMessageResponseRequest* Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + if (!m_ContentTypeOverride.empty()) + { + Response->SetContentTypeOverride(std::move(m_ContentTypeOverride)); + } + if (!m_ContentRangeHeader.empty()) + { + Response->SetContentRangeHeader(std::move(m_ContentRangeHeader)); + } + if (SuppressBody()) { Response->SuppressResponseBody(); diff --git a/src/zenhttp/servers/wstest.cpp b/src/zenhttp/servers/wstest.cpp index 363c478ae..872698ee5 100644 --- a/src/zenhttp/servers/wstest.cpp +++ b/src/zenhttp/servers/wstest.cpp @@ -5,6 +5,7 @@ # include <zencore/scopeguard.h> # include <zencore/testing.h> # 
include <zencore/testutils.h> +# include <zencore/timer.h> # include <zenhttp/httpserver.h> # include <zenhttp/httpwsclient.h> @@ -477,6 +478,23 @@ namespace { return Result; } + static void WaitForServerListening(int Port) + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + asio::io_context IoCtx; + asio::ip::tcp::socket Probe(IoCtx); + asio::error_code Ec; + Probe.connect(asio::ip::tcp::endpoint(asio::ip::make_address("127.0.0.1"), static_cast<uint16_t>(Port)), Ec); + if (!Ec) + { + return; + } + Sleep(10); + } + } + } // anonymous namespace TEST_CASE("websocket.integration") @@ -502,8 +520,8 @@ TEST_CASE("websocket.integration") Server->Close(); }); - // Give server a moment to start accepting - Sleep(100); + // Wait for server to start accepting + WaitForServerListening(Port); SUBCASE("handshake succeeds with 101") { @@ -814,7 +832,7 @@ TEST_CASE("websocket.client") Server->Close(); }); - Sleep(100); + WaitForServerListening(Port); SUBCASE("connect, echo, close") { @@ -938,7 +956,7 @@ TEST_CASE("websocket.client.unixsocket") Server->Close(); }); - Sleep(100); + WaitForServerListening(Port); SUBCASE("connect, echo, close over unix socket") { diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index c8cf3212c..1f8b96cc4 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -3036,7 +3036,10 @@ BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map<std::st if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u)) { throw std::runtime_error( - fmt::format("Not enough free space for target path '{}', {} of free space is needed", m_Path, RequiredSpace)); + fmt::format("Not enough free space for target path '{}', {} of free space is needed but only {} is available", + m_Path, + NiceBytes(RequiredSpace), + NiceBytes(Space.Free))); } } diff --git a/src/zenserver-test/buildstore-tests.cpp 
b/src/zenserver-test/buildstore-tests.cpp index cf9b10896..1f2157993 100644 --- a/src/zenserver-test/buildstore-tests.cpp +++ b/src/zenserver-test/buildstore-tests.cpp @@ -148,8 +148,6 @@ TEST_CASE("buildstore.blobs") } { - // Single-range Get - ZenServerInstance Instance(TestEnv); const uint16_t PortNumber = @@ -158,6 +156,7 @@ TEST_CASE("buildstore.blobs") HttpClient Client(Instance.GetBaseUri() + "/builds/"); + // Single-range Get { const IoHash& RawHash = CompressedBlobsHashes.front(); uint64_t BlobSize = CompressedBlobsSizes.front(); @@ -183,20 +182,63 @@ TEST_CASE("buildstore.blobs") MemoryView RangeView = Payload.GetView(); CHECK(ActualRange.EqualBytes(RangeView)); } - } - { - // Single-range Post + { + // GET blob not found + IoHash FakeHash = IoHash::HashBuffer("nonexistent", 11); + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, FakeHash)); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + } - ZenServerInstance Instance(TestEnv); + { + // GET with out-of-bounds range + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); - const uint16_t PortNumber = - Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); + HttpClient::KeyValueMap Headers; + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", BlobSize + 100, BlobSize + 200)}); - HttpClient Client(Instance.GetBaseUri() + "/builds/"); + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), Headers); + CHECK_EQ(Result.StatusCode, HttpResponseCode::RangeNotSatisfiable); + } { + // GET with multi-range header (uses Download for multipart boundary parsing) + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); + + uint64_t Range1Start = 0; + uint64_t Range1End = BlobSize / 4 - 1; + uint64_t 
Range2Start = BlobSize / 2; + uint64_t Range2End = BlobSize / 2 + BlobSize / 4 - 1; + + HttpClient::KeyValueMap Headers; + Headers.Entries.insert({"Range", fmt::format("bytes={}-{},{}-{}", Range1Start, Range1End, Range2Start, Range2End)}); + + HttpClient::Response Result = + Client.Download(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), SystemRootPath, Headers); + CHECK_EQ(Result.StatusCode, HttpResponseCode::PartialContent); + REQUIRE_EQ(Result.Ranges.size(), 2); + + HttpClient::Response FullBlobResult = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + REQUIRE(FullBlobResult); + + uint64_t Range1Len = Range1End - Range1Start + 1; + uint64_t Range2Len = Range2End - Range2Start + 1; + + MemoryView ExpectedRange1 = FullBlobResult.ResponsePayload.GetView().Mid(Range1Start, Range1Len); + MemoryView ExpectedRange2 = FullBlobResult.ResponsePayload.GetView().Mid(Range2Start, Range2Len); + + MemoryView ActualRange1 = Result.ResponsePayload.GetView().Mid(Result.Ranges[0].OffsetInPayload, Range1Len); + MemoryView ActualRange2 = Result.ResponsePayload.GetView().Mid(Result.Ranges[1].OffsetInPayload, Range2Len); + + CHECK(ExpectedRange1.EqualBytes(ActualRange1)); + CHECK(ExpectedRange2.EqualBytes(ActualRange2)); + } + + // Single-range Post + { uint64_t RangeSizeSum = 0; const IoHash& RawHash = CompressedBlobsHashes.front(); @@ -259,19 +301,96 @@ TEST_CASE("buildstore.blobs") Offset += Range.second; } } - } - { - // Multi-range + { + // POST with wrong accept type + const IoHash& RawHash = CompressedBlobsHashes.front(); - ZenServerInstance Instance(TestEnv); + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + Writer.BeginObject(); + Writer.AddInteger("offset"sv, uint64_t(0)); + Writer.AddInteger("length"sv, uint64_t(10)); + Writer.EndObject(); + Writer.EndArray(); - const uint16_t PortNumber = - 
Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); - CHECK(PortNumber != 0); + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kBinary)); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + } - HttpClient Client(Instance.GetBaseUri() + "/builds/"); + { + // POST with missing payload + const IoHash& RawHash = CompressedBlobsHashes.front(); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCbPackage)); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + } + + { + // POST with empty ranges array + const IoHash& RawHash = CompressedBlobsHashes.front(); + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + Writer.EndArray(); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kCbPackage)); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + } + + { + // POST with range count exceeding maximum + const IoHash& RawHash = CompressedBlobsHashes.front(); + + CbObjectWriter Writer; + Writer.BeginArray("ranges"sv); + for (uint32_t I = 0; I < 257; I++) + { + Writer.BeginObject(); + Writer.AddInteger("offset"sv, uint64_t(0)); + Writer.AddInteger("length"sv, uint64_t(1)); + Writer.EndObject(); + } + Writer.EndArray(); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kCbPackage)); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + } + + { + // POST with out-of-bounds range returns length=0 + const IoHash& RawHash = CompressedBlobsHashes.front(); + uint64_t BlobSize = CompressedBlobsSizes.front(); + + CbObjectWriter 
Writer; + Writer.BeginArray("ranges"sv); + Writer.BeginObject(); + Writer.AddInteger("offset"sv, BlobSize + 100); + Writer.AddInteger("length"sv, uint64_t(50)); + Writer.EndObject(); + Writer.EndArray(); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + Writer.Save(), + HttpClient::Accept(ZenContentType::kCbPackage)); + REQUIRE(Result); + CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload); + CbObjectView ResponseObject = ResponsePackage.GetObject(); + CbArrayView RangeArray = ResponseObject["ranges"sv].AsArrayView(); + REQUIRE_EQ(RangeArray.Num(), uint64_t(1)); + CbObjectView Range = (*begin(RangeArray)).AsObjectView(); + CHECK_EQ(Range["offset"sv].AsUInt64(), BlobSize + 100); + CHECK_EQ(Range["length"sv].AsUInt64(), uint64_t(0)); + } + + // Multi-range { uint64_t RangeSizeSum = 0; diff --git a/src/zenserver-test/cache-tests.cpp b/src/zenserver-test/cache-tests.cpp index 986dc67e0..e54e7060d 100644 --- a/src/zenserver-test/cache-tests.cpp +++ b/src/zenserver-test/cache-tests.cpp @@ -172,143 +172,85 @@ TEST_CASE("zcache.cbpackage") return true; }; - SUBCASE("PUT/GET returns correct package") - { - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance Instance1(TestEnv); - Instance1.SetDataDir(TestDir); - const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); + std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - HttpClient Http{BaseUri}; + ZenServerInstance RemoteInstance(TestEnv); + RemoteInstance.SetDataDir(RemoteDataDir); + const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + ZenServerInstance LocalInstance(TestEnv); + 
LocalInstance.SetDataDir(LocalDataDir); + LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), + fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); + const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); + CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); - // PUT - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Body); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } + const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); + const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); - // GET - { - HttpClient::Response Result = Http.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); + HttpClient LocalHttp{LocalBaseUri}; + HttpClient RemoteHttp{RemoteBaseUri}; - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - } + const std::string_view Bucket = "mosdef"sv; - SUBCASE("PUT propagates upstream") + // Phase 1: PUT/GET returns correct package (via local) { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); - - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetDataDir(RemoteDataDir); - const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); - - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetDataDir(LocalDataDir); - LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); - CHECK_MESSAGE(LocalPortNumber != 0, 
LocalInstance.GetLogOutput()); - - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + zen::IoHash Key1; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key1); - const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key1), Body); + CHECK(PutResult.StatusCode == HttpResponseCode::Created); - HttpClient LocalHttp{LocalBaseUri}; - HttpClient RemoteHttp{RemoteBaseUri}; + HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key1), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(GetResult.StatusCode == HttpResponseCode::OK); - // Store the cache record package in the local instance - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); - - CHECK(Result.StatusCode == HttpResponseCode::Created); - } - - // The cache record can be retrieved as a package from the local instance - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } - - // The cache record can be retrieved as a package from the remote instance - { - HttpClient::Response Result = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } + zen::CbPackage Package; + const 
bool Ok = Package.TryLoad(GetResult.ResponsePayload); + CHECK(Ok); + CHECK(IsEqual(Package, ExpectedPackage)); } - SUBCASE("GET finds upstream when missing in local") + // Phase 2: PUT propagates upstream { - // Setup local and remote server - std::filesystem::path LocalDataDir = TestEnv.CreateNewTestDir(); - std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir(); + zen::IoHash Key2; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key2); - ZenServerInstance RemoteInstance(TestEnv); - RemoteInstance.SetDataDir(RemoteDataDir); - const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady(); + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response PutResult = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key2), Body); + CHECK(PutResult.StatusCode == HttpResponseCode::Created); - ZenServerInstance LocalInstance(TestEnv); - LocalInstance.SetDataDir(LocalDataDir); - LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(), - fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber)); - const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady(); - CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput()); + HttpClient::Response LocalGetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(LocalGetResult.StatusCode == HttpResponseCode::OK); - const auto LocalBaseUri = fmt::format("http://localhost:{}/z$", LocalPortNumber); - const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber); + zen::CbPackage LocalPackage; + CHECK(LocalPackage.TryLoad(LocalGetResult.ResponsePayload)); + CHECK(IsEqual(LocalPackage, ExpectedPackage)); - HttpClient LocalHttp{LocalBaseUri}; - HttpClient RemoteHttp{RemoteBaseUri}; + HttpClient::Response RemoteGetResult = RemoteHttp.Get(fmt::format("/{}/{}", Bucket, Key2), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(RemoteGetResult.StatusCode == HttpResponseCode::OK); 
- const std::string_view Bucket = "mosdef"sv; - zen::IoHash Key; - zen::CbPackage ExpectedPackage = CreateTestPackage(Key); + zen::CbPackage RemotePackage; + CHECK(RemotePackage.TryLoad(RemoteGetResult.ResponsePayload)); + CHECK(IsEqual(RemotePackage, ExpectedPackage)); + } - // Store the cache record package in upstream cache - { - zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); - HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Body); + // Phase 3: GET finds upstream when missing in local + { + zen::IoHash Key3; + zen::CbPackage ExpectedPackage = CreateTestPackage(Key3); - CHECK(Result.StatusCode == HttpResponseCode::Created); - } + zen::IoBuffer Body = SerializeToBuffer(ExpectedPackage); + HttpClient::Response PutResult = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key3), Body); + CHECK(PutResult.StatusCode == HttpResponseCode::Created); - // The cache record can be retrieved as a package from the local cache - { - HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); + HttpClient::Response GetResult = LocalHttp.Get(fmt::format("/{}/{}", Bucket, Key3), {{"Accept", "application/x-ue-cbpkg"}}); + CHECK(GetResult.StatusCode == HttpResponseCode::OK); - zen::CbPackage Package; - const bool Ok = Package.TryLoad(Result.ResponsePayload); - CHECK(Ok); - CHECK(IsEqual(Package, ExpectedPackage)); - } + zen::CbPackage Package; + CHECK(Package.TryLoad(GetResult.ResponsePayload)); + CHECK(IsEqual(Package, ExpectedPackage)); } } @@ -348,25 +290,25 @@ TEST_CASE("zcache.policy") return Package; }; - SUBCASE("query - 'local' does not query upstream (binary)") - { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - const uint16_t UpstreamPort = UpstreamCfg.Port; + ZenConfig UpstreamCfg = 
ZenConfig::New(TestEnv.GetNewPortNumber()); + ZenServerInstance UpstreamInst(TestEnv); + UpstreamCfg.Spawn(UpstreamInst); + + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); + ZenServerInstance LocalInst(TestEnv); + LocalCfg.Spawn(LocalInst); - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); + HttpClient LocalHttp{LocalCfg.BaseUri}; + HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - const std::string_view Bucket = "legacy"sv; + // query - 'local' does not query upstream (binary) + // Uses size 1024 for unique key + { + const auto Bucket = "legacy"sv; zen::IoHash Key; IoBuffer BinaryValue = GenerateData(1024, Key); - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - { HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); CHECK(Result.StatusCode == HttpResponseCode::Created); @@ -385,26 +327,14 @@ TEST_CASE("zcache.policy") } } - SUBCASE("store - 'local' does not store upstream (binary)") + // store - 'local' does not store upstream (binary) + // Uses size 2048 for unique key { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - const uint16_t UpstreamPort = UpstreamCfg.Port; - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - const auto Bucket = "legacy"sv; zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; + IoBuffer BinaryValue = GenerateData(2048, Key); - // Store binary cache value locally { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), BinaryValue, @@ -423,25 
+353,14 @@ TEST_CASE("zcache.policy") } } - SUBCASE("store - 'local/remote' stores local and upstream (binary)") + // store - 'local/remote' stores local and upstream (binary) + // Uses size 4096 for unique key { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - const auto Bucket = "legacy"sv; zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); + IoBuffer BinaryValue = GenerateData(4096, Key); - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store binary cache value locally and upstream { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), BinaryValue, @@ -460,27 +379,16 @@ TEST_CASE("zcache.policy") } } - SUBCASE("query - 'local' does not query upstream (cbpackage)") + // query - 'local' does not query upstream (cbpackage) + // Uses bucket "policy4" to isolate from other cbpackage scenarios (deterministic key) { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; + const auto Bucket = "policy4"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package upstream { HttpClient::Response Result = RemoteHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); @@ 
-499,27 +407,16 @@ TEST_CASE("zcache.policy") } } - SUBCASE("store - 'local' does not store upstream (cbpackage)") + // store - 'local' does not store upstream (cbpackage) + // Uses bucket "policy5" to isolate from other cbpackage scenarios (deterministic key) { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; + const auto Bucket = "policy5"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package locally { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,StoreLocal", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); @@ -536,27 +433,16 @@ TEST_CASE("zcache.policy") } } - SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") + // store - 'local/remote' stores local and upstream (cbpackage) + // Uses bucket "policy6" to isolate from other cbpackage scenarios (deterministic key) { - ZenConfig UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance UpstreamInst(TestEnv); - UpstreamCfg.Spawn(UpstreamInst); - - ZenConfig LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port); - ZenServerInstance LocalInst(TestEnv); - LocalCfg.Spawn(LocalInst); - - const auto Bucket = "legacy"sv; + const auto Bucket = "policy6"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); - HttpClient LocalHttp{LocalCfg.BaseUri}; - HttpClient RemoteHttp{UpstreamCfg.BaseUri}; - - // Store package 
locally and upstream { HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}?Policy=Query,Store", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); @@ -573,78 +459,62 @@ TEST_CASE("zcache.policy") } } - SUBCASE("skip - 'data' returns cache record without attachments/empty payload") + // skip - 'data' returns cache record without attachments/empty payload + // Uses bucket "skiptest7" to isolate from other cbpackage scenarios { - ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Instance(TestEnv); - Cfg.Spawn(Instance); - - const auto Bucket = "test"sv; + const auto Bucket = "skiptest7"sv; zen::IoHash Key; zen::IoHash PayloadId; zen::CbPackage Package = GeneratePackage(Key, PayloadId); IoBuffer Buf = SerializeToBuffer(Package); - HttpClient Http{Cfg.BaseUri}; - - // Store package { - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), Buf); + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), Buf); CHECK(Result.StatusCode == HttpResponseCode::Created); } - // Get package { HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); + LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cbpkg"}}); CHECK(Result); CbPackage ResponsePackage; CHECK(ResponsePackage.TryLoad(Result.ResponsePayload)); CHECK(ResponsePackage.GetAttachments().size() == 0); } - // Get record { HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}}); + LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/x-ue-cb"}}); CHECK(Result); CbObject ResponseObject = zen::LoadCompactBinaryObject(Result.ResponsePayload); CHECK(ResponseObject); } - // Get payload { - HttpClient::Response Result = - 
Http.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), {{"Accept", "application/x-ue-comp"}}); + HttpClient::Response Result = LocalHttp.Get(fmt::format("/{}/{}/{}?Policy=Default,SkipData", Bucket, Key, PayloadId), + {{"Accept", "application/x-ue-comp"}}); CHECK(Result); CHECK(Result.ResponsePayload.GetSize() == 0); } } - SUBCASE("skip - 'data' returns empty binary value") + // skip - 'data' returns empty binary value + // Uses size 8192 for unique key (avoids collision with size 1024/2048/4096 above) { - ZenConfig Cfg = ZenConfig::New(TestEnv.GetNewPortNumber()); - ZenServerInstance Instance(TestEnv); - Cfg.Spawn(Instance); - - const auto Bucket = "test"sv; + const auto Bucket = "skiptest8"sv; zen::IoHash Key; - IoBuffer BinaryValue = GenerateData(1024, Key); - - HttpClient Http{Cfg.BaseUri}; + IoBuffer BinaryValue = GenerateData(8192, Key); - // Store binary cache value { - HttpClient::Response Result = Http.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); + HttpClient::Response Result = LocalHttp.Put(fmt::format("/{}/{}", Bucket, Key), BinaryValue); CHECK(Result.StatusCode == HttpResponseCode::Created); } - // Get package { HttpClient::Response Result = - Http.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}}); + LocalHttp.Get(fmt::format("/{}/{}?Policy=Default,SkipData", Bucket, Key), {{"Accept", "application/octet-stream"}}); CHECK(Result); CHECK(Result.ResponsePayload.GetSize() == 0); } diff --git a/src/zenserver-test/compute-tests.cpp b/src/zenserver-test/compute-tests.cpp index 835d72713..ce2db366a 100644 --- a/src/zenserver-test/compute-tests.cpp +++ b/src/zenserver-test/compute-tests.cpp @@ -357,6 +357,41 @@ PollForLsnInCompleted(HttpClient& Client, const std::string& CompletedUrl, int L return false; } +static void +WaitForActionRunning(zen::compute::ComputeServiceSession& Session, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 
TimeoutMs) + { + if (Session.GetActionCounts().Running > 0) + { + return; + } + Sleep(50); + } + FAIL("Timed out waiting for action to reach Running state"); +} + +static void +WaitForAnyActionRunningHttp(HttpClient& Client, uint64_t TimeoutMs = 10'000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < TimeoutMs) + { + HttpClient::Response Resp = Client.Get("/jobs/running"sv); + if (Resp) + { + CbObject Obj = Resp.AsObject(); + if (Obj["running"sv].AsArrayView().Num() > 0) + { + return; + } + } + Sleep(50); + } + FAIL("Timed out waiting for any action to reach Running state"); +} + static std::string GetRot13Output(const CbPackage& ResultPackage) { @@ -906,10 +941,10 @@ TEST_CASE("function.queues.cancel_running") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + WaitForAnyActionRunningHttp(Client); // Cancel the queue, which should interrupt the running Sleep job - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Queue cancellation failed: status={}, body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -961,10 +996,10 @@ TEST_CASE("function.queues.remote_cancel") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Sleep job submission"); // Wait for the worker process to start executing before cancelling - Sleep(1'000); + const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); + WaitForAnyActionRunningHttp(Client); // Cancel the queue via its OID token - const std::string QueueUrl = fmt::format("/queues/{}", QueueToken); HttpClient::Response CancelResp = Client.Delete(QueueUrl); REQUIRE_MESSAGE(CancelResp.StatusCode == HttpResponseCode::NoContent, fmt::format("Remote queue cancellation failed: status={}, 
body={}", CancelResp.StatusCode, CancelResp.ToText())); @@ -1207,7 +1242,7 @@ TEST_CASE("function.priority") // that exit with non-zero exit codes, including retry behaviour and // final failure reporting. -TEST_CASE("function.exit_code.failed_action") +TEST_CASE("function.exit_code") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1219,232 +1254,154 @@ TEST_CASE("function.exit_code.failed_action") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so the action fails immediately - // without being rescheduled. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 1: failed_action - immediate failure with max_retries=0 + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit a Fail action with exit code 42 - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, 
BuildFailActionPackage(42)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(42)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Fail job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Fail job submission"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + 
CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } - } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - - // GET /jobs/{lsn} for a failed action should return OK but with an empty result package - const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response ResultResp = Client.Get(ResultUrl); - CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); -} - -TEST_CASE("function.exit_code.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a 
queue with max_retries=2 so the action is retried twice before - // being reported as failed (3 total attempts: initial + 2 retries). - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 2; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string ResultUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response ResultResp = Client.Get(ResultUrl); + CHECK_EQ(ResultResp.StatusCode, HttpResponseCode::OK); + } - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + // Scenario 2: auto_retry - retried twice before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(2); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Submit a Fail action — the worker process will exit with code 1 on - // every attempt, eventually exhausting retries. 
- const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Poll for the LSN to appear in the completed list — this only - // happens after all retries are exhausted. - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + 
CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 2); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.exit_code.reschedule_failed") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=1 so we have room for one manual reschedule - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Fail action — auto-retry will fire once, then it lands in results as Failed - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: 
status={}", SubmitResp.StatusCode)); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - - // Wait for the action to exhaust its auto-retry and land in completed - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - - // Try to manually reschedule — should fail because retry limit is reached - const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); - HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); - CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); -} - -TEST_CASE("function.exit_code.mixed_success_and_failure") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + // Scenario 3: reschedule_failed - manual reschedule rejected after retry limit + { + auto [QueueId, QueueUrl] = CreateQueue(1); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildFailActionPackage(7)); + REQUIRE_MESSAGE(SubmitResp, fmt::format("Fail job submission failed: status={}", SubmitResp.StatusCode)); - // Create a queue with max_retries=0 for fast failure - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + const int Lsn = 
SubmitResp.AsObject()["lsn"sv].AsInt32(); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue completed list\nServer log:\n{}", Lsn, Instance.GetLogOutput())); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string RescheduleUrl = fmt::format("/queues/{}/jobs/{}", QueueId, Lsn); + HttpClient::Response RescheduleResp = Client.Post(RescheduleUrl); + CHECK_EQ(RescheduleResp.StatusCode, HttpResponseCode::Conflict); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 4: mixed_success_and_failure - one success and one failure in the same queue + { + auto [QueueId, QueueUrl] = CreateQueue(0); - // Submit one Rot13 (success) and one Fail (failure) - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); - REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); - const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response SuccessResp = Client.Post(JobUrl, BuildRot13ActionPackage("Hello"sv)); + REQUIRE_MESSAGE(SuccessResp, "Rot13 job submission failed"); + const int LsnSuccess = SuccessResp.AsObject()["lsn"sv].AsInt32(); - HttpClient::Response FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); - REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); - const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); + HttpClient::Response 
FailResp = Client.Post(JobUrl, BuildFailActionPackage(1)); + REQUIRE_MESSAGE(FailResp, "Fail job submission failed"); + const int LsnFail = FailResp.AsObject()["lsn"sv].AsInt32(); - // Wait for both to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), - fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), - fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnSuccess), + fmt::format("Success LSN {} did not complete\nServer log:\n{}", LsnSuccess, Instance.GetLogOutput())); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, LsnFail), + fmt::format("Fail LSN {} did not complete\nServer log:\n{}", LsnFail, Instance.GetLogOutput())); - // Verify queue counters - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["active_count"sv].AsInt32(), 0); + } } -TEST_CASE("function.crash.abort") +TEST_CASE("function.crash") { ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); 
Instance.SetDataDir(TestEnv.CreateNewTestDir()); @@ -1456,174 +1413,125 @@ TEST_CASE("function.crash.abort") const IoHash WorkerId = RegisterWorker(Client, TestEnv); - // Create a queue with max_retries=0 so we don't wait through retries - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; + auto CreateQueue = [&](int MaxRetries) -> std::pair<int, std::string> { + CbObjectWriter ConfigWriter; + ConfigWriter << "max_retries"sv << MaxRetries; + CbObjectWriter BodyWriter; + BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); + REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); + return {QueueId, fmt::format("/queues/{}", QueueId)}; + }; - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + // Scenario 1: abort - worker process calls std::abort(), no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Submit a Crash action that calls std::abort() - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - 
HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); - - // Verify action history records the failure - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = 
Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - bool FoundInHistory = false; - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + bool FoundInHistory = false; + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - FoundInHistory = true; - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + FoundInHistory = true; + break; + } } + CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); } - CHECK_MESSAGE(FoundInHistory, fmt::format("LSN {} not found in action history", Lsn)); -} - -TEST_CASE("function.crash.nullptr") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); - - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); - - const IoHash WorkerId = RegisterWorker(Client, TestEnv); - - // Create a queue with max_retries=0 - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 0; - - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); - - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); - - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); - - // Submit a Crash action that dereferences a null pointer - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, 
BuildCrashActionPackage("nullptr"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - - // Poll for the LSN to appear in the completed list - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), - fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); - - // Verify queue status reflects the failure - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); -} - -TEST_CASE("function.crash.auto_retry") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kComputeServer); - Instance.SetDataDir(TestEnv.CreateNewTestDir()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(); - REQUIRE_MESSAGE(Port != 0, Instance.GetLogOutput()); + // Scenario 2: nullptr - worker process dereferences null, no retries + { + auto [QueueId, QueueUrl] = CreateQueue(0); - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port); - HttpClient Client(ComputeBaseUri); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("nullptr"sv)); + REQUIRE_MESSAGE(SubmitResp, + fmt::format("Crash job submission failed: status={}, body={}", SubmitResp.StatusCode, SubmitResp.ToText())); - const IoHash WorkerId = RegisterWorker(Client, TestEnv); + const int Lsn = 
SubmitResp.AsObject()["lsn"sv].AsInt32(); + REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from Crash job submission"); - // Create a queue with max_retries=1 — the crash should be retried once - // before being reported as permanently failed. - CbObjectWriter ConfigWriter; - ConfigWriter << "max_retries"sv << 1; + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn), + fmt::format("LSN {} did not appear in queue {} completed list within timeout\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - CbObjectWriter BodyWriter; - BodyWriter << "config"sv << ConfigWriter.Save(); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - HttpClient::Response CreateResp = Client.Post("/queues"sv, BodyWriter.Save()); - REQUIRE_MESSAGE(CreateResp, fmt::format("Queue creation failed: status={}", CreateResp.StatusCode)); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } - const int QueueId = CreateResp.AsObject()["queue_id"sv].AsInt32(); - const std::string QueueUrl = fmt::format("/queues/{}", QueueId); + // Scenario 3: auto_retry - crash retried once before permanent failure + { + auto [QueueId, QueueUrl] = CreateQueue(1); - // Submit a Crash action — will crash on every attempt - const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); - HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); - REQUIRE_MESSAGE(SubmitResp, fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); + const std::string JobUrl = fmt::format("/queues/{}/jobs/{}", QueueId, WorkerId.ToHexString()); + HttpClient::Response SubmitResp = Client.Post(JobUrl, BuildCrashActionPackage("abort"sv)); + REQUIRE_MESSAGE(SubmitResp, 
fmt::format("Crash job submission failed: status={}", SubmitResp.StatusCode)); - const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); + const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); - // Poll for the LSN to appear in the completed list after retries exhaust - const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); - REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), - fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", - Lsn, - QueueId, - Instance.GetLogOutput())); + const std::string CompletedUrl = fmt::format("/queues/{}/completed", QueueId); + REQUIRE_MESSAGE(PollForLsnInCompleted(Client, CompletedUrl, Lsn, 60'000), + fmt::format("LSN {} did not appear in queue {} completed list after retries\nServer log:\n{}", + Lsn, + QueueId, + Instance.GetLogOutput())); - // Verify the action history records the retry count - const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); - HttpClient::Response HistoryResp = Client.Get(HistoryUrl); - REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); + const std::string HistoryUrl = fmt::format("/queues/{}/history", QueueId); + HttpClient::Response HistoryResp = Client.Get(HistoryUrl); + REQUIRE_MESSAGE(HistoryResp, "Failed to query queue action history"); - for (auto& Item : HistoryResp.AsObject()["history"sv]) - { - if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + for (auto& Item : HistoryResp.AsObject()["history"sv]) { - CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); - CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); - break; + if (Item.AsObjectView()["lsn"sv].AsInt32() == Lsn) + { + CHECK_EQ(Item.AsObjectView()["succeeded"sv].AsBool(), false); + CHECK_EQ(Item.AsObjectView()["retry_count"sv].AsInt32(), 1); + break; + } } - } - // Queue should show 1 failed, 0 completed - HttpClient::Response StatusResp = Client.Get(QueueUrl); - REQUIRE_MESSAGE(StatusResp, 
"Failed to get queue status"); + HttpClient::Response StatusResp = Client.Get(QueueUrl); + REQUIRE_MESSAGE(StatusResp, "Failed to get queue status"); - CbObject QueueStatus = StatusResp.AsObject(); - CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); - CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + CbObject QueueStatus = StatusResp.AsObject(); + CHECK_EQ(QueueStatus["failed_count"sv].AsInt32(), 1); + CHECK_EQ(QueueStatus["completed_count"sv].AsInt32(), 0); + } } ////////////////////////////////////////////////////////////////////////// @@ -1662,7 +1570,6 @@ TEST_CASE("function.remote.worker_sync_on_discovery") // Trigger immediate orchestrator re-query and wait for runner setup Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Submit Rot13 action via session CbObject ActionObj = BuildRot13ActionForSession("Hello World"sv, Resolver); @@ -1721,7 +1628,6 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W1 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit Rot13 action and verify it completes on W1 { @@ -1763,23 +1669,33 @@ TEST_CASE("function.remote.late_runner_discovery") // Wait for W2 discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); - // Verify W2 received the worker by querying its /compute/workers endpoint directly + // Poll W2 until the worker has been synced (SyncWorkersToRunner is async) { - const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); - HttpClient Client(ComputeBaseUri); - HttpClient::Response ListResp = Client.Get("/workers"sv); - REQUIRE_MESSAGE(ListResp, "Failed to list workers on W2"); + const std::string ComputeBaseUri = fmt::format("http://localhost:{}/compute", Port2); + HttpClient Client(ComputeBaseUri); + bool WorkerFound = false; + Stopwatch Timer; - bool WorkerFound = false; - for (auto& Item : ListResp.AsObject()["workers"sv]) + while (Timer.GetElapsedTimeMs() < 10'000) { - if (Item.AsHash() == WorkerPackage.GetObjectHash()) 
+ HttpClient::Response ListResp = Client.Get("/workers"sv); + if (ListResp) + { + for (auto& Item : ListResp.AsObject()["workers"sv]) + { + if (Item.AsHash() == WorkerPackage.GetObjectHash()) + { + WorkerFound = true; + break; + } + } + } + if (WorkerFound) { - WorkerFound = true; break; } + Sleep(50); } REQUIRE_MESSAGE(WorkerFound, @@ -1844,7 +1760,6 @@ TEST_CASE("function.remote.queue_association") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit action to it auto QueueResult = Session.CreateQueue(); @@ -1922,7 +1837,6 @@ TEST_CASE("function.remote.queue_cancel_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -1935,7 +1849,7 @@ TEST_CASE("function.remote.queue_cancel_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Cancel the local queue — this should propagate to the remote Session.CancelQueue(QueueId); @@ -2008,7 +1922,7 @@ TEST_CASE("function.abandon_running_http") REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN"); // Wait for the process to start running - Sleep(1'000); + WaitForAnyActionRunningHttp(Client); // Verify the ready endpoint returns OK before abandon { @@ -2139,7 +2053,6 @@ TEST_CASE("function.session.abandon_running") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2152,7 +2065,7 @@ TEST_CASE("function.session.abandon_running") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // 
Transition to Abandoned — should abandon the running action bool Transitioned = Session.Abandon(); @@ -2210,7 +2123,6 @@ TEST_CASE("function.remote.abandon_propagation") // Wait for scheduler to discover the runner Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a local queue and submit a long-running Sleep action auto QueueResult = Session.CreateQueue(); @@ -2223,7 +2135,7 @@ TEST_CASE("function.remote.abandon_propagation") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Transition to Abandoned — should abandon the running action and propagate bool Transitioned = Session.Abandon(); @@ -2283,7 +2195,6 @@ TEST_CASE("function.remote.shutdown_cancels_queues") Session.RegisterWorker(WorkerPackage); Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Create a queue and submit a long-running action so the remote queue is established auto QueueResult = Session.CreateQueue(); @@ -2296,7 +2207,7 @@ TEST_CASE("function.remote.shutdown_cancels_queues") REQUIRE_MESSAGE(EnqueueRes, "Sleep action enqueue to queue failed"); // Wait for the action to start running on the remote - Sleep(2'000); + WaitForActionRunning(Session); // Verify the remote has a non-implicit queue before shutdown HttpClient RemoteClient(Instance.GetBaseUri() + "/compute"); @@ -2358,7 +2269,6 @@ TEST_CASE("function.remote.shutdown_rejects_new_work") // Wait for runner discovery Session.NotifyOrchestratorChanged(); - Sleep(2'000); // Baseline: submit an action and verify it completes { @@ -2415,7 +2325,7 @@ TEST_CASE("function.session.retract_pending") REQUIRE_MESSAGE(Enqueued, "Failed to enqueue action"); // Let the scheduler process the pending action - Sleep(500); + Sleep(50); // Retract the pending action auto Result = Session.RetractAction(Enqueued.Lsn); @@ -2424,7 +2334,7 @@ TEST_CASE("function.session.retract_pending") // The action should be re-enqueued as 
pending (still no runners, so stays pending). // Let the scheduler process the retracted action back to pending. - Sleep(500); + Sleep(50); // Queue should still show 1 active (the action was rescheduled, not completed) auto Status = Session.GetQueueStatus(QueueResult.QueueId); @@ -2509,8 +2419,8 @@ TEST_CASE("function.retract_http") const int Lsn = SubmitResp.AsObject()["lsn"sv].AsInt32(); REQUIRE_MESSAGE(Lsn != 0, "Expected non-zero LSN from job submission"); - // Wait for the scheduler to process the pending action into m_PendingActions - Sleep(1'000); + // Wait for the blocker action to start running (occupying the single slot) + WaitForAnyActionRunningHttp(Client); // Retract the pending action via POST /jobs/{lsn}/retract const std::string RetractUrl = fmt::format("/jobs/{}/retract", Lsn); @@ -2529,7 +2439,7 @@ TEST_CASE("function.retract_http") } // A second retract should also succeed (action is back to pending) - Sleep(500); + Sleep(50); HttpClient::Response RetractResp2 = Client.Post(RetractUrl); CHECK_MESSAGE(RetractResp2.StatusCode == HttpResponseCode::OK, fmt::format("Second retract failed: status={}, body={}", RetractResp2.StatusCode, RetractResp2.ToText())); diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index 487e22b4b..35a840e5d 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -329,17 +329,36 @@ TEST_CASE("hub.lifecycle.children") CHECK_EQ(Result.AsText(), "GhijklmNop"sv); } - Result = Client.Post("modules/abc/deprovision"); + // Deprovision all modules at once + Result = Client.Post("deprovision"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + { + CbObject Body = Result.AsObject(); + CbArrayView AcceptedArr = Body["Accepted"].AsArrayView(); + CHECK_EQ(AcceptedArr.Num(), 2u); + bool FoundAbc = false; + bool FoundDef = false; + for (CbFieldView F : AcceptedArr) + { + if (F.AsString() == "abc"sv) + { + FoundAbc = true; + } + else if (F.AsString() 
== "def"sv) + { + FoundDef = true; + } + } + CHECK(FoundAbc); + CHECK(FoundDef); + } REQUIRE(WaitForModuleGone(Client, "abc")); + REQUIRE(WaitForModuleGone(Client, "def")); { HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); CHECK(WaitForPortUnreachable(ModClient)); } - - Result = Client.Post("modules/def/deprovision"); - REQUIRE(Result); - REQUIRE(WaitForModuleGone(Client, "def")); { HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); CHECK(WaitForPortUnreachable(ModClient)); @@ -349,6 +368,10 @@ TEST_CASE("hub.lifecycle.children") Result = Client.Get("status"); REQUIRE(Result); CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); + + // Deprovision-all with no modules + Result = Client.Post("deprovision"); + CHECK(Result); } static bool diff --git a/src/zenserver-test/logging-tests.cpp b/src/zenserver-test/logging-tests.cpp index 2e530ff92..549d2b322 100644 --- a/src/zenserver-test/logging-tests.cpp +++ b/src/zenserver-test/logging-tests.cpp @@ -146,28 +146,6 @@ TEST_CASE("logging.file.json") CHECK_MESSAGE(LogContains(FileLog, "\"message\""), FileLog); CHECK_MESSAGE(LogContains(FileLog, "\"source\": \"zenserver\""), FileLog); CHECK_MESSAGE(LogContains(FileLog, "server session id"), FileLog); -} - -// --log-id <id> is automatically set to the server instance name in test mode. -// The JSON formatter emits this value as the "id" field, so every entry in a -// .json log file must carry a non-empty "id". 
-TEST_CASE("logging.log_id") -{ - const std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - const std::filesystem::path LogFile = TestDir / "test.json"; - - ZenServerInstance Instance(TestEnv); - Instance.SetDataDir(TestDir); - - const std::string LogArg = fmt::format("--abslog {}", LogFile.string()); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady(LogArg); - CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); - - Instance.Shutdown(); - - CHECK_MESSAGE(std::filesystem::exists(LogFile), "JSON log file was not created"); - const std::string FileLog = ReadFileToString(LogFile); - // The JSON formatter writes the log-id as: "id": "<value>", CHECK_MESSAGE(LogContains(FileLog, "\"id\": \""), FileLog); } diff --git a/src/zenserver-test/objectstore-tests.cpp b/src/zenserver-test/objectstore-tests.cpp index ff2314089..99c92e15f 100644 --- a/src/zenserver-test/objectstore-tests.cpp +++ b/src/zenserver-test/objectstore-tests.cpp @@ -19,18 +19,22 @@ using namespace std::literals; TEST_SUITE_BEGIN("server.objectstore"); -TEST_CASE("objectstore.blobs") +TEST_CASE("objectstore") { - std::string_view Bucket = "bkt"sv; + ZenServerInstance Instance(TestEnv); + + const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled"); + CHECK(Port != 0); - std::vector<IoHash> CompressedBlobsHashes; - std::vector<uint64_t> BlobsSizes; - std::vector<uint64_t> CompressedBlobsSizes; + // --- objectstore.blobs --- { - ZenServerInstance Instance(TestEnv); + INFO("objectstore.blobs"); - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--objectstore-enabled")); - CHECK(PortNumber != 0); + std::string_view Bucket = "bkt"sv; + + std::vector<IoHash> CompressedBlobsHashes; + std::vector<uint64_t> BlobsSizes; + std::vector<uint64_t> CompressedBlobsSizes; HttpClient Client(Instance.GetBaseUri() + "/obj/"); @@ -68,97 +72,192 @@ TEST_CASE("objectstore.blobs") CHECK_EQ(RawSize, BlobsSizes[I]); } } -} -TEST_CASE("objectstore.s3client") 
-{ - ZenServerInstance Instance(TestEnv); - const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled"); - CHECK_MESSAGE(Port != 0, Instance.GetLogOutput()); - - // S3Client in path-style builds paths as /{bucket}/{key}. - // The objectstore routes objects at bucket/{bucket}/{key} relative to its base. - // Point the S3Client endpoint at {server}/obj/bucket so the paths line up. - S3ClientOptions Opts; - Opts.BucketName = "s3test"; - Opts.Region = "us-east-1"; - Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port); - Opts.PathStyle = true; - Opts.Credentials.AccessKeyId = "testkey"; - Opts.Credentials.SecretAccessKey = "testsecret"; - - S3Client Client(Opts); - - // -- PUT + GET roundtrip -- - std::string_view TestData = "hello from s3client via objectstore"sv; - IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData)); - S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content)); - REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error); - - S3GetObjectResult GetRes = Client.GetObject("test/hello.txt"); - REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error); - CHECK(GetRes.AsText() == TestData); - - // -- PUT overwrites -- - IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv)); - IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv)); - REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Original)).IsSuccess()); - REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess()); - - S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt"); - REQUIRE(OverwriteGet.IsSuccess()); - CHECK(OverwriteGet.AsText() == "overwritten"sv); - - // -- GET not found -- - S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat"); - CHECK_FALSE(NotFoundGet.IsSuccess()); - - // -- HEAD found -- - std::string_view HeadData = "head test data"sv; - IoBuffer HeadContent = 
IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData)); - REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess()); - - S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt"); - REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error); - CHECK(HeadRes.Status == HeadObjectResult::Found); - CHECK(HeadRes.Info.Size == HeadData.size()); - - // -- HEAD not found -- - S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat"); - CHECK(HeadNotFound.IsSuccess()); - CHECK(HeadNotFound.Status == HeadObjectResult::NotFound); - - // -- LIST objects -- - for (int i = 0; i < 3; ++i) + // --- objectstore.s3client --- { - std::string Key = fmt::format("listing/item-{}.txt", i); - std::string Payload = fmt::format("content-{}", i); - IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload)); - REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess()); - } + INFO("objectstore.s3client"); + + S3ClientOptions Opts; + Opts.BucketName = "s3test"; + Opts.Region = "us-east-1"; + Opts.Endpoint = fmt::format("http://localhost:{}/obj/bucket", Port); + Opts.PathStyle = true; + Opts.Credentials.AccessKeyId = "testkey"; + Opts.Credentials.SecretAccessKey = "testsecret"; + + S3Client Client(Opts); + + // -- PUT + GET roundtrip -- + std::string_view TestData = "hello from s3client via objectstore"sv; + IoBuffer Content = IoBufferBuilder::MakeFromMemory(MakeMemoryView(TestData)); + S3Result PutRes = Client.PutObject("test/hello.txt", std::move(Content)); + REQUIRE_MESSAGE(PutRes.IsSuccess(), PutRes.Error); + + S3GetObjectResult GetRes = Client.GetObject("test/hello.txt"); + REQUIRE_MESSAGE(GetRes.IsSuccess(), GetRes.Error); + CHECK(GetRes.AsText() == TestData); + + // -- PUT overwrites -- + IoBuffer Original = IoBufferBuilder::MakeFromMemory(MakeMemoryView("original"sv)); + IoBuffer Overwrite = IoBufferBuilder::MakeFromMemory(MakeMemoryView("overwritten"sv)); + REQUIRE(Client.PutObject("overwrite/file.txt", 
std::move(Original)).IsSuccess()); + REQUIRE(Client.PutObject("overwrite/file.txt", std::move(Overwrite)).IsSuccess()); + + S3GetObjectResult OverwriteGet = Client.GetObject("overwrite/file.txt"); + REQUIRE(OverwriteGet.IsSuccess()); + CHECK(OverwriteGet.AsText() == "overwritten"sv); + + // -- GET not found -- + S3GetObjectResult NotFoundGet = Client.GetObject("nonexistent/file.dat"); + CHECK_FALSE(NotFoundGet.IsSuccess()); + + // -- HEAD found -- + std::string_view HeadData = "head test data"sv; + IoBuffer HeadContent = IoBufferBuilder::MakeFromMemory(MakeMemoryView(HeadData)); + REQUIRE(Client.PutObject("head/meta.txt", std::move(HeadContent)).IsSuccess()); + + S3HeadObjectResult HeadRes = Client.HeadObject("head/meta.txt"); + REQUIRE_MESSAGE(HeadRes.IsSuccess(), HeadRes.Error); + CHECK(HeadRes.Status == HeadObjectResult::Found); + CHECK(HeadRes.Info.Size == HeadData.size()); + + // -- HEAD not found -- + S3HeadObjectResult HeadNotFound = Client.HeadObject("nonexistent/file.dat"); + CHECK(HeadNotFound.IsSuccess()); + CHECK(HeadNotFound.Status == HeadObjectResult::NotFound); + + // -- LIST objects -- + for (int i = 0; i < 3; ++i) + { + std::string Key = fmt::format("listing/item-{}.txt", i); + std::string Payload = fmt::format("content-{}", i); + IoBuffer Buf = IoBufferBuilder::MakeFromMemory(MakeMemoryView(Payload)); + REQUIRE(Client.PutObject(Key, std::move(Buf)).IsSuccess()); + } - S3ListObjectsResult ListRes = Client.ListObjects("listing/"); - REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error); - REQUIRE(ListRes.Objects.size() == 3); + S3ListObjectsResult ListRes = Client.ListObjects("listing/"); + REQUIRE_MESSAGE(ListRes.IsSuccess(), ListRes.Error); + REQUIRE(ListRes.Objects.size() == 3); + + std::vector<std::string> Keys; + for (const S3ObjectInfo& Obj : ListRes.Objects) + { + Keys.push_back(Obj.Key); + CHECK(Obj.Size > 0); + } + std::sort(Keys.begin(), Keys.end()); + CHECK(Keys[0] == "listing/item-0.txt"); + CHECK(Keys[1] == "listing/item-1.txt"); + 
CHECK(Keys[2] == "listing/item-2.txt"); + + // -- LIST empty prefix -- + S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/"); + REQUIRE(EmptyList.IsSuccess()); + CHECK(EmptyList.Objects.empty()); + } - std::vector<std::string> Keys; - for (const S3ObjectInfo& Obj : ListRes.Objects) + // --- objectstore.range-requests --- { - Keys.push_back(Obj.Key); - CHECK(Obj.Size > 0); + INFO("objectstore.range-requests"); + + HttpClient Client(Instance.GetBaseUri() + "/obj/"); + + IoBuffer Blob = CreateRandomBlob(1024); + MemoryView BlobView = Blob.GetView(); + std::string ObjectPath = "bucket/bkt/range-test/data.bin"; + + HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob)); + REQUIRE(PutResult); + + // Full GET without Range header + { + HttpClient::Response Result = Client.Get(ObjectPath); + CHECK(Result.StatusCode == HttpResponseCode::OK); + CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u); + CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView)); + } + + // Single range: bytes 100-199 + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}}); + CHECK(Result.StatusCode == HttpResponseCode::PartialContent); + CHECK_EQ(Result.ResponsePayload.GetSize(), 100u); + CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100))); + } + + // Range starting at zero: bytes 0-49 + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}}); + CHECK(Result.StatusCode == HttpResponseCode::PartialContent); + CHECK_EQ(Result.ResponsePayload.GetSize(), 50u); + CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50))); + } + + // Range at end of file: bytes 1000-1023 + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}}); + CHECK(Result.StatusCode == HttpResponseCode::PartialContent); + CHECK_EQ(Result.ResponsePayload.GetSize(), 24u); + CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24))); + } + + // Multiple 
ranges: bytes 0-49 and 100-149 + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}}); + CHECK(Result.StatusCode == HttpResponseCode::PartialContent); + + std::string_view Body(reinterpret_cast<const char*>(Result.ResponsePayload.GetData()), Result.ResponsePayload.GetSize()); + + // Verify multipart structure contains both range payloads + CHECK(Body.find("Content-Range: bytes 0-49/1024") != std::string_view::npos); + CHECK(Body.find("Content-Range: bytes 100-149/1024") != std::string_view::npos); + + // Extract and verify actual data for first range + auto FindPartData = [&](std::string_view ContentRange) -> std::string_view { + size_t Pos = Body.find(ContentRange); + if (Pos == std::string_view::npos) + { + return {}; + } + // Skip past the Content-Range line and the blank line separator + Pos = Body.find("\r\n\r\n", Pos); + if (Pos == std::string_view::npos) + { + return {}; + } + Pos += 4; + size_t End = Body.find("\r\n--", Pos); + if (End == std::string_view::npos) + { + return {}; + } + return Body.substr(Pos, End - Pos); + }; + + std::string_view Part1 = FindPartData("Content-Range: bytes 0-49/1024"); + CHECK_EQ(Part1.size(), 50u); + CHECK(MemoryView(Part1.data(), Part1.size()).EqualBytes(BlobView.Mid(0, 50))); + + std::string_view Part2 = FindPartData("Content-Range: bytes 100-149/1024"); + CHECK_EQ(Part2.size(), 50u); + CHECK(MemoryView(Part2.data(), Part2.size()).EqualBytes(BlobView.Mid(100, 50))); + } + + // Out-of-bounds single range + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}}); + CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable); + } + + // Out-of-bounds multi-range + { + HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,2000-2099"}}); + CHECK(Result.StatusCode == HttpResponseCode::RangeNotSatisfiable); + } } - std::sort(Keys.begin(), Keys.end()); - CHECK(Keys[0] == "listing/item-0.txt"); - CHECK(Keys[1] == 
"listing/item-1.txt"); - CHECK(Keys[2] == "listing/item-2.txt"); - - // -- LIST empty prefix -- - S3ListObjectsResult EmptyList = Client.ListObjects("no-such-prefix/"); - REQUIRE(EmptyList.IsSuccess()); - CHECK(EmptyList.Objects.empty()); } -TEST_CASE("objectstore.range-requests") +TEST_CASE("objectstore.range-requests-download") { ZenServerInstance Instance(TestEnv); const uint16_t Port = Instance.SpawnServerAndWaitUntilReady("--objectstore-enabled"); @@ -168,55 +267,42 @@ TEST_CASE("objectstore.range-requests") IoBuffer Blob = CreateRandomBlob(1024); MemoryView BlobView = Blob.GetView(); - std::string ObjectPath = "bucket/bkt/range-test/data.bin"; + std::string ObjectPath = "bucket/bkt/range-download-test/data.bin"; HttpClient::Response PutResult = Client.Put(ObjectPath, IoBuffer(Blob)); REQUIRE(PutResult); - // Full GET without Range header - { - HttpClient::Response Result = Client.Get(ObjectPath); - CHECK(Result.StatusCode == HttpResponseCode::OK); - CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u); - CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView)); - } - - // Single range: bytes 100-199 - { - HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=100-199"}}); - CHECK(Result.StatusCode == HttpResponseCode::PartialContent); - CHECK_EQ(Result.ResponsePayload.GetSize(), 100u); - CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(100, 100))); - } + ScopedTemporaryDirectory DownloadDir; - // Range starting at zero: bytes 0-49 + // Single range via Download: verify Ranges is populated and GetRanges maps correctly { - HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49"}}); + HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=100-199"}}); CHECK(Result.StatusCode == HttpResponseCode::PartialContent); - CHECK_EQ(Result.ResponsePayload.GetSize(), 50u); - CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(0, 50))); + REQUIRE_EQ(Result.Ranges.size(), 
1u); + CHECK_EQ(Result.Ranges[0].RangeOffset, 100u); + CHECK_EQ(Result.Ranges[0].RangeLength, 100u); + + std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{100, 100}}; + std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges); + REQUIRE_EQ(PayloadRanges.size(), 1u); + CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(100, 100))); } - // Range at end of file: bytes 1000-1023 + // Multi-range via Download: verify Ranges is populated for both parts and GetRanges maps correctly { - HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=1000-1023"}}); + HttpClient::Response Result = Client.Download(ObjectPath, DownloadDir.Path(), {{"Range", "bytes=0-49,100-149"}}); CHECK(Result.StatusCode == HttpResponseCode::PartialContent); - CHECK_EQ(Result.ResponsePayload.GetSize(), 24u); - CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView.Mid(1000, 24))); - } - - // Multiple ranges: not supported, falls back to 200 with full body per RFC 7233 - { - HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=0-49,100-149"}}); - CHECK(Result.StatusCode == HttpResponseCode::OK); - CHECK_EQ(Result.ResponsePayload.GetSize(), 1024u); - CHECK(Result.ResponsePayload.GetView().EqualBytes(BlobView)); - } - - // Out-of-bounds range: should return 400 - { - HttpClient::Response Result = Client.Get(ObjectPath, {{"Range", "bytes=2000-2099"}}); - CHECK(Result.StatusCode == HttpResponseCode::BadRequest); + REQUIRE_EQ(Result.Ranges.size(), 2u); + CHECK_EQ(Result.Ranges[0].RangeOffset, 0u); + CHECK_EQ(Result.Ranges[0].RangeLength, 50u); + CHECK_EQ(Result.Ranges[1].RangeOffset, 100u); + CHECK_EQ(Result.Ranges[1].RangeLength, 50u); + + std::vector<std::pair<uint64_t, uint64_t>> RequestedRanges = {{0, 50}, {100, 50}}; + std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = Result.GetRanges(RequestedRanges); + REQUIRE_EQ(PayloadRanges.size(), 2u); + 
CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[0].first, PayloadRanges[0].second).EqualBytes(BlobView.Mid(0, 50))); + CHECK(Result.ResponsePayload.GetView().Mid(PayloadRanges[1].first, PayloadRanges[1].second).EqualBytes(BlobView.Mid(100, 50))); } } diff --git a/src/zenserver-test/projectstore-tests.cpp b/src/zenserver-test/projectstore-tests.cpp index cec453511..49d985abb 100644 --- a/src/zenserver-test/projectstore-tests.cpp +++ b/src/zenserver-test/projectstore-tests.cpp @@ -41,423 +41,430 @@ TEST_CASE("project.basic") const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady(); - std::mt19937_64 mt; - - zen::StringBuilder<64> BaseUri; - BaseUri << fmt::format("http://localhost:{}", PortNumber); + std::string ServerUri = fmt::format("http://localhost:{}", PortNumber); std::filesystem::path BinPath = zen::GetRunningExecutablePath(); std::filesystem::path RootPath = BinPath.parent_path().parent_path(); BinPath = BinPath.lexically_relative(RootPath); - SUBCASE("build store init") + auto CreateProjectAndOplog = [&](std::string_view ProjectName, std::string_view OplogName) -> std::string { + HttpClient Http{ServerUri}; + + zen::CbObjectWriter Body; + Body << "id" << ProjectName; + Body << "root" << RootPath.c_str(); + Body << "project" + << "/zooom"; + Body << "engine" + << "/zooom"; + IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer(); + auto Response = Http.Post(fmt::format("/prj/{}", ProjectName), BodyBuf); + REQUIRE(Response.StatusCode == HttpResponseCode::Created); + + std::string OplogUri = fmt::format("{}/prj/{}/oplog/{}", ServerUri, ProjectName, OplogName); + HttpClient OplogHttp{OplogUri}; + auto OplogResponse = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject); + REQUIRE(OplogResponse.StatusCode == HttpResponseCode::Created); + + return OplogUri; + }; + + // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing + std::filesystem::path LongPathDir = RootPath / "longpathtest"; + for (int I = 
0; I < 5; ++I) { - { - HttpClient Http{BaseUri}; + LongPathDir /= std::string(50, char('a' + I)); + } + std::filesystem::path LongFilePath = LongPathDir / "testfile.bin"; + std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath); - { - zen::CbObjectWriter Body; - Body << "id" - << "test"; - Body << "root" << RootPath.c_str(); - Body << "project" - << "/zooom"; - Body << "engine" - << "/zooom"; - - zen::BinaryWriter MemOut; - IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer(); - - auto Response = Http.Post("/prj/test"sv, BodyBuf); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } + const uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF}; + CreateDirectories(MakeSafeAbsolutePath(LongPathDir)); + WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData))); + CHECK(LongRelPath.string().length() > 260); - { - auto Response = Http.Get("/prj/test"sv); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + std::string LongClientPath = "/{engine}/client"; + for (int I = 0; I < 5; ++I) + { + LongClientPath += '/'; + LongClientPath.append(50, char('a' + I)); + } + LongClientPath += "/longfile.bin"; + CHECK(LongClientPath.length() > 260); - CbObject ResponseObject = Response.AsObject(); + const std::string_view LongPathChunkId{ + "00000000" + "00000000" + "00020000"}; + auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId); - CHECK(ResponseObject["id"].AsString() == "test"sv); - CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); - } + // --- build store persistence --- + // First section also verifies project and oplog creation responses. 
+ { + HttpClient ServerHttp{ServerUri}; + + { + zen::CbObjectWriter Body; + Body << "id" + << "test_persist"; + Body << "root" << RootPath.c_str(); + Body << "project" + << "/zooom"; + Body << "engine" + << "/zooom"; + IoBuffer BodyBuf = Body.Save().GetBuffer().AsIoBuffer(); + + auto Response = ServerHttp.Post("/prj/test_persist"sv, BodyBuf); + CHECK(Response.StatusCode == HttpResponseCode::Created); } - BaseUri << "/prj/test/oplog/foobar"; + { + auto Response = ServerHttp.Get("/prj/test_persist"sv); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); + + CbObject ResponseObject = Response.AsObject(); + + CHECK(ResponseObject["id"].AsString() == "test_persist"sv); + CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str())); + } + + std::string OplogUri = fmt::format("{}/prj/test_persist/oplog/oplog_persist", ServerUri); { - HttpClient Http{BaseUri}; + HttpClient OplogHttp{OplogUri}; { - auto Response = Http.Post(""sv, IoBuffer{}, ZenContentType::kCbObject); + auto Response = OplogHttp.Post(""sv, IoBuffer{}, ZenContentType::kCbObject); CHECK(Response.StatusCode == HttpResponseCode::Created); } { - auto Response = Http.Get(""sv); + auto Response = OplogHttp.Get(""sv); REQUIRE(Response.StatusCode == HttpResponseCode::OK); CbObject ResponseObject = Response.AsObject(); - CHECK(ResponseObject["id"].AsString() == "foobar"sv); - CHECK(ResponseObject["project"].AsString() == "test"sv); + CHECK(ResponseObject["id"].AsString() == "oplog_persist"sv); + CHECK(ResponseObject["project"].AsString() == "test_persist"sv); } } - // Create a file at a path exceeding Windows MAX_PATH (260 chars) for long filename testing - std::filesystem::path LongPathDir = RootPath / "longpathtest"; - for (int I = 0; I < 5; ++I) - { - LongPathDir /= std::string(50, char('a' + I)); - } - std::filesystem::path LongFilePath = LongPathDir / "testfile.bin"; - std::filesystem::path LongRelPath = LongFilePath.lexically_relative(RootPath); + uint8_t AttachData[] = {1, 2, 3}; - const 
uint8_t LongPathFileData[] = {0xDE, 0xAD, 0xBE, 0xEF}; - CreateDirectories(MakeSafeAbsolutePath(LongPathDir)); - WriteFile(MakeSafeAbsolutePath(LongFilePath), IoBufferBuilder::MakeCloneFromMemory(LongPathFileData, sizeof(LongPathFileData))); - CHECK(LongRelPath.string().length() > 260); + zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); + zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; - std::string LongClientPath = "/{engine}/client"; - for (int I = 0; I < 5; ++I) - { - LongClientPath += '/'; - LongClientPath.append(50, char('a' + I)); - } - LongClientPath += "/longfile.bin"; - CHECK(LongClientPath.length() > 260); + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "foo" + << "attachment" << Attach; - const std::string_view LongPathChunkId{ + const std::string_view ChunkId{ "00000000" "00000000" - "00020000"}; - auto LongPathFileOid = zen::Oid::FromHexString(LongPathChunkId); + "00010000"}; + auto FileOid = zen::Oid::FromHexString(ChunkId); + + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << FileOid; + OpWriter << "clientpath" + << "/{engine}/client/side/path"; + OpWriter << "serverpath" << BinPath.c_str(); + OpWriter.EndObject(); + OpWriter.BeginObject(); + OpWriter << "id" << LongPathFileOid; + OpWriter << "clientpath" << LongClientPath; + OpWriter << "serverpath" << LongRelPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + + zen::CbPackage OpPackage(Op); + OpPackage.AddAttachment(Attach); - SUBCASE("build store persistence") - { - uint8_t AttachData[] = {1, 2, 3}; - - zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); - zen::CbAttachment Attach{Attachment, Attachment.DecodeRawHash()}; - - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "foo" - << "attachment" << Attach; - - const std::string_view 
ChunkId{ - "00000000" - "00000000" - "00010000"}; - auto FileOid = zen::Oid::FromHexString(ChunkId); - - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << FileOid; - OpWriter << "clientpath" - << "/{engine}/client/side/path"; - OpWriter << "serverpath" << BinPath.c_str(); - OpWriter.EndObject(); - OpWriter.BeginObject(); - OpWriter << "id" << LongPathFileOid; - OpWriter << "clientpath" << LongClientPath; - OpWriter << "serverpath" << LongRelPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - - zen::CbPackage OpPackage(Op); - OpPackage.AddAttachment(Attach); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - HttpClient Http{BaseUri}; - - { - auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } + HttpClient Http{OplogUri}; - // Read file data + { + auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - } + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = Http.Get(ChunkGetUri); - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId << "?offset=1&size=10"; - auto Response = Http.Get(ChunkGetUri); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + } - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - CHECK(Response.ResponsePayload.GetSize() == 10); - } + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId << "?offset=1&size=10"; + auto Response 
= Http.Get(ChunkGetUri); - // Read long-path file data - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << LongPathChunkId; - auto Response = Http.Get(ChunkGetUri); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + CHECK(Response.ResponsePayload.GetSize() == 10); + } - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData)); - } + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << LongPathChunkId; + auto Response = Http.Get(ChunkGetUri); - ZEN_INFO("+++++++"); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + CHECK(Response.ResponsePayload.GetSize() == sizeof(LongPathFileData)); } - SUBCASE("snapshot") - { - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "foo"; - - const std::string_view ChunkId{ - "00000000" - "00000000" - "00010000"}; - auto FileOid = zen::Oid::FromHexString(ChunkId); - - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << FileOid; - OpWriter << "clientpath" - << "/{engine}/client/side/path"; - OpWriter << "serverpath" << BinPath.c_str(); - OpWriter.EndObject(); - OpWriter.BeginObject(); - OpWriter << "id" << LongPathFileOid; - OpWriter << "clientpath" << LongClientPath; - OpWriter << "serverpath" << LongRelPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - - zen::CbPackage OpPackage(Op); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - HttpClient Http{BaseUri}; + ZEN_INFO("+++++++"); + } - { - auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); + // --- snapshot --- + { + std::string OplogUri = CreateProjectAndOplog("test_snap", "oplog_snap"); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "foo"; - // Read file data, it is raw and 
uncompressed - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri); + const std::string_view ChunkId{ + "00000000" + "00000000" + "00010000"}; + auto FileOid = zen::Oid::FromHexString(ChunkId); + + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << FileOid; + OpWriter << "clientpath" + << "/{engine}/client/side/path"; + OpWriter << "serverpath" << BinPath.c_str(); + OpWriter.EndObject(); + OpWriter.BeginObject(); + OpWriter << "id" << LongPathFileOid; + OpWriter << "clientpath" << LongClientPath; + OpWriter << "serverpath" << LongRelPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + + zen::CbPackage OpPackage(Op); - REQUIRE(Response); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); - IoBuffer Data = Response.ResponsePayload; - IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); - CHECK(ReferenceData.GetSize() == Data.GetSize()); - CHECK(ReferenceData.GetView().EqualBytes(Data.GetView())); - } + HttpClient Http{OplogUri}; - // Read long-path file data, it is raw and uncompressed - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << LongPathChunkId; - auto Response = Http.Get(ChunkGetUri); + { + auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); - REQUIRE(Response); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::Created); + } - IoBuffer Data = Response.ResponsePayload; - MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)}; - CHECK(Data.GetSize() == sizeof(LongPathFileData)); - CHECK(Data.GetView().EqualBytes(ExpectedView)); - } + // Read file data, it is raw and uncompressed + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = 
Http.Get(ChunkGetUri); - { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); - auto Response = Http.Post("/rpc"sv, Payload); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - } + REQUIRE(Response); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); - // Read chunk data, it is now compressed - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << ChunkId; - auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + IoBuffer Data = Response.ResponsePayload; + IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); + CHECK(ReferenceData.GetSize() == Data.GetSize()); + CHECK(ReferenceData.GetView().EqualBytes(Data.GetView())); + } - REQUIRE(Response); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + // Read long-path file data, it is raw and uncompressed + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << LongPathChunkId; + auto Response = Http.Get(ChunkGetUri); - IoBuffer Data = Response.ResponsePayload; - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); - REQUIRE(Compressed); - IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); - IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); - CHECK(RawSize == ReferenceData.GetSize()); - CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize()); - CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView())); - } + REQUIRE(Response); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); - // Read compressed long-path file data after snapshot - { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << LongPathChunkId; - auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + IoBuffer Data = Response.ResponsePayload; + MemoryView ExpectedView{LongPathFileData, 
sizeof(LongPathFileData)}; + CHECK(Data.GetSize() == sizeof(LongPathFileData)); + CHECK(Data.GetView().EqualBytes(ExpectedView)); + } - REQUIRE(Response); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); + auto Response = Http.Post("/rpc"sv, Payload); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + } - IoBuffer Data = Response.ResponsePayload; - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); - REQUIRE(Compressed); - IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); - MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)}; - CHECK(RawSize == sizeof(LongPathFileData)); - CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData)); - CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView)); - } + // Read chunk data, it is now compressed + { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << ChunkId; + auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + + REQUIRE(Response); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); - ZEN_INFO("+++++++"); + IoBuffer Data = Response.ResponsePayload; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); + REQUIRE(Compressed); + IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); + IoBuffer ReferenceData = IoBufferBuilder::MakeFromFile(RootPath / BinPath); + CHECK(RawSize == ReferenceData.GetSize()); + CHECK(ReferenceData.GetSize() == DataDecompressed.GetSize()); + CHECK(ReferenceData.GetView().EqualBytes(DataDecompressed.GetView())); } - SUBCASE("snapshot zero byte file") + // Read compressed long-path file data after snapshot { - // A zero-byte file referenced in an oplog entry must survive a - // 
snapshot: the file is read, compressed, stored in CidStore, and - // the oplog is rewritten with a BinaryAttachment reference. After - // the snapshot the chunk must be retrievable and decompress to an - // empty payload. - - std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin"; - std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath; - CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path())); - // Create a zero-byte file on disk. - WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{}); - REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath))); - - const std::string_view EmptyChunkId{ - "00000000" - "00000000" - "00030000"}; - auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId); - - zen::CbObjectWriter OpWriter; - OpWriter << "key" - << "zero_byte_test"; - OpWriter.BeginArray("files"); - OpWriter.BeginObject(); - OpWriter << "id" << EmptyFileOid; - OpWriter << "clientpath" - << "/{engine}/empty_file"; - OpWriter << "serverpath" << EmptyFileRelPath.c_str(); - OpWriter.EndObject(); - OpWriter.EndArray(); - - zen::CbObject Op = OpWriter.Save(); - zen::CbPackage OpPackage(Op); - - zen::BinaryWriter MemOut; - legacy::SaveCbPackage(OpPackage, MemOut); - - HttpClient Http{BaseUri}; + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << LongPathChunkId; + auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); - { - auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::Created); - } + REQUIRE(Response); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); - // Read file data before snapshot - raw and uncompressed, 0 bytes. - // http.sys converts a 200 OK with empty body to 204 No Content, so - // accept either status code. 
- { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << EmptyChunkId; - auto Response = Http.Get(ChunkGetUri); + IoBuffer Data = Response.ResponsePayload; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); + REQUIRE(Compressed); + IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); + MemoryView ExpectedView{LongPathFileData, sizeof(LongPathFileData)}; + CHECK(RawSize == sizeof(LongPathFileData)); + CHECK(DataDecompressed.GetSize() == sizeof(LongPathFileData)); + CHECK(DataDecompressed.GetView().EqualBytes(ExpectedView)); + } - REQUIRE(Response); - CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent)); - CHECK(Response.ResponsePayload.GetSize() == 0); - } + ZEN_INFO("+++++++"); + } - // Trigger snapshot. - { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); - auto Response = Http.Post("/rpc"sv, Payload); - REQUIRE(Response); - CHECK(Response.StatusCode == HttpResponseCode::OK); - } + // --- snapshot zero byte file --- + { + std::string OplogUri = CreateProjectAndOplog("test_zero", "oplog_zero"); - // Read chunk after snapshot - compressed, decompresses to 0 bytes. 
- { - zen::StringBuilder<128> ChunkGetUri; - ChunkGetUri << "/" << EmptyChunkId; - auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + std::filesystem::path EmptyFileRelPath = std::filesystem::path("zerobyte_snapshot_test") / "empty.bin"; + std::filesystem::path EmptyFileAbsPath = RootPath / EmptyFileRelPath; + CreateDirectories(MakeSafeAbsolutePath(EmptyFileAbsPath.parent_path())); + WriteFile(MakeSafeAbsolutePath(EmptyFileAbsPath), IoBuffer{}); + REQUIRE(IsFile(MakeSafeAbsolutePath(EmptyFileAbsPath))); - REQUIRE(Response); - REQUIRE(Response.StatusCode == HttpResponseCode::OK); + const std::string_view EmptyChunkId{ + "00000000" + "00000000" + "00030000"}; + auto EmptyFileOid = zen::Oid::FromHexString(EmptyChunkId); + + zen::CbObjectWriter OpWriter; + OpWriter << "key" + << "zero_byte_test"; + OpWriter.BeginArray("files"); + OpWriter.BeginObject(); + OpWriter << "id" << EmptyFileOid; + OpWriter << "clientpath" + << "/{engine}/empty_file"; + OpWriter << "serverpath" << EmptyFileRelPath.c_str(); + OpWriter.EndObject(); + OpWriter.EndArray(); + + zen::CbObject Op = OpWriter.Save(); + zen::CbPackage OpPackage(Op); - IoBuffer Data = Response.ResponsePayload; - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); - REQUIRE(Compressed); - CHECK(RawSize == 0); - IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); - CHECK(DataDecompressed.GetSize() == 0); - } + zen::BinaryWriter MemOut; + legacy::SaveCbPackage(OpPackage, MemOut); - // Cleanup - { - std::error_code Ec; - DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec); - } + HttpClient Http{OplogUri}; - ZEN_INFO("+++++++"); + { + auto Response = Http.Post("/new", IoBufferBuilder::MakeFromMemory(MemOut.GetView())); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::Created); } - SUBCASE("test chunk not found error") + // Read file data 
before snapshot - raw and uncompressed, 0 bytes. + // http.sys converts a 200 OK with empty body to 204 No Content, so + // accept either status code. { - HttpClient Http{BaseUri}; + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << EmptyChunkId; + auto Response = Http.Get(ChunkGetUri); - for (size_t I = 0; I < 65; I++) - { - zen::StringBuilder<128> PostUri; - PostUri << "/f77c781846caead318084604/info"; - auto Response = Http.Get(PostUri); + REQUIRE(Response); + CHECK((Response.StatusCode == HttpResponseCode::OK || Response.StatusCode == HttpResponseCode::NoContent)); + CHECK(Response.ResponsePayload.GetSize() == 0); + } - REQUIRE(!Response.Error); - CHECK(Response.StatusCode == HttpResponseCode::NotFound); - } + // Trigger snapshot. + { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("method"sv, "snapshot"sv); }); + auto Response = Http.Post("/rpc"sv, Payload); + REQUIRE(Response); + CHECK(Response.StatusCode == HttpResponseCode::OK); + } + + // Read chunk after snapshot - compressed, decompresses to 0 bytes. 
+ { + zen::StringBuilder<128> ChunkGetUri; + ChunkGetUri << "/" << EmptyChunkId; + auto Response = Http.Get(ChunkGetUri, {{"Accept-Type", "application/x-ue-comp"}}); + + REQUIRE(Response); + REQUIRE(Response.StatusCode == HttpResponseCode::OK); + + IoBuffer Data = Response.ResponsePayload; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), RawHash, RawSize); + REQUIRE(Compressed); + CHECK(RawSize == 0); + IoBuffer DataDecompressed = Compressed.Decompress().AsIoBuffer(); + CHECK(DataDecompressed.GetSize() == 0); } - // Cleanup long-path test directory { std::error_code Ec; - DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec); + DeleteDirectories(MakeSafeAbsolutePath(RootPath / "zerobyte_snapshot_test"), Ec); + } + + ZEN_INFO("+++++++"); + } + + // --- test chunk not found error --- + { + std::string OplogUri = CreateProjectAndOplog("test_notfound", "oplog_notfound"); + HttpClient Http{OplogUri}; + + for (size_t I = 0; I < 65; I++) + { + zen::StringBuilder<128> PostUri; + PostUri << "/f77c781846caead318084604/info"; + auto Response = Http.Get(PostUri); + + REQUIRE(!Response.Error); + CHECK(Response.StatusCode == HttpResponseCode::NotFound); } } + + // Cleanup long-path test directory + { + std::error_code Ec; + DeleteDirectories(MakeSafeAbsolutePath(RootPath / "longpathtest"), Ec); + } } CbPackage @@ -753,86 +760,102 @@ TEST_CASE("project.remote") } }; - SUBCASE("File") + // --- Zen --- + // NOTE: Zen export must run before file-based exports from the same source + // oplog. A prior file export leaves server-side state that causes a + // subsequent zen-protocol export from the same oplog to abort. 
{ + INFO("Zen"); ScopedTemporaryDirectory TempDir; { - IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri(); + std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); + MakeProject(ExportTargetUri, "proj0_zen"); + MakeOplog(ExportTargetUri, "proj0_zen", "oplog0_zen"); + + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer << "method"sv << "export"sv; Writer << "params" << BeginObject; { Writer << "maxblocksize"sv << 3072u; Writer << "maxchunkembedsize"sv << 1296u; - Writer << "chunkfilesizelimit"sv << 5u * 1024u; Writer << "maxchunksperblock"sv << 16u; + Writer << "chunkfilesizelimit"sv << 5u * 1024u; Writer << "force"sv << false; - Writer << "file"sv << BeginObject; + Writer << "zen"sv << BeginObject; { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; + Writer << "url"sv << ExportTargetUri.substr(7); + Writer << "project" + << "proj0_zen"; + Writer << "oplog" + << "oplog0_zen"; } - Writer << EndObject; // "file" + Writer << EndObject; // "zen" } Writer << EndObject; // "params" }); - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; - + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); HttpWaitForCompletion(Servers.GetInstance(0), Response); } + ValidateAttachments(1, "proj0_zen", "oplog0_zen"); + ValidateOplog(1, "proj0_zen", "oplog0_zen"); + { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); + std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); + MakeProject(ImportTargetUri, "proj1"); + MakeOplog(ImportTargetUri, "proj1", "oplog1"); - IoBuffer Payload = 
MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { + IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer << "method"sv << "import"sv; Writer << "params" << BeginObject; { Writer << "force"sv << false; - Writer << "file"sv << BeginObject; + Writer << "zen"sv << BeginObject; { - Writer << "path"sv << path; - Writer << "name"sv - << "proj0_oplog0"sv; + Writer << "url"sv << ImportSourceUri.substr(7); + Writer << "project" + << "proj0_zen"; + Writer << "oplog" + << "oplog0_zen"; } - Writer << EndObject; // "file" + Writer << EndObject; // "zen" } Writer << EndObject; // "params" }); - HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); - HttpWaitForCompletion(Servers.GetInstance(1), Response); + HttpClient Http{Servers.GetInstance(2).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload); + HttpWaitForCompletion(Servers.GetInstance(2), Response); } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); + ValidateAttachments(2, "proj1", "oplog1"); + ValidateOplog(2, "proj1", "oplog1"); } - SUBCASE("File disable blocks") + // --- File --- { + INFO("File"); ScopedTemporaryDirectory TempDir; { - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { Writer << "method"sv << "export"sv; Writer << "params" << BeginObject; { Writer << "maxblocksize"sv << 3072u; Writer << "maxchunkembedsize"sv << 1296u; - Writer << "maxchunksperblock"sv << 16u; Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; + Writer << "maxchunksperblock"sv << 16u; + Writer << "force"sv << true; Writer << "file"sv << BeginObject; { - Writer 
<< "path"sv << TempDir.Path().string(); + Writer << "path"sv << path; Writer << "name"sv << "proj0_oplog0"sv; - Writer << "disableblocks"sv << true; } Writer << EndObject; // "file" } @@ -845,9 +868,10 @@ TEST_CASE("project.remote") HttpWaitForCompletion(Servers.GetInstance(0), Response); } { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_file"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_file", "oplog0_file"); + + IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) { Writer << "method"sv << "import"sv; Writer << "params" << BeginObject; @@ -855,7 +879,7 @@ TEST_CASE("project.remote") Writer << "force"sv << false; Writer << "file"sv << BeginObject; { - Writer << "path"sv << TempDir.Path().string(); + Writer << "path"sv << path; Writer << "name"sv << "proj0_oplog0"sv; } @@ -866,15 +890,16 @@ TEST_CASE("project.remote") HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_file", "oplog0_file"), Payload); HttpWaitForCompletion(Servers.GetInstance(1), Response); } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); + ValidateAttachments(1, "proj0_file", "oplog0_file"); + ValidateOplog(1, "proj0_file", "oplog0_file"); } - SUBCASE("File force temp blocks") + // --- File disable blocks --- { + INFO("File disable blocks"); ScopedTemporaryDirectory TempDir; { IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { @@ -886,26 +911,27 @@ TEST_CASE("project.remote") Writer << "maxchunkembedsize"sv << 1296u; 
Writer << "maxchunksperblock"sv << 16u; Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; + Writer << "force"sv << true; Writer << "file"sv << BeginObject; { Writer << "path"sv << TempDir.Path().string(); Writer << "name"sv << "proj0_oplog0"sv; - Writer << "enabletempblocks"sv << true; + Writer << "disableblocks"sv << true; } Writer << EndObject; // "file" } Writer << EndObject; // "params" }); - HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + HttpClient Http{Servers.GetInstance(0).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); HttpWaitForCompletion(Servers.GetInstance(0), Response); } { - MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_copy"); - MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy"); + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_noblock", "oplog0_noblock"); IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer << "method"sv << "import"sv; @@ -923,23 +949,20 @@ TEST_CASE("project.remote") Writer << EndObject; // "params" }); - HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_copy", "oplog0_copy"), Payload); + HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; + + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_noblock", "oplog0_noblock"), Payload); HttpWaitForCompletion(Servers.GetInstance(1), Response); } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); + ValidateAttachments(1, "proj0_noblock", "oplog0_noblock"); + ValidateOplog(1, "proj0_noblock", "oplog0_noblock"); } - SUBCASE("Zen") + // --- File force temp blocks --- { + INFO("File force temp blocks"); ScopedTemporaryDirectory TempDir; { - std::string ExportSourceUri = 
Servers.GetInstance(0).GetBaseUri(); - std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri(); - MakeProject(ExportTargetUri, "proj0_copy"); - MakeOplog(ExportTargetUri, "proj0_copy", "oplog0_copy"); - IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer << "method"sv << "export"sv; @@ -949,14 +972,13 @@ TEST_CASE("project.remote") Writer << "maxchunkembedsize"sv << 1296u; Writer << "maxchunksperblock"sv << 16u; Writer << "chunkfilesizelimit"sv << 5u * 1024u; - Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; + Writer << "force"sv << true; + Writer << "file"sv << BeginObject; { - Writer << "url"sv << ExportTargetUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << "oplog" - << "oplog0_copy"; + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; + Writer << "enabletempblocks"sv << true; } Writer << EndObject; // "file" } @@ -967,40 +989,32 @@ TEST_CASE("project.remote") HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0", "oplog0"), Payload); HttpWaitForCompletion(Servers.GetInstance(0), Response); } - ValidateAttachments(1, "proj0_copy", "oplog0_copy"); - ValidateOplog(1, "proj0_copy", "oplog0_copy"); - { - std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri(); - std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri(); - MakeProject(ImportTargetUri, "proj1"); - MakeOplog(ImportTargetUri, "proj1", "oplog1"); - + MakeProject(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock"); + MakeOplog(Servers.GetInstance(1).GetBaseUri(), "proj0_tmpblock", "oplog0_tmpblock"); IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer << "method"sv << "import"sv; Writer << "params" << BeginObject; { Writer << "force"sv << false; - Writer << "zen"sv << BeginObject; + Writer << "file"sv << BeginObject; { - Writer << "url"sv << ImportSourceUri.substr(7); - Writer << "project" - << "proj0_copy"; - Writer << 
"oplog" - << "oplog0_copy"; + Writer << "path"sv << TempDir.Path().string(); + Writer << "name"sv + << "proj0_oplog0"sv; } Writer << EndObject; // "file" } Writer << EndObject; // "params" }); - HttpClient Http{Servers.GetInstance(2).GetBaseUri()}; - HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj1", "oplog1"), Payload); - HttpWaitForCompletion(Servers.GetInstance(2), Response); + HttpClient Http{Servers.GetInstance(1).GetBaseUri()}; + HttpClient::Response Response = Http.Post(fmt::format("/prj/{}/oplog/{}/rpc", "proj0_tmpblock", "oplog0_tmpblock"), Payload); + HttpWaitForCompletion(Servers.GetInstance(1), Response); } - ValidateAttachments(2, "proj1", "oplog1"); - ValidateOplog(2, "proj1", "oplog1"); + ValidateAttachments(1, "proj0_tmpblock", "oplog0_tmpblock"); + ValidateOplog(1, "proj0_tmpblock", "oplog0_tmpblock"); } } @@ -1379,7 +1393,7 @@ TEST_CASE("project.file.data.transitions") return Package; }; - SUBCASE("path-referenced file is retrievable") + // --- path-referenced file is retrievable --- { MakeProject("proj_path"sv); MakeOplog("proj_path"sv, "oplog"sv); @@ -1397,7 +1411,7 @@ TEST_CASE("project.file.data.transitions") } } - SUBCASE("hash-referenced file is retrievable") + // --- hash-referenced file is retrievable --- { MakeProject("proj_hash"sv); MakeOplog("proj_hash"sv, "oplog"sv); @@ -1416,34 +1430,35 @@ TEST_CASE("project.file.data.transitions") } } - SUBCASE("hash-referenced to path-referenced transition with different content") + struct TransitionVariant { - MakeProject("proj_hash_to_path_diff"sv); - MakeOplog("proj_hash_to_path_diff"sv, "oplog"sv); + std::string_view Suffix; + bool SameOpKey; + bool RunGc; + }; - Oid FirstOpKey = Oid::NewOid(); - Oid SecondOpKey; - bool RunGcAfterTransition = false; + static constexpr TransitionVariant Variants[] = { + {"_nk", false, false}, + {"_sk", true, false}, + {"_nk_gc", false, true}, + {"_sk_gc", true, true}, + }; - SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); 
} - SUBCASE("same op key") { SecondOpKey = FirstOpKey; } - SUBCASE("new op key with gc") - { - SecondOpKey = Oid::NewOid(); - RunGcAfterTransition = true; - } - SUBCASE("same op key with gc") - { - SecondOpKey = FirstOpKey; - RunGcAfterTransition = true; - } + // --- hash-referenced to path-referenced transition with different content --- + for (const TransitionVariant& V : Variants) + { + std::string ProjName = fmt::format("proj_h2pd{}", V.Suffix); + MakeProject(ProjName); + MakeOplog(ProjName, "oplog"sv); + + Oid FirstOpKey = Oid::NewOid(); + Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid(); - // First op: file with CAS hash (content differs from the on-disk file) { CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, CompressedBlob); - PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op); + PostOplogEntry(ProjName, "oplog"sv, Op); - HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv); + HttpClient::Response Response = GetChunk(ProjName); CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); if (Response.IsSuccess()) { @@ -1453,19 +1468,17 @@ TEST_CASE("project.file.data.transitions") } } - // Second op: same FileId transitions to serverpath (different data) { CbPackage Op = BuildPathReferencedFileOp(SecondOpKey); - PostOplogEntry("proj_hash_to_path_diff"sv, "oplog"sv, Op); + PostOplogEntry(ProjName, "oplog"sv, Op); } - if (RunGcAfterTransition) + if (V.RunGc) { TriggerGcAndWait(); } - // Must serve the on-disk file content, not the old CAS blob - HttpClient::Response Response = GetChunk("proj_hash_to_path_diff"sv); + HttpClient::Response Response = GetChunk(ProjName); CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition")); if (Response.IsSuccess()) { @@ -1475,95 +1488,68 @@ TEST_CASE("project.file.data.transitions") } } - SUBCASE("hash-referenced to path-referenced transition with identical content") + // --- hash-referenced to path-referenced transition with identical 
content --- { - // Compress the same on-disk file content as a CAS blob so both references yield identical data CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView())); - MakeProject("proj_hash_to_path_same"sv); - MakeOplog("proj_hash_to_path_same"sv, "oplog"sv); + for (const TransitionVariant& V : Variants) + { + std::string ProjName = fmt::format("proj_h2ps{}", V.Suffix); + MakeProject(ProjName); + MakeOplog(ProjName, "oplog"sv); - Oid FirstOpKey = Oid::NewOid(); - Oid SecondOpKey; - bool RunGcAfterTransition = false; + Oid FirstOpKey = Oid::NewOid(); + Oid SecondOpKey = V.SameOpKey ? FirstOpKey : Oid::NewOid(); - SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); } - SUBCASE("same op key") { SecondOpKey = FirstOpKey; } - SUBCASE("new op key with gc") - { - SecondOpKey = Oid::NewOid(); - RunGcAfterTransition = true; - } - SUBCASE("same op key with gc") - { - SecondOpKey = FirstOpKey; - RunGcAfterTransition = true; - } + { + CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob); + PostOplogEntry(ProjName, "oplog"sv, Op); - // First op: file with CAS hash (content matches the on-disk file) - { - CbPackage Op = BuildHashReferencedFileOp(FirstOpKey, MatchingBlob); - PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op); + HttpClient::Response Response = GetChunk(ProjName); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); + if (Response.IsSuccess()) + { + IoBuffer Payload = GetDecompressedPayload(Response); + CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); + } + } - HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); + { + CbPackage Op = BuildPathReferencedFileOp(SecondOpKey); + PostOplogEntry(ProjName, "oplog"sv, Op); + } + + if (V.RunGc) + { + TriggerGcAndWait(); + } + + HttpClient::Response Response = GetChunk(ProjName); + CHECK_MESSAGE(Response.IsSuccess(), 
Response.ErrorMessage("GetChunk after transition")); if (Response.IsSuccess()) { IoBuffer Payload = GetDecompressedPayload(Response); + CHECK_EQ(Payload.GetSize(), FileBlob.GetSize()); CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); } } - - // Second op: same FileId transitions to serverpath (same data) - { - CbPackage Op = BuildPathReferencedFileOp(SecondOpKey); - PostOplogEntry("proj_hash_to_path_same"sv, "oplog"sv, Op); - } - - if (RunGcAfterTransition) - { - TriggerGcAndWait(); - } - - // Must still resolve successfully after the transition - HttpClient::Response Response = GetChunk("proj_hash_to_path_same"sv); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition")); - if (Response.IsSuccess()) - { - IoBuffer Payload = GetDecompressedPayload(Response); - CHECK_EQ(Payload.GetSize(), FileBlob.GetSize()); - CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); - } } - SUBCASE("path-referenced to hash-referenced transition with different content") + // --- path-referenced to hash-referenced transition with different content --- + for (const TransitionVariant& V : Variants) { - MakeProject("proj_path_to_hash_diff"sv); - MakeOplog("proj_path_to_hash_diff"sv, "oplog"sv); - - Oid FirstOpKey = Oid::NewOid(); - Oid SecondOpKey; - bool RunGcAfterTransition = false; + std::string ProjName = fmt::format("proj_p2hd{}", V.Suffix); + MakeProject(ProjName); + MakeOplog(ProjName, "oplog"sv); - SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); } - SUBCASE("same op key") { SecondOpKey = FirstOpKey; } - SUBCASE("new op key with gc") - { - SecondOpKey = Oid::NewOid(); - RunGcAfterTransition = true; - } - SUBCASE("same op key with gc") - { - SecondOpKey = FirstOpKey; - RunGcAfterTransition = true; - } + Oid FirstOpKey = Oid::NewOid(); + Oid SecondOpKey = V.SameOpKey ? 
FirstOpKey : Oid::NewOid(); - // First op: file with serverpath { CbPackage Op = BuildPathReferencedFileOp(FirstOpKey); - PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op); + PostOplogEntry(ProjName, "oplog"sv, Op); - HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv); + HttpClient::Response Response = GetChunk(ProjName); CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); if (Response.IsSuccess()) { @@ -1572,19 +1558,17 @@ TEST_CASE("project.file.data.transitions") } } - // Second op: same FileId transitions to CAS hash (different data) { CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, CompressedBlob); - PostOplogEntry("proj_path_to_hash_diff"sv, "oplog"sv, Op); + PostOplogEntry(ProjName, "oplog"sv, Op); } - if (RunGcAfterTransition) + if (V.RunGc) { TriggerGcAndWait(); } - // Must serve the CAS blob content, not the old on-disk file - HttpClient::Response Response = GetChunk("proj_path_to_hash_diff"sv); + HttpClient::Response Response = GetChunk(ProjName); CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition")); if (Response.IsSuccess()) { @@ -1595,65 +1579,51 @@ TEST_CASE("project.file.data.transitions") } } - SUBCASE("path-referenced to hash-referenced transition with identical content") + // --- path-referenced to hash-referenced transition with identical content --- { - // Compress the same on-disk file content as a CAS blob so both references yield identical data CompressedBuffer MatchingBlob = CompressedBuffer::Compress(SharedBuffer::Clone(FileBlob.GetView())); - MakeProject("proj_path_to_hash_same"sv); - MakeOplog("proj_path_to_hash_same"sv, "oplog"sv); + for (const TransitionVariant& V : Variants) + { + std::string ProjName = fmt::format("proj_p2hs{}", V.Suffix); + MakeProject(ProjName); + MakeOplog(ProjName, "oplog"sv); - Oid FirstOpKey = Oid::NewOid(); - Oid SecondOpKey; - bool RunGcAfterTransition = false; + Oid FirstOpKey = Oid::NewOid(); + Oid SecondOpKey = 
V.SameOpKey ? FirstOpKey : Oid::NewOid(); - SUBCASE("new op key") { SecondOpKey = Oid::NewOid(); } - SUBCASE("same op key") { SecondOpKey = FirstOpKey; } - SUBCASE("new op key with gc") - { - SecondOpKey = Oid::NewOid(); - RunGcAfterTransition = true; - } - SUBCASE("same op key with gc") - { - SecondOpKey = FirstOpKey; - RunGcAfterTransition = true; - } + { + CbPackage Op = BuildPathReferencedFileOp(FirstOpKey); + PostOplogEntry(ProjName, "oplog"sv, Op); - // First op: file with serverpath - { - CbPackage Op = BuildPathReferencedFileOp(FirstOpKey); - PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op); + HttpClient::Response Response = GetChunk(ProjName); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); + if (Response.IsSuccess()) + { + IoBuffer Payload = GetDecompressedPayload(Response); + CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); + } + } - HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv); - CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk first op")); + { + CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob); + PostOplogEntry(ProjName, "oplog"sv, Op); + } + + if (V.RunGc) + { + TriggerGcAndWait(); + } + + HttpClient::Response Response = GetChunk(ProjName); + CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition")); if (Response.IsSuccess()) { IoBuffer Payload = GetDecompressedPayload(Response); + CHECK_EQ(Payload.GetSize(), FileBlob.GetSize()); CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); } } - - // Second op: same FileId transitions to CAS hash (same data) - { - CbPackage Op = BuildHashReferencedFileOp(SecondOpKey, MatchingBlob); - PostOplogEntry("proj_path_to_hash_same"sv, "oplog"sv, Op); - } - - if (RunGcAfterTransition) - { - TriggerGcAndWait(); - } - - // Must still resolve successfully after the transition - HttpClient::Response Response = GetChunk("proj_path_to_hash_same"sv); - 
CHECK_MESSAGE(Response.IsSuccess(), Response.ErrorMessage("GetChunk after transition")); - if (Response.IsSuccess()) - { - IoBuffer Payload = GetDecompressedPayload(Response); - CHECK_EQ(Payload.GetSize(), FileBlob.GetSize()); - CHECK(Payload.GetView().EqualBytes(FileBlob.GetView())); - } } } diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index cf7ffe4e4..d713f693f 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -199,7 +199,7 @@ TEST_CASE("default.single") HttpClient Http{fmt::format("http://localhost:{}", PortNumber)}; - for (int i = 0; i < 100; ++i) + for (int i = 0; i < 20; ++i) { auto res = Http.Get("/test/hello"sv); ++RequestCount; @@ -238,7 +238,6 @@ TEST_CASE("default.loopback") ZEN_INFO("Running loopback server test..."); - SUBCASE("ipv4 endpoint connectivity") { HttpClient Http{fmt::format("http://127.0.0.1:{}", PortNumber)}; @@ -247,7 +246,6 @@ TEST_CASE("default.loopback") CHECK(res); } - SUBCASE("ipv6 endpoint connectivity") { HttpClient Http{fmt::format("http://[::1]:{}", PortNumber)}; @@ -287,7 +285,7 @@ TEST_CASE("multi.basic") HttpClient Http{fmt::format("http://localhost:{}", PortNumber)}; - for (int i = 0; i < 100; ++i) + for (int i = 0; i < 20; ++i) { auto res = Http.Get("/test/hello"sv); ++RequestCount; @@ -401,13 +399,11 @@ TEST_CASE("http.unixsocket") Settings.UnixSocketPath = SocketPath; HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}}; - SUBCASE("GET over unix socket") { HttpClient::Response Res = Http.Get("/testing/hello"); CHECK(Res.IsSuccess()); } - SUBCASE("POST echo over unix socket") { IoBuffer Body{IoBuffer::Wrap, "unix-test", 9}; HttpClient::Response Res = Http.Post("/testing/echo", Body); @@ -431,13 +427,11 @@ TEST_CASE("http.nonetwork") Settings.UnixSocketPath = SocketPath; HttpClient Http{fmt::format("http://localhost:{}", PortNumber), Settings, {}}; - SUBCASE("GET over unix socket succeeds") { 
HttpClient::Response Res = Http.Get("/testing/hello"); CHECK(Res.IsSuccess()); } - SUBCASE("TCP connection is refused") { asio::io_context IoContext; asio::ip::tcp::socket Socket(IoContext); diff --git a/src/zenserver/frontend/html/pages/cache.js b/src/zenserver/frontend/html/pages/cache.js index 93059b81c..c6567f0be 100644 --- a/src/zenserver/frontend/html/pages/cache.js +++ b/src/zenserver/frontend/html/pages/cache.js @@ -56,7 +56,8 @@ export class Page extends ZenPage this._cache_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_AlignNumeric); - this._cache_pager = new Pager(section, 25, () => this._render_cache_page()); + this._cache_pager = new Pager(section, 25, () => this._render_cache_page(), + Pager.make_search_fn(() => this._cache_data, item => item.namespace)); const cache_drop_link = document.createElement("span"); cache_drop_link.className = "dropall zen_action"; cache_drop_link.style.position = "static"; @@ -64,6 +65,7 @@ export class Page extends ZenPage cache_drop_link.addEventListener("click", () => this.drop_all()); this._cache_pager.prepend(cache_drop_link); + const loading = Pager.loading(section); const zcache_info = await new Fetcher().resource("/z$/").json(); const namespaces = zcache_info["Namespaces"] || []; const results = await Promise.allSettled( @@ -75,6 +77,7 @@ export class Page extends ZenPage .sort((a, b) => a.namespace.localeCompare(b.namespace)); this._cache_pager.set_total(this._cache_data.length); this._render_cache_page(); + loading.remove(); // Namespace detail area (inside namespaces section so it collapses together) this._namespace_host = section; diff --git a/src/zenserver/frontend/html/pages/hub.js b/src/zenserver/frontend/html/pages/hub.js index 7ae1deb5c..3cbfe6092 100644 --- a/src/zenserver/frontend/html/pages/hub.js +++ b/src/zenserver/frontend/html/pages/hub.js @@ -6,6 +6,7 @@ import { ZenPage } from "./page.js" import { Fetcher } from "../util/fetcher.js" import { Friendly 
} from "../util/friendly.js" import { Modal } from "../util/modal.js" +import { flash_highlight } from "../util/widgets.js" //////////////////////////////////////////////////////////////////////////////// const STABLE_STATES = new Set(["provisioned", "hibernated", "crashed"]); @@ -159,8 +160,36 @@ export class Page extends ZenPage this._btn_next.addEventListener("click", () => this._go_page(this._page + 1)); this._btn_provision = _make_bulk_btn("+", "Provision", () => this._show_provision_modal()); this._btn_obliterate = _make_bulk_btn("\uD83D\uDD25", "Obliterate", () => this._show_obliterate_modal()); + this._search_input = document.createElement("input"); + this._search_input.type = "text"; + this._search_input.className = "module-pager-search"; + this._search_input.placeholder = "Search module\u2026"; + this._search_input.addEventListener("keydown", (e) => + { + if (e.key === "Enter") + { + const term = this._search_input.value.trim().toLowerCase(); + if (!term) { return; } + const idx = this._modules_data.findIndex(m => + (m.moduleId || "").toLowerCase().includes(term) + ); + if (idx >= 0) + { + const id = this._modules_data[idx].moduleId; + this._navigate_to_module(id); + this._flash_module(id); + } + else + { + this._search_input.style.outline = "2px solid var(--theme_fail)"; + setTimeout(() => { this._search_input.style.outline = ""; }, 1000); + } + } + }); + pager.appendChild(this._btn_provision); pager.appendChild(this._btn_obliterate); + pager.appendChild(this._search_input); pager.appendChild(this._btn_prev); pager.appendChild(this._pager_label); pager.appendChild(this._btn_next); @@ -173,8 +202,11 @@ export class Page extends ZenPage this._row_cache = new Map(); // moduleId → row refs, for in-place DOM updates this._updating = false; this._page = 0; - this._page_size = 50; + this._page_size = 25; this._expanded = new Set(); // moduleIds with open metrics panel + this._pending_highlight = null; // moduleId to navigate+flash after next poll + 
this._pending_highlight_timer = null; + this._loading = mod_section.tag().classify("pager-loading").text("Loading\u2026").inner(); await this._update(); this._poll_timer = setInterval(() => this._update(), 2000); @@ -193,6 +225,15 @@ export class Page extends ZenPage this._render_capacity(stats); this._render_modules(status); + if (this._loading) { this._loading.remove(); this._loading = null; } + if (this._pending_highlight && this._module_map.has(this._pending_highlight)) + { + const id = this._pending_highlight; + this._pending_highlight = null; + clearTimeout(this._pending_highlight_timer); + this._navigate_to_module(id); + this._flash_module(id); + } } catch (e) { /* service unavailable */ } finally { this._updating = false; } @@ -844,14 +885,19 @@ export class Page extends ZenPage submit_label: "Provision", on_submit: async (moduleId) => { const resp = await fetch(`/hub/modules/${encodeURIComponent(moduleId)}/provision`, { method: "POST" }); - if (resp.ok) + if (!resp.ok) { - this._navigate_to_module(moduleId); - return true; + const msg = await resp.text(); + error_div.textContent = msg || ("HTTP " + resp.status); + return false; } - const msg = await resp.text(); - error_div.textContent = msg || ("HTTP " + resp.status); - return false; + // Endpoint returns compact binary (CbObjectWriter), not text + if (resp.status === 200 || resp.status === 202) + { + this._pending_highlight = moduleId; + this._pending_highlight_timer = setTimeout(() => { this._pending_highlight = null; }, 5000); + } + return true; } }); } @@ -885,4 +931,10 @@ export class Page extends ZenPage } } + _flash_module(id) + { + const cached = this._row_cache.get(id); + if (cached) { flash_highlight(cached.tr); } + } + } diff --git a/src/zenserver/frontend/html/pages/projects.js b/src/zenserver/frontend/html/pages/projects.js index 52d5dbb88..e613086a9 100644 --- a/src/zenserver/frontend/html/pages/projects.js +++ b/src/zenserver/frontend/html/pages/projects.js @@ -49,7 +49,8 @@ export class 
Page extends ZenPage this._project_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight|Table.Flag_Sortable|Table.Flag_AlignNumeric); - this._project_pager = new Pager(section, 25, () => this._render_projects_page()); + this._project_pager = new Pager(section, 25, () => this._render_projects_page(), + Pager.make_search_fn(() => this._projects_data, p => p.Id)); const drop_link = document.createElement("span"); drop_link.className = "dropall zen_action"; drop_link.style.position = "static"; @@ -57,10 +58,12 @@ export class Page extends ZenPage drop_link.addEventListener("click", () => this.drop_all()); this._project_pager.prepend(drop_link); + const loading = Pager.loading(section); this._projects_data = await new Fetcher().resource("/prj/list").json(); this._projects_data.sort((a, b) => a.Id.localeCompare(b.Id)); this._project_pager.set_total(this._projects_data.length); this._render_projects_page(); + loading.remove(); // Project detail area (inside projects section so it collapses together) this._project_host = section; diff --git a/src/zenserver/frontend/html/pages/start.js b/src/zenserver/frontend/html/pages/start.js index 14ec4bd4a..9a3eb6de3 100644 --- a/src/zenserver/frontend/html/pages/start.js +++ b/src/zenserver/frontend/html/pages/start.js @@ -62,7 +62,8 @@ export class Page extends ZenPage ]; this._project_table = section.add_widget(Table, columns); - this._project_pager = new Pager(section, 25, () => this._render_projects_page()); + this._project_pager = new Pager(section, 25, () => this._render_projects_page(), + Pager.make_search_fn(() => this._projects_data, p => p.Id)); const drop_link = document.createElement("span"); drop_link.className = "dropall zen_action"; drop_link.style.position = "static"; @@ -70,10 +71,12 @@ export class Page extends ZenPage drop_link.addEventListener("click", () => this.drop_all("projects")); this._project_pager.prepend(drop_link); + const prj_loading = Pager.loading(section); 
this._projects_data = await new Fetcher().resource("/prj/list").json(); this._projects_data.sort((a, b) => a.Id.localeCompare(b.Id)); this._project_pager.set_total(this._projects_data.length); this._render_projects_page(); + prj_loading.remove(); } // cache @@ -92,7 +95,8 @@ export class Page extends ZenPage ]; this._cache_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight); - this._cache_pager = new Pager(section, 25, () => this._render_cache_page()); + this._cache_pager = new Pager(section, 25, () => this._render_cache_page(), + Pager.make_search_fn(() => this._cache_data, item => item.namespace)); const cache_drop_link = document.createElement("span"); cache_drop_link.className = "dropall zen_action"; cache_drop_link.style.position = "static"; @@ -100,6 +104,7 @@ export class Page extends ZenPage cache_drop_link.addEventListener("click", () => this.drop_all("z$")); this._cache_pager.prepend(cache_drop_link); + const cache_loading = Pager.loading(section); const zcache_info = await new Fetcher().resource("/z$/").json(); const namespaces = zcache_info["Namespaces"] || []; const results = await Promise.allSettled( @@ -111,6 +116,7 @@ export class Page extends ZenPage .sort((a, b) => a.namespace.localeCompare(b.namespace)); this._cache_pager.set_total(this._cache_data.length); this._render_cache_page(); + cache_loading.remove(); } // version diff --git a/src/zenserver/frontend/html/util/widgets.js b/src/zenserver/frontend/html/util/widgets.js index 33d6755ac..b8fc720c1 100644 --- a/src/zenserver/frontend/html/util/widgets.js +++ b/src/zenserver/frontend/html/util/widgets.js @@ -6,6 +6,14 @@ import { Component } from "./component.js" import { Friendly } from "../util/friendly.js" //////////////////////////////////////////////////////////////////////////////// +export function flash_highlight(element) +{ + if (!element) { return; } + element.classList.add("pager-search-highlight"); + setTimeout(() => { 
element.classList.remove("pager-search-highlight"); }, 1500); +} + +//////////////////////////////////////////////////////////////////////////////// class Widget extends Component { } @@ -404,12 +412,14 @@ export class ProgressBar extends Widget //////////////////////////////////////////////////////////////////////////////// export class Pager { - constructor(section, page_size, on_change) + constructor(section, page_size, on_change, search_fn) { this._page = 0; this._page_size = page_size; this._total = 0; this._on_change = on_change; + this._search_fn = search_fn || null; + this._search_input = null; const pager = section.tag().classify("module-pager").inner(); this._btn_prev = document.createElement("button"); @@ -422,6 +432,23 @@ export class Pager this._btn_next.className = "module-pager-btn"; this._btn_next.textContent = "Next \u2192"; this._btn_next.addEventListener("click", () => this._go_page(this._page + 1)); + + if (this._search_fn) + { + this._search_input = document.createElement("input"); + this._search_input.type = "text"; + this._search_input.className = "module-pager-search"; + this._search_input.placeholder = "Search\u2026"; + this._search_input.addEventListener("keydown", (e) => + { + if (e.key === "Enter") + { + this._do_search(this._search_input.value.trim()); + } + }); + pager.appendChild(this._search_input); + } + pager.appendChild(this._btn_prev); pager.appendChild(this._label); pager.appendChild(this._btn_next); @@ -432,7 +459,8 @@ export class Pager prepend(element) { - this._pager.insertBefore(element, this._btn_prev); + const ref = this._search_input || this._btn_prev; + this._pager.insertBefore(element, ref); } set_total(n) @@ -461,6 +489,23 @@ export class Pager this._on_change(); } + _do_search(term) + { + if (!term || !this._search_fn) + { + return; + } + const result = this._search_fn(term); + if (!result) + { + this._search_input.style.outline = "2px solid var(--theme_fail)"; + setTimeout(() => { this._search_input.style.outline = 
""; }, 1000); + return; + } + this._go_page(Math.floor(result.index / this._page_size)); + flash_highlight(this._pager.parentNode.querySelector(`[zs_name="${CSS.escape(result.name)}"]`)); + } + _update_ui() { const total = this._total; @@ -474,6 +519,21 @@ export class Pager ? "No items" : `${start}\u2013${end} of ${total}`; } + + static make_search_fn(get_data, get_key) + { + return (term) => { + const t = term.toLowerCase(); + const data = get_data(); + const i = data.findIndex(item => get_key(item).toLowerCase().includes(t)); + return i < 0 ? null : { index: i, name: get_key(data[i]) }; + }; + } + + static loading(section) + { + return section.tag().classify("pager-loading").text("Loading\u2026").inner(); + } } diff --git a/src/zenserver/frontend/html/zen.css b/src/zenserver/frontend/html/zen.css index ca577675b..8d4e60472 100644 --- a/src/zenserver/frontend/html/zen.css +++ b/src/zenserver/frontend/html/zen.css @@ -1749,6 +1749,53 @@ tr:last-child td { text-align: center; } +.module-pager-search { + font-size: 12px; + padding: 4px 8px; + width: 14em; + border: 1px solid var(--theme_g2); + border-radius: 4px; + background: var(--theme_g4); + color: var(--theme_g0); + outline: none; + transition: border-color 0.15s, outline 0.3s; +} + +.module-pager-search:focus { + border-color: var(--theme_p0); +} + +.module-pager-search::placeholder { + color: var(--theme_g1); +} + +@keyframes pager-search-flash { + from { box-shadow: inset 0 0 0 100px var(--theme_p2); } + to { box-shadow: inset 0 0 0 100px transparent; } +} + +.zen_table > .pager-search-highlight > div { + animation: pager-search-flash 1s linear forwards; +} + +.module-table .pager-search-highlight td { + animation: pager-search-flash 1s linear forwards; +} + +@keyframes pager-loading-pulse { + 0%, 100% { opacity: 0.6; } + 50% { opacity: 0.2; } +} + +.pager-loading { + color: var(--theme_g1); + font-style: italic; + font-size: 14px; + font-weight: 600; + padding: 12px 0; + animation: pager-loading-pulse 1.5s 
ease-in-out infinite; +} + .module-table td, .module-table th { padding-top: 4px; padding-bottom: 4px; diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index e6a900066..e4b0c28d0 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -121,6 +121,11 @@ HttpHubService::HttpHubService(Hub& Hub, HttpProxyHandler& Proxy, HttpStatsServi HttpVerb::kGet); m_Router.RegisterRoute( + "deprovision", + [this](HttpRouterRequest& Req) { HandleDeprovisionAll(Req.ServerRequest()); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "modules/{moduleid}", [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); @@ -371,6 +376,81 @@ HttpHubService::GetActivityCounter() } void +HttpHubService::HandleDeprovisionAll(HttpServerRequest& Request) +{ + std::vector<std::string> ModulesToDeprovision; + m_Hub.EnumerateModules([&ModulesToDeprovision](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated) + { + ModulesToDeprovision.push_back(std::string(ModuleId)); + } + }); + + if (ModulesToDeprovision.empty()) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + std::vector<std::string> Rejected; + std::vector<std::string> Accepted; + std::vector<std::string> Completed; + for (const std::string& ModuleId : ModulesToDeprovision) + { + Hub::Response Response = m_Hub.Deprovision(ModuleId); + switch (Response.ResponseCode) + { + case Hub::EResponseCode::NotFound: + // Ignore + break; + case Hub::EResponseCode::Rejected: + Rejected.push_back(ModuleId); + break; + case Hub::EResponseCode::Accepted: + Accepted.push_back(ModuleId); + break; + case Hub::EResponseCode::Completed: + Completed.push_back(ModuleId); + break; + } + } + if (Rejected.empty() && Accepted.empty() && Completed.empty()) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + 
HttpResponseCode Response = HttpResponseCode::OK; + CbObjectWriter Writer; + if (!Completed.empty()) + { + Writer.BeginArray("Completed"); + for (const std::string& ModuleId : Completed) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Completed + } + if (!Accepted.empty()) + { + Writer.BeginArray("Accepted"); + for (const std::string& ModuleId : Accepted) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Accepted + Response = HttpResponseCode::Accepted; + } + if (!Rejected.empty()) + { + Writer.BeginArray("Rejected"); + for (const std::string& ModuleId : Rejected) + { + Writer.AddString(ModuleId); + } + Writer.EndArray(); // Rejected + Response = HttpResponseCode::Conflict; + } + Request.WriteResponse(Response, Writer.Save()); +} + +void HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId) { Hub::InstanceInfo InstanceInfo; diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h index ff2cb0029..f4d1b0b89 100644 --- a/src/zenserver/hub/httphubservice.h +++ b/src/zenserver/hub/httphubservice.h @@ -53,6 +53,7 @@ private: HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; + void HandleDeprovisionAll(HttpServerRequest& Request); void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 673306cde..b356064f9 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -1572,7 +1572,7 @@ TEST_CASE("hydration.s3.concurrent") ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); - constexpr int kModuleCount = 16; + constexpr int kModuleCount = 6; constexpr int kThreadCount = 4; TestThreading Threading(kThreadCount); diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp 
index 00b7a67d7..c5f8724ca 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -250,7 +250,7 @@ test_main(int argc, char** argv) zen::MaximizeOpenFileCount(); zen::testing::TestRunner Runner; - Runner.ApplyCommandLine(argc, argv); + Runner.ApplyCommandLine(argc, argv, "server.*"); return Runner.Run(); } #endif diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp index bbbb0c37b..f935e2c6b 100644 --- a/src/zenserver/storage/buildstore/httpbuildstore.cpp +++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp @@ -162,96 +162,81 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) fmt::format("Invalid blob hash '{}'", Hash)); } - std::vector<std::pair<uint64_t, uint64_t>> OffsetAndLengthPairs; + m_BuildStoreStats.BlobReadCount++; + IoBuffer Blob = m_BuildStore.GetBlob(BlobHash); + if (!Blob) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Blob {} not found", Hash)); + } + m_BuildStoreStats.BlobHitCount++; + if (ServerRequest.RequestVerb() == HttpVerb::kPost) { + if (ServerRequest.AcceptContentType() != HttpContentType::kCbPackage) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Accept type '{}' is not supported for blob {}, expected '{}'", + ToString(ServerRequest.AcceptContentType()), + Hash, + ToString(HttpContentType::kCbPackage))); + } + CbObject RangePayload = ServerRequest.ReadPayloadObject(); - if (RangePayload) + if (!RangePayload) { - CbArrayView RangesArray = RangePayload["ranges"sv].AsArrayView(); - OffsetAndLengthPairs.reserve(RangesArray.Num()); - for (CbFieldView FieldView : RangesArray) - { - CbObjectView RangeView = FieldView.AsObjectView(); - uint64_t RangeOffset = RangeView["offset"sv].AsUInt64(); - uint64_t RangeLength = RangeView["length"sv].AsUInt64(); - 
OffsetAndLengthPairs.push_back(std::make_pair(RangeOffset, RangeLength)); - } - if (OffsetAndLengthPairs.size() > MaxRangeCountPerRequestSupported) - { - return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Number of ranges ({}) for blob request exceeds maximum range count {}", - OffsetAndLengthPairs.size(), - MaxRangeCountPerRequestSupported)); - } + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Missing payload for range request on blob {}", BlobHash)); } - if (OffsetAndLengthPairs.empty()) + + CbArrayView RangesArray = RangePayload["ranges"sv].AsArrayView(); + const uint64_t RangeCount = RangesArray.Num(); + if (RangeCount == 0) { m_BuildStoreStats.BadRequestCount++; return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, - "Fetching blob without ranges must be done with the GET verb"); + "POST request must include a non-empty 'ranges' array"); } - } - else - { - HttpRanges Ranges; - bool HasRange = ServerRequest.TryGetRanges(Ranges); - if (HasRange) + if (RangeCount > MaxRangeCountPerRequestSupported) { - if (Ranges.size() > 1) - { - // Only a single http range is supported, we have limited support for http multirange responses - m_BuildStoreStats.BadRequestCount++; - return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Multiple ranges in blob request is only supported for {} accept type", - ToString(HttpContentType::kCbPackage))); - } - const HttpRange& FirstRange = Ranges.front(); - OffsetAndLengthPairs.push_back(std::make_pair<uint64_t, uint64_t>(FirstRange.Start, FirstRange.End - FirstRange.Start + 1)); + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Range count {} exceeds maximum of {}", RangeCount, 
MaxRangeCountPerRequestSupported)); } - } - - m_BuildStoreStats.BlobReadCount++; - IoBuffer Blob = m_BuildStore.GetBlob(BlobHash); - if (!Blob) - { - return ServerRequest.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Blob with hash '{}' could not be found", Hash)); - } - m_BuildStoreStats.BlobHitCount++; - if (OffsetAndLengthPairs.empty()) - { - return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob); - } + const uint64_t BlobSize = Blob.GetSize(); + std::vector<IoBuffer> RangeBuffers; + RangeBuffers.reserve(RangeCount); - if (ServerRequest.AcceptContentType() == HttpContentType::kCbPackage) - { - const uint64_t BlobSize = Blob.GetSize(); + CbPackage ResponsePackage; + CbObjectWriter Writer; - CbPackage ResponsePackage; - std::vector<IoBuffer> RangeBuffers; - CbObjectWriter Writer; Writer.BeginArray("ranges"sv); - for (const std::pair<uint64_t, uint64_t>& Range : OffsetAndLengthPairs) + for (CbFieldView FieldView : RangesArray) { - const uint64_t MaxBlobSize = Range.first < BlobSize ? BlobSize - Range.first : 0; - const uint64_t RangeSize = Min(Range.second, MaxBlobSize); + CbObjectView RangeView = FieldView.AsObjectView(); + uint64_t RangeOffset = RangeView["offset"sv].AsUInt64(); + uint64_t RangeLength = RangeView["length"sv].AsUInt64(); + + const uint64_t MaxBlobSize = RangeOffset < BlobSize ? 
BlobSize - RangeOffset : 0; + const uint64_t RangeSize = Min(RangeLength, MaxBlobSize); Writer.BeginObject(); { - if (Range.first + RangeSize <= BlobSize) + if (RangeOffset + RangeSize <= BlobSize) { - RangeBuffers.push_back(IoBuffer(Blob, Range.first, RangeSize)); - Writer.AddInteger("offset"sv, Range.first); + RangeBuffers.push_back(IoBuffer(Blob, RangeOffset, RangeSize)); + Writer.AddInteger("offset"sv, RangeOffset); Writer.AddInteger("length"sv, RangeSize); } else { - Writer.AddInteger("offset"sv, Range.first); + Writer.AddInteger("offset"sv, RangeOffset); Writer.AddInteger("length"sv, 0); } } @@ -259,7 +244,7 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) } Writer.EndArray(); - CompositeBuffer Ranges(RangeBuffers); + CompositeBuffer Ranges(std::move(RangeBuffers)); CbAttachment PayloadAttachment(std::move(Ranges), BlobHash); Writer.AddAttachment("payload", PayloadAttachment); @@ -269,32 +254,21 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) ResponsePackage.SetObject(HeaderObject); CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); - uint64_t ResponseSize = RpcResponseBuffer.GetSize(); - ZEN_UNUSED(ResponseSize); return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else { - if (OffsetAndLengthPairs.size() != 1) + HttpRanges RequestedRangeHeader; + bool HasRange = ServerRequest.TryGetRanges(RequestedRangeHeader); + if (HasRange) { - // Only a single http range is supported, we have limited support for http multirange responses - m_BuildStoreStats.BadRequestCount++; - return ServerRequest.WriteResponse( - HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Multiple ranges in blob request is only supported for {} accept type", ToString(HttpContentType::kCbPackage))); + // Standard HTTP GET with Range header: framework handles 206, Content-Range, and 416 on OOB. 
+ return ServerRequest.WriteResponse(HttpContentType::kBinary, Blob, RequestedRangeHeader); } - - const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengthPairs.front(); - const uint64_t BlobSize = Blob.GetSize(); - const uint64_t MaxBlobSize = OffsetAndLength.first < BlobSize ? BlobSize - OffsetAndLength.first : 0; - const uint64_t RangeSize = Min(OffsetAndLength.second, MaxBlobSize); - if (OffsetAndLength.first + RangeSize > BlobSize) + else { - return ServerRequest.WriteResponse(HttpResponseCode::NoContent); + return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob); } - Blob = IoBuffer(Blob, OffsetAndLength.first, RangeSize); - return ServerRequest.WriteResponse(HttpResponseCode::OK, ZenContentType::kBinary, Blob); } } diff --git a/src/zenserver/storage/objectstore/objectstore.cpp b/src/zenserver/storage/objectstore/objectstore.cpp index e34cd0445..bab9df06d 100644 --- a/src/zenserver/storage/objectstore/objectstore.cpp +++ b/src/zenserver/storage/objectstore/objectstore.cpp @@ -637,11 +637,7 @@ HttpObjectStoreService::GetObject(HttpRouterRequest& Request, const std::string_ } HttpRanges Ranges; - if (Request.ServerRequest().TryGetRanges(Ranges); Ranges.size() > 1) - { - // Multi-range is not supported, fall back to full response per RFC 7233 - Ranges.clear(); - } + Request.ServerRequest().TryGetRanges(Ranges); FileContents File; { @@ -665,42 +661,34 @@ HttpObjectStoreService::GetObject(HttpRouterRequest& Request, const std::string_ if (Ranges.empty()) { - const uint64_t TotalServed = m_TotalBytesServed.fetch_add(FileBuf.Size()) + FileBuf.Size(); - + const uint64_t TotalServed = m_TotalBytesServed.fetch_add(FileBuf.GetSize()) + FileBuf.GetSize(); ZEN_LOG_DEBUG(LogObj, "GET - '{}/{}' ({}) [OK] (Served: {})", BucketName, RelativeBucketPath, - NiceBytes(FileBuf.Size()), + NiceBytes(FileBuf.GetSize()), NiceBytes(TotalServed)); - - Request.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, 
FileBuf); } - else + else if (Ranges.size() == 1) { - const auto Range = Ranges[0]; - const uint64_t RangeSize = 1 + (Range.End - Range.Start); - const uint64_t TotalServed = m_TotalBytesServed.fetch_add(RangeSize) + RangeSize; - - ZEN_LOG_DEBUG(LogObj, - "GET - '{}/{}' (Range: {}-{}) ({}/{}) [OK] (Served: {})", - BucketName, - RelativeBucketPath, - Range.Start, - Range.End, - NiceBytes(RangeSize), - NiceBytes(FileBuf.Size()), - NiceBytes(TotalServed)); - - MemoryView RangeView = FileBuf.GetView().Mid(Range.Start, RangeSize); - if (RangeView.GetSize() != RangeSize) + const uint64_t TotalSize = FileBuf.GetSize(); + const uint64_t RangeEnd = (Ranges[0].End != ~uint64_t(0)) ? Ranges[0].End : TotalSize - 1; + if (RangeEnd < TotalSize && Ranges[0].Start <= RangeEnd) { - return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + const uint64_t RangeSize = 1 + (RangeEnd - Ranges[0].Start); + const uint64_t TotalServed = m_TotalBytesServed.fetch_add(RangeSize) + RangeSize; + ZEN_LOG_DEBUG(LogObj, + "GET - '{}/{}' (Range: {}-{}) ({}/{}) [OK] (Served: {})", + BucketName, + RelativeBucketPath, + Ranges[0].Start, + RangeEnd, + NiceBytes(RangeSize), + NiceBytes(TotalSize), + NiceBytes(TotalServed)); } - - IoBuffer RangeBuf = IoBuffer(IoBuffer::Wrap, RangeView.GetData(), RangeView.GetSize()); - Request.ServerRequest().WriteResponse(HttpResponseCode::PartialContent, HttpContentType::kBinary, RangeBuf); } + Request.ServerRequest().WriteResponse(HttpContentType::kBinary, FileBuf, Ranges); } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 43dc389e2..815762e3b 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -1391,7 +1391,7 @@ TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; - const int kIterationCount = 1000; + const int kIterationCount = 200; std::vector<IoHash> Keys(kIterationCount); @@ -1504,7 +1504,7 @@ TEST_CASE("compactcas.threadedinsert") ScopedTemporaryDirectory TempDir; 
const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 2048; + const int32_t kChunkCount = 512; uint64_t ExpectedSize = 0; tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; @@ -1803,7 +1803,7 @@ TEST_CASE("compactcas.restart") } const uint64_t kChunkSize = 1048 + 395; - const size_t kChunkCount = 7167; + const size_t kChunkCount = 2000; std::vector<IoHash> Hashes; Hashes.reserve(kChunkCount); @@ -1984,9 +1984,8 @@ TEST_CASE("compactcas.iteratechunks") WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put"); const uint64_t kChunkSize = 1048 + 395; - const size_t kChunkCount = 63840; + const size_t kChunkCount = 10000; - for (uint32_t N = 0; N < 2; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); @@ -2017,7 +2016,7 @@ TEST_CASE("compactcas.iteratechunks") size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u); WorkLatch.AddCount(1); ThreadPool.ScheduleWork( - [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::vector<IoBuffer> BatchBlobs; @@ -2028,7 +2027,7 @@ TEST_CASE("compactcas.iteratechunks") while (BatchBlobs.size() < BatchCount) { IoBuffer Chunk = CreateRandomBlob( - N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); IoHash Hash = IoHash::HashBuffer(Chunk); { RwLock::ExclusiveLockScope __(InsertLock); diff --git a/src/zenutil/consul/consul.cpp b/src/zenutil/consul/consul.cpp index c9884ea61..430ad0e32 100644 --- a/src/zenutil/consul/consul.cpp +++ b/src/zenutil/consul/consul.cpp @@ -689,7 +689,7 @@ TEST_CASE("util.consul.service_lifecycle") REQUIRE(Client.RegisterService(Info)); REQUIRE(Client.HasService(ServiceId)); - REQUIRE(WaitForCondition([&]() { return 
HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); CHECK(HealthServer.Mock.HealthCheckCount.load() >= 1); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); @@ -709,13 +709,13 @@ TEST_CASE("util.consul.service_lifecycle") CHECK_EQ(HealthServer.Mock.HealthCheckCount.load(), 0); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); - REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000)); + REQUIRE(WaitForCondition([&]() { return HealthServer.Mock.HealthCheckCount.load() >= 1; }, 10000, 50)); CHECK_EQ(GetCheckStatus(Client, ServiceId), "passing"); HealthServer.Mock.FailHealth.store(true); // Wait for Consul to observe the failing check - REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000)); + REQUIRE(WaitForCondition([&]() { return GetCheckStatus(Client, ServiceId) == "critical"; }, 10000, 50)); CHECK_EQ(GetCheckStatus(Client, ServiceId), "critical"); // Phase 4: Explicit deregister while critical diff --git a/src/zenutil/filesystemutils.cpp b/src/zenutil/filesystemutils.cpp index 9b7953f95..ccc42a838 100644 --- a/src/zenutil/filesystemutils.cpp +++ b/src/zenutil/filesystemutils.cpp @@ -657,7 +657,7 @@ namespace { void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); } } // namespace -TEST_SUITE_BEGIN("zenutil.filesystemutils"); +TEST_SUITE_BEGIN("util.filesystemutils"); TEST_CASE("filesystemutils.CleanDirectory") { diff --git a/src/zenutil/process/subprocessmanager.cpp b/src/zenutil/process/subprocessmanager.cpp index e908dd63a..597df3c15 100644 --- a/src/zenutil/process/subprocessmanager.cpp +++ b/src/zenutil/process/subprocessmanager.cpp @@ -1209,7 +1209,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectExit") CallbackFired = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while 
(Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); CHECK(ReceivedExitCode == 42); @@ -1234,7 +1244,17 @@ TEST_CASE("SubprocessManager.SpawnAndDetectCleanExit") CallbackFired = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); CHECK(ReceivedExitCode == 0); @@ -1259,7 +1279,17 @@ TEST_CASE("SubprocessManager.StdoutCapture") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); std::string Captured = Proc->GetCapturedStdout(); @@ -1288,7 +1318,17 @@ TEST_CASE("SubprocessManager.StderrCapture") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); std::string CapturedErr = Proc->GetCapturedStderr(); @@ -1320,7 +1360,17 @@ TEST_CASE("SubprocessManager.StdoutCallback") [&](ManagedProcess&, int) { Exited = true; }, [&](ManagedProcess&, std::string_view Data) { ReceivedData.append(Data); }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); CHECK(ReceivedData.find("callback_test") != std::string::npos); @@ -1343,8 +1393,18 @@ TEST_CASE("SubprocessManager.MetricsSampling") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { Exited = true; }); - // Run for enough time to get metrics samples - IoContext.run_for(1s); + // Poll 
until metrics are available + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Proc->GetLatestMetrics().WorkingSetSize > 0) + { + break; + } + } + } ProcessMetrics Metrics = Proc->GetLatestMetrics(); CHECK(Metrics.WorkingSetSize > 0); @@ -1353,7 +1413,17 @@ TEST_CASE("SubprocessManager.MetricsSampling") CHECK(Snapshot.size() == 1); // Let it finish - IoContext.run_for(3s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 10'000) + { + IoContext.run_for(10ms); + if (Exited) + { + break; + } + } + } CHECK(Exited); } @@ -1402,12 +1472,31 @@ TEST_CASE("SubprocessManager.KillAndWaitForExit") ManagedProcess* Proc = Manager.Spawn(AppStub, CmdLine, Options, [&](ManagedProcess&, int) { CallbackFired = true; }); // Let it start - IoContext.run_for(200ms); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Proc->IsRunning()) + { + break; + } + } + } Proc->Kill(); - IoContext.run_for(2s); - + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (CallbackFired) + { + break; + } + } + } CHECK(CallbackFired); } @@ -1428,7 +1517,17 @@ TEST_CASE("SubprocessManager.AdoptProcess") Manager.Adopt(ProcessHandle(Result), [&](ManagedProcess&, int ExitCode) { ReceivedExitCode = ExitCode; }); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (ReceivedExitCode != -1) + { + break; + } + } + } CHECK(ReceivedExitCode == 7); } @@ -1451,7 +1550,17 @@ TEST_CASE("SubprocessManager.UserTag") Proc->SetTag("my-worker-1"); CHECK(Proc->GetTag() == "my-worker-1"); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (!ReceivedTag.empty()) + { + break; + } + } + } CHECK(ReceivedTag == "my-worker-1"); } @@ -1481,7 +1590,17 @@ TEST_CASE("ProcessGroup.SpawnAndMembership") CHECK(Group->GetProcessCount() 
== 2); CHECK(Manager.GetProcessCount() == 2); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (ExitCount == 2) + { + break; + } + } + } CHECK(ExitCount == 2); } @@ -1531,7 +1650,17 @@ TEST_CASE("ProcessGroup.AggregateMetrics") Group->Spawn(AppStub, CmdLine, Options, [](ManagedProcess&, int) {}); // Wait for metrics sampling - IoContext.run_for(1s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (Group->GetAggregateMetrics().TotalWorkingSetSize > 0) + { + break; + } + } + } AggregateProcessMetrics GroupAgg = Group->GetAggregateMetrics(); CHECK(GroupAgg.ProcessCount == 2); @@ -1597,7 +1726,17 @@ TEST_CASE("ProcessGroup.MixedGroupedAndUngrouped") CHECK(Group->GetProcessCount() == 2); CHECK(Manager.GetProcessCount() == 3); - IoContext.run_for(5s); + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 5'000) + { + IoContext.run_for(10ms); + if (GroupExitCount == 2 && UngroupedExitCode != -1) + { + break; + } + } + } CHECK(GroupExitCount == 2); CHECK(UngroupedExitCode == 0); diff --git a/thirdparty/VERSIONS.md b/thirdparty/VERSIONS.md index 38a1415d3..9b3dcd103 100644 --- a/thirdparty/VERSIONS.md +++ b/thirdparty/VERSIONS.md @@ -19,6 +19,7 @@ dependency. 
* doctest - v2.4.12 from https://github.com/doctest/doctest/releases/download/v2.4.12/doctest.h * fmt - v12.0.0 from https://github.com/fmtlib/fmt/archive/refs/tags/12.0.0.tar.gz * robin-map - v1.4.0 from https://github.com/Tessil/robin-map/archive/refs/tags/v1.4.0.tar.gz +* rpmalloc - 1.5.0-dev (develop branch commit 262c698d7019, 2026-04-10) from https://github.com/mjansson/rpmalloc (`global_page_free_overflow` and `global_page_free_retain` manually tweaked) * ryml - v0.5.0 from https://github.com/biojppm/rapidyaml (note that there are submodules here which have also been fetched, after stripping all `.git` metadata, for future updates it's probably easier to just grab the .zip/.tar.gz since it includes all submodules) * sol2 - v3.5.0 from https://github.com/ThePhD/sol2/archive/refs/tags/v3.5.0.tar.gz (single/single.py generates the headers) * spdlog - v1.16.0 from https://github.com/gabime/spdlog/releases/tag/v1.16.0.zip diff --git a/thirdparty/rpmalloc/rpmalloc.c b/thirdparty/rpmalloc/rpmalloc.c index 08cefe6dd..b8fe16a0a 100644 --- a/thirdparty/rpmalloc/rpmalloc.c +++ b/thirdparty/rpmalloc/rpmalloc.c @@ -57,6 +57,9 @@ #endif #if PLATFORM_WINDOWS +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif #include <windows.h> #include <fibersapi.h> static DWORD fls_key; @@ -184,6 +187,12 @@ madvise(caddr_t, size_t, int); #define SPAN_SIZE (256 * 1024 * 1024) #define SPAN_MASK (~((uintptr_t)(SPAN_SIZE - 1))) +#if ENABLE_VALIDATE_ARGS +//! 
Maximum allocation size to avoid integer overflow +#undef MAX_ALLOC_SIZE +#define MAX_ALLOC_SIZE (((size_t)-1) - SPAN_SIZE) +#endif + //////////// /// /// Utility macros @@ -258,13 +267,13 @@ static inline size_t rpmalloc_clz(uintptr_t x) { #if ARCH_64BIT #if defined(_MSC_VER) && !defined(__clang__) - return (size_t)_lzcnt_u64(x); + return (size_t)__lzcnt64(x); #else return (size_t)__builtin_clzll(x); #endif #else #if defined(_MSC_VER) && !defined(__clang__) - return (size_t)_lzcnt_u32(x); + return (size_t)__lzcnt32(x); #else return (size_t)__builtin_clzl(x); #endif @@ -279,9 +288,9 @@ wait_spin(void) { #else _mm_pause(); #endif -#elif defined(__x86_64__) || defined(__i386__) +#elif (defined(__x86_64__) || defined(__i386__)) && !defined(_M_ARM64EC) __asm__ volatile("pause" ::: "memory"); -#elif defined(__aarch64__) || (defined(__arm__) && __ARM_ARCH >= 7) +#elif defined(__aarch64__) || (defined(__arm__) && __ARM_ARCH >= 7) || defined(_M_ARM64EC) __asm__ volatile("yield" ::: "memory"); #elif defined(__powerpc__) || defined(__powerpc64__) // No idea if ever been compiled in such archs but ... as precaution @@ -468,6 +477,9 @@ struct heap_t { uint32_t offset; //! Memory map size size_t mapped_size; +#if RPMALLOC_HEAP_STATISTICS + struct rpmalloc_heap_statistics_t stats; +#endif }; _Static_assert(sizeof(page_t) <= PAGE_HEADER_SIZE, "Invalid page header size"); @@ -530,10 +542,10 @@ static const size_class_t global_size_class[SIZE_CLASS_COUNT] = { LCLASS(262144), LCLASS(327680), LCLASS(393216), LCLASS(458752), LCLASS(524288)}; //! Threshold number of pages for when free pages are decommitted -static uint32_t global_page_free_overflow[4] = {16, 8, 2, 0}; +static uint32_t global_page_free_overflow[4] = {64, 16, 4, 0}; //! Number of pages to retain when free page threshold overflows -static uint32_t global_page_free_retain[4] = {4, 2, 1, 0}; +static uint32_t global_page_free_retain[4] = {16, 4, 2, 0}; //! 
OS huge page support static int os_huge_pages; @@ -719,6 +731,8 @@ os_mmap(size_t size, size_t alignment, size_t* offset, size_t* mapped_size) { // page to avoid saturating the OS commit limit #if ENABLE_DECOMMIT DWORD do_commit = 0; + if (global_config.disable_decommit) + do_commit = MEM_COMMIT; #else DWORD do_commit = MEM_COMMIT; #endif @@ -788,35 +802,29 @@ os_mmap(size_t size, size_t alignment, size_t* offset, size_t* mapped_size) { page_mapped_current, memory_order_relaxed, memory_order_relaxed)) break; } -#if ENABLE_DECOMMIT - size_t page_active_current = - atomic_fetch_add_explicit(&global_statistics.page_active, page_count, memory_order_relaxed) + page_count; - size_t page_active_peak = atomic_load_explicit(&global_statistics.page_active_peak, memory_order_relaxed); - while (page_active_current > page_active_peak) { - if (atomic_compare_exchange_weak_explicit(&global_statistics.page_active_peak, &page_active_peak, - page_active_current, memory_order_relaxed, memory_order_relaxed)) - break; - } -#endif #endif return ptr; } -static void +static int os_mcommit(void* address, size_t size) { #if ENABLE_DECOMMIT - if (global_config.disable_decommit) - return; + if (global_config.disable_decommit) { + return 0; + } #if PLATFORM_WINDOWS if (!VirtualAlloc(address, size, MEM_COMMIT, PAGE_READWRITE)) { + if (global_memory_interface->map_fail_callback && global_memory_interface->map_fail_callback(size)) + return os_mcommit(address, size); rpmalloc_assert(0, "Failed to commit virtual memory block"); + return 1; } #else - /* - if (mprotect(address, size, PROT_READ | PROT_WRITE)) { - rpmalloc_assert(0, "Failed to commit virtual memory block"); - } - */ + /* + if (mprotect(address, size, PROT_READ | PROT_WRITE)) { + rpmalloc_assert(0, "Failed to commit virtual memory block"); + } + */ #endif #if ENABLE_STATISTICS size_t page_count = size / global_config.page_size; @@ -833,23 +841,25 @@ os_mcommit(void* address, size_t size) { #endif (void)sizeof(address); 
(void)sizeof(size); + return 0; } -static void +static int os_mdecommit(void* address, size_t size) { #if ENABLE_DECOMMIT if (global_config.disable_decommit) - return; + return 1; #if PLATFORM_WINDOWS if (!VirtualFree(address, size, MEM_DECOMMIT)) { rpmalloc_assert(0, "Failed to decommit virtual memory block"); + return 1; } #else - /* - if (mprotect(address, size, PROT_NONE)) { - rpmalloc_assert(0, "Failed to decommit virtual memory block"); - } - */ + /* + if (mprotect(address, size, PROT_NONE)) { + rpmalloc_assert(0, "Failed to decommit virtual memory block"); + } + */ #if defined(MADV_DONTNEED) if (madvise(address, size, MADV_DONTNEED)) { #elif defined(MADV_FREE_REUSABLE) @@ -865,6 +875,7 @@ os_mdecommit(void* address, size_t size) { if (posix_madvise(address, size, POSIX_MADV_DONTNEED)) { #endif rpmalloc_assert(0, "Failed to decommit virtual memory block"); + return 1; } #endif #if ENABLE_STATISTICS @@ -879,6 +890,7 @@ os_mdecommit(void* address, size_t size) { (void)sizeof(address); (void)sizeof(size); #endif + return 0; } static void @@ -986,19 +998,29 @@ page_decommit_memory_pages(page_t* page) { return; void* extra_page = pointer_offset(page, global_config.page_size); size_t extra_page_size = page_get_size(page) - global_config.page_size; - global_memory_interface->memory_decommit(extra_page, extra_page_size); + if (global_memory_interface->memory_decommit(extra_page, extra_page_size) != 0) + return; +#if RPMALLOC_HEAP_STATISTICS && ENABLE_DECOMMIT + if (page->heap) + page->heap->stats.committed_size -= extra_page_size; +#endif page->is_decommitted = 1; } -static inline void +static inline int page_commit_memory_pages(page_t* page) { if (!page->is_decommitted) - return; + return 0; void* extra_page = pointer_offset(page, global_config.page_size); size_t extra_page_size = page_get_size(page) - global_config.page_size; - global_memory_interface->memory_commit(extra_page, extra_page_size); + if (global_memory_interface->memory_commit(extra_page, 
extra_page_size) != 0) + return 1; page->is_decommitted = 0; #if ENABLE_DECOMMIT +#if RPMALLOC_HEAP_STATISTICS + if (page->heap) + page->heap->stats.committed_size += extra_page_size; +#endif #if !defined(__APPLE__) // When page is recommitted, the blocks in the second memory page and forward // will be zeroed out by OS - take advantage in zalloc/calloc calls and make sure @@ -1008,6 +1030,7 @@ page_commit_memory_pages(page_t* page) { page->is_zero = 1; #endif #endif + return 0; } static void @@ -1090,7 +1113,7 @@ static NOINLINE void page_adopt_thread_free_block_list(page_t* page) { if (page->local_free) return; - unsigned long long thread_free = atomic_load_explicit(&page->thread_free, memory_order_acquire); + unsigned long long thread_free = atomic_load_explicit(&page->thread_free, memory_order_relaxed); if (thread_free != 0) { // Other threads can only replace with another valid list head, this will never change to 0 in other threads while (!atomic_compare_exchange_weak_explicit(&page->thread_free, &thread_free, 0, memory_order_acquire, @@ -1243,8 +1266,13 @@ span_allocate_page(span_t* span) { #if ENABLE_DECOMMIT // The first page is always committed on initial span map of memory - if (span->page_initialized) - global_memory_interface->memory_commit(page, span->page_size); + if (span->page_initialized) { + if (global_memory_interface->memory_commit(page, span->page_size) != 0) + return 0; +#if RPMALLOC_HEAP_STATISTICS + heap->stats.committed_size += span->page_size; +#endif + } #endif ++span->page_initialized; @@ -1268,6 +1296,16 @@ static NOINLINE void span_deallocate_block(span_t* span, page_t* page, void* block) { if (UNEXPECTED(page->page_type == PAGE_HUGE)) { +#if RPMALLOC_HEAP_STATISTICS + if (span->heap) { + span->heap->stats.mapped_size -= span->mapped_size; +#if ENABLE_DECOMMIT + span->heap->stats.committed_size -= span->page_count * span->page_size; +#else + span->heap->stats.committed_size -= span->mapped_size; +#endif + 
} +#endif global_memory_interface->memory_unmap(span, span->offset, span->mapped_size); return; } @@ -1303,6 +1341,16 @@ block_deallocate(block_t* block) { page_t* page = span_get_page_from_block(span, block); const int is_thread_local = page_is_thread_heap(page); +#if RPMALLOC_HEAP_STATISTICS + heap_t* heap = span->heap; + if (heap) { + if (span->page_type <= PAGE_LARGE) + heap->stats.allocated_size -= page->block_size; + else + heap->stats.allocated_size -= ((size_t)span->page_size * (size_t)span->page_count); + } +#endif + // Optimized path for thread local free with non-huge block in page // that has no aligned blocks if (EXPECTED(is_thread_local != 0)) { @@ -1373,7 +1421,8 @@ heap_allocate_new(void) { size_t mapped_size = 0; block_t* block = global_memory_interface->memory_map(heap_size, 0, &offset, &mapped_size); #if ENABLE_DECOMMIT - global_memory_interface->memory_commit(block, heap_size); + if (global_memory_interface->memory_commit(block, heap_size) != 0) + return 0; #endif heap_t* heap = heap_initialize((void*)block); heap->offset = (uint32_t)offset; @@ -1442,7 +1491,7 @@ heap_page_free_decommit(heap_t* heap, uint32_t page_type, uint32_t page_retain_c } } -static inline void +static inline int heap_make_free_page_available(heap_t* heap, uint32_t size_class, page_t* page) { page->size_class = size_class; page->block_size = global_size_class[size_class].block_size; @@ -1463,8 +1512,9 @@ heap_make_free_page_available(heap_t* heap, uint32_t size_class, page_t* page) { if (head) head->prev = page; heap->page_available[size_class] = page; - if (page->is_decommitted) - page_commit_memory_pages(page); + if (page->is_decommitted != 0) + return page_commit_memory_pages(page); + return 0; } //! 
Find or allocate a span for the given page type with the given size class @@ -1478,6 +1528,9 @@ heap_get_span(heap_t* heap, page_type_t page_type) { size_t offset = 0; size_t mapped_size = 0; span_t* span = global_memory_interface->memory_map(SPAN_SIZE, SPAN_SIZE, &offset, &mapped_size); +#if RPMALLOC_HEAP_STATISTICS + heap->stats.mapped_size += mapped_size; +#endif if (EXPECTED(span != 0)) { uint32_t page_count = 0; uint32_t page_size = 0; @@ -1496,7 +1549,15 @@ heap_get_span(heap_t* heap, page_type_t page_type) { page_address_mask = LARGE_PAGE_MASK; } #if ENABLE_DECOMMIT - global_memory_interface->memory_commit(span, page_size); + if (global_memory_interface->memory_commit(span, page_size) != 0) + return 0; +#endif +#if RPMALLOC_HEAP_STATISTICS +#if ENABLE_DECOMMIT + heap->stats.committed_size += page_size; +#else + heap->stats.committed_size += mapped_size; +#endif #endif span->heap = heap; span->page_type = page_type; @@ -1523,9 +1584,9 @@ heap_get_page_generic(heap_t* heap, uint32_t size_class) { page_type_t page_type = get_page_type(size_class); // Check if there is a free page from multithreaded deallocations - uintptr_t block_mt = atomic_load_explicit(&heap->thread_free[page_type], memory_order_acquire); + uintptr_t block_mt = atomic_load_explicit(&heap->thread_free[page_type], memory_order_relaxed); if (UNEXPECTED(block_mt != 0)) { - while (!atomic_compare_exchange_weak_explicit(&heap->thread_free[page_type], &block_mt, 0, memory_order_release, + while (!atomic_compare_exchange_weak_explicit(&heap->thread_free[page_type], &block_mt, 0, memory_order_acquire, memory_order_relaxed)) { wait_spin(); } @@ -1547,7 +1608,8 @@ heap_get_page_generic(heap_t* heap, uint32_t size_class) { rpmalloc_assert(heap->page_free_commit_count[page_type] > 0, "Free committed page count out of sync"); --heap->page_free_commit_count[page_type]; } - heap_make_free_page_available(heap, size_class, page); + if (heap_make_free_page_available(heap, size_class, page) != 0) + return 0; 
return page; } rpmalloc_assert(heap->page_free_commit_count[page_type] == 0, "Free committed page count out of sync"); @@ -1565,7 +1627,8 @@ heap_get_page_generic(heap_t* heap, uint32_t size_class) { span_t* span = heap_get_span(heap, page_type); if (EXPECTED(span != 0)) { page = span_allocate_page(span); - heap_make_free_page_available(page->heap, size_class, page); + if (heap_make_free_page_available(page->heap, size_class, page) != 0) + return 0; } return page; @@ -1604,6 +1667,7 @@ heap_allocate_block_small_to_large(heap_t* heap, uint32_t size_class, unsigned i static NOINLINE RPMALLOC_ALLOCATOR void* heap_allocate_block_huge(heap_t* heap, size_t size, unsigned int zero) { if (heap->id == 0) { + // Thread has not yet initialized, assign heap and try again rpmalloc_initialize(0); heap = get_thread_heap(); } @@ -1614,7 +1678,16 @@ heap_allocate_block_huge(heap_t* heap, size_t size, unsigned int zero) { if (block) { span_t* span = block; #if ENABLE_DECOMMIT - global_memory_interface->memory_commit(span, alloc_size); + if (global_memory_interface->memory_commit(span, alloc_size) != 0) + return 0; +#endif +#if RPMALLOC_HEAP_STATISTICS + heap->stats.mapped_size += mapped_size; +#if ENABLE_DECOMMIT + heap->stats.committed_size += alloc_size; +#else + heap->stats.committed_size += mapped_size; +#endif #endif span->heap = heap; span->page_type = PAGE_HUGE; @@ -1635,6 +1708,9 @@ heap_allocate_block_huge(heap_t* heap, size_t size, unsigned int zero) { void* ptr = pointer_offset(block, SPAN_HEADER_SIZE); if (zero) memset(ptr, 0, size); +#if RPMALLOC_HEAP_STATISTICS + heap->stats.allocated_size += size; +#endif return ptr; } return 0; @@ -1644,6 +1720,10 @@ static RPMALLOC_ALLOCATOR NOINLINE void* heap_allocate_block_generic(heap_t* heap, size_t size, unsigned int zero) { uint32_t size_class = get_size_class(size); if (EXPECTED(size_class < SIZE_CLASS_COUNT)) { +#if RPMALLOC_HEAP_STATISTICS + heap->stats.allocated_size += global_size_class[size_class].block_size; +#endif + 
block_t* block = heap_pop_local_free(heap, size_class); if (EXPECTED(block != 0)) { // Fast track with small block available in heap level local free list @@ -1668,6 +1748,9 @@ heap_allocate_block(heap_t* heap, size_t size, unsigned int zero) { // Fast track with small block available in heap level local free list if (zero) memset(block, 0, global_size_class[size_class].block_size); +#if RPMALLOC_HEAP_STATISTICS + heap->stats.allocated_size += global_size_class[size_class].block_size; +#endif return block; } } @@ -1901,7 +1984,7 @@ rprealloc(void* ptr, size_t size) { extern RPMALLOC_ALLOCATOR void* rpaligned_realloc(void* ptr, size_t alignment, size_t size, size_t oldsize, unsigned int flags) { #if ENABLE_VALIDATE_ARGS - if ((size + alignment < size) || (alignment > _memory_page_size)) { + if ((size + alignment < size) || (alignment > SMALL_PAGE_SIZE)) { errno = EINVAL; return 0; } @@ -2210,6 +2293,21 @@ rpmalloc_dump_statistics(void* file) { #endif } +void +rpmalloc_global_statistics(rpmalloc_global_statistics_t* stats) { +#if ENABLE_STATISTICS + stats->mapped = global_config.page_size * atomic_load_explicit(&global_statistics.page_mapped, memory_order_relaxed); + stats->mapped_peak = global_config.page_size * atomic_load_explicit(&global_statistics.page_mapped_peak, memory_order_relaxed); + stats->committed = global_config.page_size * atomic_load_explicit(&global_statistics.page_commit, memory_order_relaxed); + stats->decommitted = global_config.page_size * atomic_load_explicit(&global_statistics.page_decommit, memory_order_relaxed); + stats->active = global_config.page_size * atomic_load_explicit(&global_statistics.page_active, memory_order_relaxed); + stats->active_peak = global_config.page_size * atomic_load_explicit(&global_statistics.page_active_peak, memory_order_relaxed); + stats->heap_count = atomic_load_explicit(&global_statistics.heap_count, memory_order_relaxed); +#else + memset(stats, 0, sizeof(rpmalloc_global_statistics_t)); +#endif +} + #if 
RPMALLOC_FIRST_CLASS_HEAPS rpmalloc_heap_t* @@ -2253,6 +2351,17 @@ rpmalloc_heap_aligned_alloc(rpmalloc_heap_t* heap, size_t alignment, size_t size } RPMALLOC_ALLOCATOR void* +rpmalloc_heap_aligned_zalloc(rpmalloc_heap_t* heap, size_t alignment, size_t size) { +#if ENABLE_VALIDATE_ARGS + if (size >= MAX_ALLOC_SIZE) { + errno = EINVAL; + return 0; + } +#endif + return heap_allocate_block_aligned(heap, alignment, size, 1); +} + +RPMALLOC_ALLOCATOR void* rpmalloc_heap_calloc(rpmalloc_heap_t* heap, size_t num, size_t size) { size_t total; #if ENABLE_VALIDATE_ARGS @@ -2312,7 +2421,7 @@ rpmalloc_heap_realloc(rpmalloc_heap_t* heap, void* ptr, size_t size, unsigned in RPMALLOC_ALLOCATOR void* rpmalloc_heap_aligned_realloc(rpmalloc_heap_t* heap, void* ptr, size_t alignment, size_t size, unsigned int flags) { #if ENABLE_VALIDATE_ARGS - if ((size + alignment < size) || (alignment > _memory_page_size)) { + if ((size + alignment < size) || (alignment > SMALL_PAGE_SIZE)) { errno = EINVAL; return 0; } @@ -2332,6 +2441,18 @@ rpmalloc_heap_free_all(rpmalloc_heap_t* heap) { heap_free_all(heap); } +struct rpmalloc_heap_statistics_t +rpmalloc_heap_statistics(rpmalloc_heap_t* heap) { +#if RPMALLOC_HEAP_STATISTICS + if (heap) { + return heap->stats; + } +#endif + (void)sizeof(heap); + struct rpmalloc_heap_statistics_t stats = {0}; + return stats; +} + extern inline void rpmalloc_heap_thread_set_current(rpmalloc_heap_t* heap) { heap_t* prev_heap = get_thread_heap(); diff --git a/thirdparty/rpmalloc/rpmalloc.h b/thirdparty/rpmalloc/rpmalloc.h index d11292fb1..ea7d18e23 100644 --- a/thirdparty/rpmalloc/rpmalloc.h +++ b/thirdparty/rpmalloc/rpmalloc.h @@ -54,11 +54,16 @@ extern "C" { #define RPMALLOC_MAX_ALIGNMENT (256 * 1024) -//! Define RPMALLOC_FIRST_CLASS_HEAPS to enable heap based API (rpmalloc_heap_* functions). +//! Define RPMALLOC_FIRST_CLASS_HEAPS to non-zero to enable heap based API (rpmalloc_heap_* functions). 
#ifndef RPMALLOC_FIRST_CLASS_HEAPS #define RPMALLOC_FIRST_CLASS_HEAPS 0 #endif +//! Define RPMALLOC_HEAP_STATISTICS to non-zero to enable first class heap statistics gathering. +#ifndef RPMALLOC_HEAP_STATISTICS +#define RPMALLOC_HEAP_STATISTICS 0 +#endif + //! Flag to rpaligned_realloc to not preserve content in reallocation #define RPMALLOC_NO_PRESERVE 1 //! Flag to rpaligned_realloc to fail and return null pointer if grow cannot be done in-place, @@ -72,18 +77,16 @@ typedef struct rpmalloc_global_statistics_t { size_t mapped; //! Peak amount of virtual memory mapped, all of which might not have been committed (only if ENABLE_STATISTICS=1) size_t mapped_peak; - //! Current amount of memory in global caches for small and medium sizes (<32KiB) - size_t cached; - //! Current amount of memory allocated in huge allocations, i.e larger than LARGE_SIZE_LIMIT which is 2MiB by - //! default (only if ENABLE_STATISTICS=1) - size_t huge_alloc; - //! Peak amount of memory allocated in huge allocations, i.e larger than LARGE_SIZE_LIMIT which is 2MiB by default - //! (only if ENABLE_STATISTICS=1) - size_t huge_alloc_peak; - //! Total amount of memory mapped since initialization (only if ENABLE_STATISTICS=1) - size_t mapped_total; - //! Total amount of memory unmapped since initialization (only if ENABLE_STATISTICS=1) - size_t unmapped_total; + //! Running counter of total amount of memory committed (only if ENABLE_STATISTICS=1) + size_t committed; + //! Running counter of total amount of memory decommitted (only if ENABLE_STATISTICS=1) + size_t decommitted; + //! Current amount of virtual memory active and committed (only if ENABLE_STATISTICS=1) + size_t active; + //! Peak amount of virtual memory active and committed (only if ENABLE_STATISTICS=1) + size_t active_peak; + //! 
Current heap count (only if ENABLE_STATISTICS=1) + size_t heap_count; } rpmalloc_global_statistics_t; typedef struct rpmalloc_thread_statistics_t { @@ -147,10 +150,10 @@ typedef struct rpmalloc_interface_t { //! set a memory_unmap function or else the default implementation will be used for both. This function must be //! thread safe, it can be called by multiple threads simultaneously. void* (*memory_map)(size_t size, size_t alignment, size_t* offset, size_t* mapped_size); - //! Commit a range of memory pages - void (*memory_commit)(void* address, size_t size); - //! Decommit a range of memory pages - void (*memory_decommit)(void* address, size_t size); + //! Commit a range of memory pages. Return non-zero if the operation failed and the address range could not be committed. + int (*memory_commit)(void* address, size_t size); + //! Decommit a range of memory pages. Return non-zero if the operation failed and the address range could not be decommitted. + int (*memory_decommit)(void* address, size_t size); //! Unmap the memory pages starting at address and spanning the given number of bytes. If you set a memory_unmap //! function, you must also set a memory_map function or else the default implementation will be used for both. This //! function must be thread safe, it can be called by multiple threads simultaneously. @@ -260,44 +263,38 @@ rprealloc(void* ptr, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_S //! Reallocate the given block to at least the given size and alignment, // with optional control flags (see RPMALLOC_NO_PRESERVE). // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. 
RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpaligned_realloc(void* ptr, size_t alignment, size_t size, size_t oldsize, unsigned int flags) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(3); //! Allocate a memory block of at least the given size and alignment. // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpaligned_alloc(size_t alignment, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(2); //! Allocate a memory block of at least the given size and alignment. // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpaligned_zalloc(size_t alignment, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(2); //! Allocate a memory block of at least the given size and alignment, and zero initialize it. // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpaligned_calloc(size_t alignment, size_t num, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE2(2, 3); //! Allocate a memory block of at least the given size and alignment. // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. 
A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpmemalign(size_t alignment, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(2); //! Allocate a memory block of at least the given size and alignment. // Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB) +// and should ideally be less than memory page size. RPMALLOC_EXPORT int rpposix_memalign(void** memptr, size_t alignment, size_t size); @@ -336,12 +333,18 @@ rpmalloc_heap_alloc(rpmalloc_heap_t* heap, size_t size) RPMALLOC_ATTRIB_MALLOC R //! Allocate a memory block of at least the given size using the given heap. The returned // block will have the requested alignment. Alignment must be a power of two and a multiple of sizeof(void*), -// and should ideally be less than memory page size. A caveat of rpmalloc -// internals is that this must also be strictly less than the span size (default 64KiB). +// and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpmalloc_heap_aligned_alloc(rpmalloc_heap_t* heap, size_t alignment, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(3); +//! Allocate a zero initialized memory block of at least the given size using the given heap. The returned +// block will have the requested alignment. Alignment must be a power of two and a multiple of sizeof(void*), +// and should ideally be less than memory page size. +RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* +rpmalloc_heap_aligned_zalloc(rpmalloc_heap_t* heap, size_t alignment, size_t size) RPMALLOC_ATTRIB_MALLOC + RPMALLOC_ATTRIB_ALLOC_SIZE(3); + //! Allocate a memory block of at least the given size using the given heap and zero initialize it. 
RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpmalloc_heap_calloc(rpmalloc_heap_t* heap, size_t num, size_t size) RPMALLOC_ATTRIB_MALLOC @@ -349,8 +352,7 @@ rpmalloc_heap_calloc(rpmalloc_heap_t* heap, size_t num, size_t size) RPMALLOC_AT //! Allocate a memory block of at least the given size using the given heap and zero initialize it. The returned // block will have the requested alignment. Alignment must either be zero, or a power of two and a multiple of -// sizeof(void*), and should ideally be less than memory page size. A caveat of rpmalloc internals is that this must -// also be strictly less than the span size (default 64KiB). +// sizeof(void*), and should ideally be less than memory page size. RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpmalloc_heap_aligned_calloc(rpmalloc_heap_t* heap, size_t alignment, size_t num, size_t size) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE2(3, 4); @@ -364,8 +366,7 @@ rpmalloc_heap_realloc(rpmalloc_heap_t* heap, void* ptr, size_t size, unsigned in //! Reallocate the given block to at least the given size. The memory block MUST be allocated // by the same heap given to this function. The returned block will have the requested alignment. // Alignment must be either zero, or a power of two and a multiple of sizeof(void*), and should ideally be -// less than memory page size. A caveat of rpmalloc internals is that this must also be strictly less than -// the span size (default 64KiB). +// less than memory page size. 
RPMALLOC_EXPORT RPMALLOC_ALLOCATOR void* rpmalloc_heap_aligned_realloc(rpmalloc_heap_t* heap, void* ptr, size_t alignment, size_t size, unsigned int flags) RPMALLOC_ATTRIB_MALLOC RPMALLOC_ATTRIB_ALLOC_SIZE(4); @@ -379,6 +380,19 @@ rpmalloc_heap_free(rpmalloc_heap_t* heap, void* ptr); RPMALLOC_EXPORT void rpmalloc_heap_free_all(rpmalloc_heap_t* heap); +struct rpmalloc_heap_statistics_t { + // Number of bytes allocated + size_t allocated_size; + // Number of bytes committed + size_t committed_size; + // Number of bytes mapped + size_t mapped_size; +}; + +//! Get heap statistics (if enabled in build) +RPMALLOC_EXPORT struct rpmalloc_heap_statistics_t +rpmalloc_heap_statistics(rpmalloc_heap_t* heap); + //! Set the given heap as the current heap for the calling thread. A heap MUST only be current heap // for a single thread, a heap can never be shared between multiple threads. The previous // current heap for the calling thread is released to be reused by other threads. diff --git a/thirdparty/xmake.lua b/thirdparty/xmake.lua index 1f5902fdf..ea861fc55 100644 --- a/thirdparty/xmake.lua +++ b/thirdparty/xmake.lua @@ -32,9 +32,9 @@ target('ue-trace') add_includedirs("trace", {public=true}) add_headerfiles("trace/**.h") --- rpmalloc 1.5.0-dev.20250810 --- Vendored from develop branch commit 6b34d956911b (2025-08-10) --- https://github.com/mjansson/rpmalloc/commit/6b34d956911b +-- rpmalloc 1.5.0-dev.20251026 +-- Vendored from develop branch commit feb43aee0d4d (2025-10-26) +-- https://github.com/mjansson/rpmalloc/commit/feb43aee0d4d target('rpmalloc') set_kind("static") set_group('thirdparty') |