diff options
Diffstat (limited to 'src/zenhttp/asynchttpclient_test.cpp')
| -rw-r--r-- | src/zenhttp/asynchttpclient_test.cpp | 683 |
1 files changed, 571 insertions, 112 deletions
diff --git a/src/zenhttp/asynchttpclient_test.cpp b/src/zenhttp/asynchttpclient_test.cpp index 151863370..0b6877c7b 100644 --- a/src/zenhttp/asynchttpclient_test.cpp +++ b/src/zenhttp/asynchttpclient_test.cpp @@ -5,21 +5,24 @@ #if ZEN_WITH_TESTS +# include <zencore/basicfile.h> # include <zencore/iobuffer.h> # include <zencore/logging.h> # include <zencore/scopeguard.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/thread.h> # include "servers/httpasio.h" -# include <atomic> -# include <thread> - ZEN_THIRD_PARTY_INCLUDES_START # include <asio.hpp> ZEN_THIRD_PARTY_INCLUDES_END +# include <atomic> +# include <cstring> +# include <thread> + namespace zen { using namespace std::literals; @@ -67,13 +70,65 @@ public: Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kJSON, "{\"ok\":true}"); }, HttpVerb::kGet); + + m_Router.RegisterRoute( + "large", + [](HttpRouterRequest& Req) { + // 4 MB body so the response exercises chunked write callbacks. + IoBuffer Body(4u * 1024u * 1024u); + uint8_t* Data = static_cast<uint8_t*>(Body.MutableData()); + for (size_t I = 0; I < Body.GetSize(); ++I) + { + Data[I] = static_cast<uint8_t>(I & 0xFF); + } + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Body); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "slow", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + std::string_view MsStr = Params.GetValue("ms"); + int Ms = MsStr.empty() ? 100 : std::atoi(std::string(MsStr).c_str()); + m_SlowHits.fetch_add(1, std::memory_order_relaxed); + zen::Sleep(Ms); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "slow ok"); + }, + HttpVerb::kGet); + + // Returns 503 for the first ?fail=N requests, then 200 for the rest. + m_Router.RegisterRoute( + "flaky", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + std::string_view FailStr = Params.GetValue("fail"); + const int FailN = FailStr.empty() ? 0 : std::atoi(std::string(FailStr).c_str()); + const int Hit = m_FlakyHits.fetch_add(1, std::memory_order_relaxed) + 1; + if (Hit <= FailN) + { + HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable, HttpContentType::kText, "fail"); + } + else + { + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); + } + }, + HttpVerb::kGet); } + std::atomic<int>& SlowHits() { return m_SlowHits; } + std::atomic<int>& FlakyHits() { return m_FlakyHits; } + virtual const char* BaseUri() const override { return "/api/async-test/"; } virtual void HandleRequest(HttpServerRequest& Request) override { m_Router.HandleRequest(Request); } private: HttpRequestRouter m_Router; + std::atomic<int> m_SlowHits{0}; + std::atomic<int> m_FlakyHits{0}; }; ////////////////////////////////////////////////////////////////////////// @@ -118,189 +173,593 @@ struct AsyncTestServerFixture TEST_SUITE_BEGIN("http.asynchttpclient"); -TEST_CASE("asynchttpclient.future.verbs") +// Future API + callback API + verb dispatch + payload echo + lifecycle. All +// scopes share one fixture and one default-settings client. Per-scope sets +// up its own promises/futures. +TEST_CASE("asynchttpclient.basic") { AsyncTestServerFixture Fixture; AsyncHttpClient Client = Fixture.MakeClient(); - SUBCASE("GET returns 200 with expected body") + // future.verbs - GET / POST / PUT / DELETE / HEAD echo the verb. { - auto Future = Client.Get("/api/async-test/echo/method"); - auto Resp = Future.get(); + auto Resp = Client.Get("/api/async-test/echo/method").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), "GET"); } - - SUBCASE("POST dispatches correctly") { - auto Future = Client.Post("/api/async-test/echo/method"); - auto Resp = Future.get(); + auto Resp = Client.Post("/api/async-test/echo/method").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), "POST"); } - - SUBCASE("PUT dispatches correctly") { - auto Future = Client.Put("/api/async-test/echo/method"); - auto Resp = Future.get(); + auto Resp = Client.Put("/api/async-test/echo/method").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), "PUT"); } - - SUBCASE("DELETE dispatches correctly") { - auto Future = Client.Delete("/api/async-test/echo/method"); - auto Resp = Future.get(); + auto Resp = Client.Delete("/api/async-test/echo/method").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), "DELETE"); } - - SUBCASE("HEAD returns 200 with empty body") { - auto Future = Client.Head("/api/async-test/echo/method"); - auto Resp = Future.get(); + auto Resp = Client.Head("/api/async-test/echo/method").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), ""sv); } -} -TEST_CASE("asynchttpclient.future.get") -{ - AsyncTestServerFixture Fixture; - AsyncHttpClient Client = Fixture.MakeClient(); - - SUBCASE("simple GET with text response") + // future.get - text body, JSON body, 204 NoContent. { - auto Future = Client.Get("/api/async-test/hello"); - auto Resp = Future.get(); + auto Resp = Client.Get("/api/async-test/hello").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.StatusCode, HttpResponseCode::OK); CHECK_EQ(Resp.AsText(), "hello world"); } - - SUBCASE("GET returning JSON") { - auto Future = Client.Get("/api/async-test/json"); - auto Resp = Future.get(); + auto Resp = Client.Get("/api/async-test/json").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.AsText(), "{\"ok\":true}"); } - - SUBCASE("GET 204 NoContent") { - auto Future = Client.Get("/api/async-test/nocontent"); - auto Resp = Future.get(); + auto Resp = Client.Get("/api/async-test/nocontent").get(); CHECK(Resp.IsSuccess()); CHECK_EQ(Resp.StatusCode, HttpResponseCode::NoContent); } -} - -TEST_CASE("asynchttpclient.future.post.with.payload") -{ - AsyncTestServerFixture Fixture; - AsyncHttpClient Client = Fixture.MakeClient(); - std::string_view PayloadStr = "async payload data"; - IoBuffer Payload(IoBuffer::Clone, PayloadStr.data(), PayloadStr.size()); - Payload.SetContentType(ZenContentType::kText); + // future.post.with.payload + future.put.with.payload - echo round-trips. + { + std::string_view Str = "async payload data"; + IoBuffer Payload(IoBuffer::Clone, Str.data(), Str.size()); + Payload.SetContentType(ZenContentType::kText); + auto Resp = Client.Post("/api/async-test/echo", Payload).get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "async payload data"); + } + { + std::string_view Str = "put payload"; + IoBuffer Payload(IoBuffer::Clone, Str.data(), Str.size()); + Payload.SetContentType(ZenContentType::kText); + auto Resp = Client.Put("/api/async-test/echo", Payload).get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "put payload"); + } - auto Future = Client.Post("/api/async-test/echo", Payload); - auto Resp = Future.get(); - CHECK(Resp.IsSuccess()); - CHECK_EQ(Resp.AsText(), "async payload data"); -} + // callback - AsyncGet completion fires the callback. + { + std::promise<HttpClient::Response> Promise; + auto Future = Promise.get_future(); + Client.AsyncGet("/api/async-test/hello", [&Promise](HttpClient::Response Resp) { Promise.set_value(std::move(Resp)); }); + auto Resp = Future.get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "hello world"); + } -TEST_CASE("asynchttpclient.future.put.with.payload") -{ - AsyncTestServerFixture Fixture; - AsyncHttpClient Client = Fixture.MakeClient(); + // concurrent.requests - multiple verbs in flight at once. + { + auto F1 = Client.Get("/api/async-test/hello"); + auto F2 = Client.Get("/api/async-test/json"); + auto F3 = Client.Post("/api/async-test/echo/method"); + auto F4 = Client.Delete("/api/async-test/echo/method"); + auto R1 = F1.get(); + auto R2 = F2.get(); + auto R3 = F3.get(); + auto R4 = F4.get(); + CHECK(R1.IsSuccess()); + CHECK_EQ(R1.AsText(), "hello world"); + CHECK(R2.IsSuccess()); + CHECK_EQ(R2.AsText(), "{\"ok\":true}"); + CHECK(R3.IsSuccess()); + CHECK_EQ(R3.AsText(), "POST"); + CHECK(R4.IsSuccess()); + CHECK_EQ(R4.AsText(), "DELETE"); + } - std::string_view PutStr = "put payload"; - IoBuffer Payload(IoBuffer::Clone, PutStr.data(), PutStr.size()); - Payload.SetContentType(ZenContentType::kText); + // cancel.after.completion.is.noop - late Cancel must be quiet. + { + auto Resp = Client.Get("/api/async-test/hello").get(); + REQUIRE(Resp.IsSuccess()); + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + AsyncRequestToken Token = Client.AsyncGet("/api/async-test/hello", [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); + auto Resp2 = F.get(); + REQUIRE(Resp2.IsSuccess()); + Token.Cancel(); // no-op; must not crash + } - auto Future = Client.Put("/api/async-test/echo", Payload); - auto Resp = Future.get(); - CHECK(Resp.IsSuccess()); - CHECK_EQ(Resp.AsText(), "put payload"); + // lifecycle.repeated.construct.destroy - 8 fresh clients against the same + // server. Catches io thread / curl_multi leaks across construct/destroy. + for (int I = 0; I < 8; ++I) + { + AsyncHttpClient Local = Fixture.MakeClient(); + auto Resp = Local.Get("/api/async-test/hello").get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "hello world"); + } } -TEST_CASE("asynchttpclient.callback") +// Submit-side behavior: external io_context, shutdown cancel, no-queue +// contract, cross-thread cancel-before-submit race, unlimited fan-out. +// +// MaxConcurrentRequests is applied as curl connection caps only; cap-level +// fan-out throttling lives in the storage layer (see +// server.s3asyncstorage.admission.fanout). +TEST_CASE("asynchttpclient.submit_and_shutdown") { AsyncTestServerFixture Fixture; - AsyncHttpClient Client = Fixture.MakeClient(); - std::promise<HttpClient::Response> Promise; - auto Future = Promise.get_future(); + // external.io_context - caller drives the run loop. Verifies the + // Cleanup-via-promise path in the dtor. + { + asio::io_context IoContext; + auto WorkGuard = asio::make_work_guard(IoContext); + std::thread IoThread([&IoContext]() { IoContext.run(); }); + { + AsyncHttpClient Client = Fixture.MakeClient(IoContext); + auto Resp = Client.Get("/api/async-test/echo/method").get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "GET"); + } + WorkGuard.reset(); + IoThread.join(); + } + + // shutdown.cancels.in.flight - dtor synthesizes cancel for all in-flight. + { + const int N = 6; + std::vector<std::promise<HttpClient::Response>> Promises(N); + std::vector<std::future<HttpClient::Response>> Futures; + for (auto& P : Promises) + { + Futures.push_back(P.get_future()); + } + { + AsyncHttpClient Client = Fixture.MakeClient(); + std::vector<AsyncRequestToken> Tokens; + for (int I = 0; I < N; ++I) + { + Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=2000", + [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); })); + } + Sleep(50); // let requests actually start before client teardown + } + int CancelCount = 0; + for (auto& F : Futures) + { + REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + HttpClient::Response R = F.get(); + if (R.Error.has_value() && R.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled) + { + ++CancelCount; + } + } + CHECK(CancelCount == N); + } + + // contract.no.queue - all submissions reach the network despite cap=4. + // Fan-out gating is the storage layer's responsibility. + { + HttpClientSettings Settings; + Settings.MaxConcurrentRequests = 4; + AsyncHttpClient Client = Fixture.MakeClient(Settings); + + const int N = 100; + std::vector<std::promise<HttpClient::Response>> Promises(N); + std::vector<std::future<HttpClient::Response>> Futures; + std::vector<AsyncRequestToken> Tokens; + for (auto& P : Promises) + { + Futures.push_back(P.get_future()); + } + for (int I = 0; I < N; ++I) + { + Tokens.push_back( + Client.AsyncGet("/api/async-test/hello", [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); })); + } + for (auto& F : Futures) + { + REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready); + CHECK(F.get().IsSuccess()); + } + } - Client.AsyncGet("/api/async-test/hello", [&Promise](HttpClient::Response Resp) { Promise.set_value(std::move(Resp)); }); + // cancel.before.submit - cross-thread race: SubmitFromSpec posted, Cancel + // from another thread fires before/after submit handler runs. All callbacks + // must surface kRequestCancelled exactly once. + { + AsyncHttpClient Client = Fixture.MakeClient(); + const int N = 16; + std::vector<std::promise<HttpClient::Response>> Promises(N); + std::vector<std::future<HttpClient::Response>> Futures; + std::vector<AsyncRequestToken> Tokens; + for (auto& P : Promises) + { + Futures.push_back(P.get_future()); + } + for (int I = 0; I < N; ++I) + { + Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=2000", + [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); })); + } + std::thread CancelThread([&]() { + for (auto& T : Tokens) + { + T.Cancel(); + } + }); + for (auto& F : Futures) + { + REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + HttpClient::Response R = F.get(); + REQUIRE(R.Error.has_value()); + CHECK(R.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled); + } + CancelThread.join(); + } - auto Resp = Future.get(); - CHECK(Resp.IsSuccess()); - CHECK_EQ(Resp.AsText(), "hello world"); + // unlimited.parallel.fanout - 8 parallel 100ms requests with default + // settings (no cap) finish well under the 800ms serial floor. Sized to one + // batch on the asio server's 8-thread pool so per-request setup overhead on + // slow CI agents does not dominate; threshold leaves >=2x margin over the + // ~100ms parallel ideal. + { + AsyncHttpClient Client = Fixture.MakeClient(); + const int N = 8; + std::vector<std::promise<HttpClient::Response>> Promises(N); + std::vector<std::future<HttpClient::Response>> Futures; + std::vector<AsyncRequestToken> Tokens; + for (auto& P : Promises) + { + Futures.push_back(P.get_future()); + } + const auto Start = std::chrono::steady_clock::now(); + for (int I = 0; I < N; ++I) + { + Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=100", + [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); })); + } + for (auto& F : Futures) + { + REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready); + CHECK(F.get().IsSuccess()); + } + const auto ElapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start).count(); + CHECK(ElapsedMs < 600); + } } -TEST_CASE("asynchttpclient.concurrent.requests") +// AsyncStream coverage: chunk delivery, OnData abort, mid-flight cancel. +TEST_CASE("asynchttpclient.stream") { AsyncTestServerFixture Fixture; - AsyncHttpClient Client = Fixture.MakeClient(); - - // Fire multiple requests concurrently - auto Future1 = Client.Get("/api/async-test/hello"); - auto Future2 = Client.Get("/api/async-test/json"); - auto Future3 = Client.Post("/api/async-test/echo/method"); - auto Future4 = Client.Delete("/api/async-test/echo/method"); - auto Resp1 = Future1.get(); - auto Resp2 = Future2.get(); - auto Resp3 = Future3.get(); - auto Resp4 = Future4.get(); + // stream.basic - 4 MiB stream completes; bytes accounted; no body buffering. + { + AsyncHttpClient Client = Fixture.MakeClient(); + std::atomic<uint64_t> TotalReceived{0}; + std::atomic<uint32_t> ChunkCount{0}; + std::atomic<uint64_t> TotalSizeSeen{0}; + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + + auto Token = Client.AsyncStream( + "/api/async-test/large", + [&](const uint8_t* /*Data*/, size_t Size, uint64_t TotalSize) -> bool { + TotalReceived.fetch_add(Size, std::memory_order_relaxed); + ChunkCount.fetch_add(1, std::memory_order_relaxed); + if (TotalSize != 0) + { + TotalSizeSeen.store(TotalSize, std::memory_order_relaxed); + } + return true; + }, + [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); - CHECK(Resp1.IsSuccess()); - CHECK_EQ(Resp1.AsText(), "hello world"); + REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready); + auto Resp = F.get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(TotalReceived.load(), 4u * 1024u * 1024u); + CHECK_EQ(TotalSizeSeen.load(), 4u * 1024u * 1024u); + CHECK(ChunkCount.load() >= 1); + CHECK_EQ(Resp.ResponsePayload.GetSize(), 0u); + } - CHECK(Resp2.IsSuccess()); - CHECK_EQ(Resp2.AsText(), "{\"ok\":true}"); + // stream.ondata.abort - returning false from OnData stops the transfer. + { + AsyncHttpClient Client = Fixture.MakeClient(); + std::atomic<uint32_t> ChunkCount{0}; + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + + Client.AsyncStream( + "/api/async-test/large", + [&](const uint8_t*, size_t, uint64_t) -> bool { + ChunkCount.fetch_add(1, std::memory_order_relaxed); + return false; + }, + [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); - CHECK(Resp3.IsSuccess()); - CHECK_EQ(Resp3.AsText(), "POST"); + REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready); + auto Resp = F.get(); + CHECK_FALSE(Resp.IsSuccess()); + CHECK(Resp.Error.has_value()); + CHECK(ChunkCount.load() <= 1); + } - CHECK(Resp4.IsSuccess()); - CHECK_EQ(Resp4.AsText(), "DELETE"); + // stream.cancel.mid.flight - Cancel during long stream surfaces + // kRequestCancelled. RetryCount=0 so no retry layer masks the error code. + { + HttpClientSettings Settings; + Settings.RetryCount = 0; + AsyncHttpClient Client = Fixture.MakeClient(Settings); + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + auto Token = Client.AsyncStream( + "/api/async-test/slow?ms=2000", + [](const uint8_t*, size_t, uint64_t) -> bool { return true; }, + [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); + Sleep(50); + Token.Cancel(); + REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + auto Resp = F.get(); + CHECK_FALSE(Resp.IsSuccess()); + REQUIRE(Resp.Error.has_value()); + CHECK(Resp.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled); + } } -TEST_CASE("asynchttpclient.external.io_context") +// High-fanout, mixed verbs, large payload, streaming-source PUT. +TEST_CASE("asynchttpclient.stress") { AsyncTestServerFixture Fixture; - asio::io_context IoContext; - auto WorkGuard = asio::make_work_guard(IoContext); - std::thread IoThread([&IoContext]() { IoContext.run(); }); + // high.fanout - 32 unlimited parallel GETs all succeed. + { + AsyncHttpClient Client = Fixture.MakeClient(); + const int N = 32; + std::vector<std::promise<HttpClient::Response>> Promises(N); + std::vector<std::future<HttpClient::Response>> Futures; + std::vector<AsyncRequestToken> Tokens; + for (auto& Pr : Promises) + { + Futures.push_back(Pr.get_future()); + } + for (int I = 0; I < N; ++I) + { + Tokens.push_back( + Client.AsyncGet("/api/async-test/hello", [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); })); + } + for (int I = 0; I < N; ++I) + { + REQUIRE(Futures[I].wait_for(std::chrono::seconds(15)) == std::future_status::ready); + auto Resp = Futures[I].get(); + CHECK(Resp.IsSuccess()); + CHECK_EQ(Resp.AsText(), "hello world"); + } + } + // mixed.verbs.concurrent - GET/POST/PUT/DELETE/Stream concurrently under cap=4. { - AsyncHttpClient Client = Fixture.MakeClient(IoContext); + HttpClientSettings Settings; + Settings.MaxConcurrentRequests = 4; + AsyncHttpClient Client = Fixture.MakeClient(Settings); + + IoBuffer Payload(64); + std::memset(Payload.MutableData(), 0xAB, Payload.GetSize()); + + auto FGet = Client.Get("/api/async-test/hello"); + auto FPost = Client.Post("/api/async-test/echo", Payload); + auto FPut = Client.Put("/api/async-test/echo", Payload); + auto FDelete = Client.Delete("/api/async-test/echo/method"); + auto FJson = Client.Get("/api/async-test/json"); + + std::atomic<uint64_t> StreamBytes{0}; + std::promise<HttpClient::Response> StreamP; + auto StreamF = StreamP.get_future(); + Client.AsyncStream( + "/api/async-test/large", + [&](const uint8_t*, size_t Size, uint64_t) -> bool { + StreamBytes.fetch_add(Size, std::memory_order_relaxed); + return true; + }, + [&StreamP](HttpClient::Response R) { StreamP.set_value(std::move(R)); }); + + auto Get = FGet.get(); + auto Post = FPost.get(); + auto Put = FPut.get(); + auto Delete = FDelete.get(); + auto Json = FJson.get(); + REQUIRE(StreamF.wait_for(std::chrono::seconds(10)) == std::future_status::ready); + auto Stream = StreamF.get(); + + CHECK(Get.IsSuccess()); + CHECK_EQ(Get.AsText(), "hello world"); + CHECK(Post.IsSuccess()); + CHECK_EQ(Post.ResponsePayload.GetSize(), Payload.GetSize()); + CHECK(Put.IsSuccess()); + CHECK_EQ(Put.ResponsePayload.GetSize(), Payload.GetSize()); + CHECK(Delete.IsSuccess()); + CHECK_EQ(Delete.AsText(), "DELETE"); + CHECK(Json.IsSuccess()); + CHECK_EQ(Json.AsText(), "{\"ok\":true}"); + CHECK(Stream.IsSuccess()); + CHECK_EQ(StreamBytes.load(), 4u * 1024u * 1024u); + } - auto Future = Client.Get("/api/async-test/hello"); - auto Resp = Future.get(); - CHECK(Resp.IsSuccess()); - CHECK_EQ(Resp.AsText(), "hello world"); + // large.put.roundtrip - 4 MiB PUT echoed back; spot-check positions. + { + AsyncHttpClient Client = Fixture.MakeClient(); + const size_t Size = 4u * 1024u * 1024u; + IoBuffer Payload(Size); + uint8_t* Data = static_cast<uint8_t*>(Payload.MutableData()); + for (size_t I = 0; I < Size; ++I) + { + Data[I] = static_cast<uint8_t>((I * 31u) & 0xFF); + } + auto Resp = Client.Put("/api/async-test/echo", Payload).get(); + REQUIRE(Resp.IsSuccess()); + REQUIRE_EQ(Resp.ResponsePayload.GetSize(), Size); + const uint8_t* RecvData = static_cast<const uint8_t*>(Resp.ResponsePayload.GetData()); + CHECK(RecvData[0] == Data[0]); + CHECK(RecvData[1u << 10] == Data[1u << 10]); + CHECK(RecvData[Size / 2] == Data[Size / 2]); + CHECK(RecvData[Size - 1] == Data[Size - 1]); } - WorkGuard.reset(); - IoThread.join(); + // streaming.put.source - AsyncPut(url, size, source, callback). + // Part 1: 2 MiB echo round-trip. + // Part 2: source returning 0 with offset < TotalSize aborts via CURL_READFUNC_ABORT. + { + AsyncHttpClient Client = Fixture.MakeClient(); + + { + const size_t Size = 2u * 1024u * 1024u; + std::vector<uint8_t> Source(Size); + for (size_t I = 0; I < Size; ++I) + { + Source[I] = static_cast<uint8_t>((I * 17u + 5u) & 0xFF); + } + std::atomic<uint64_t> SourceCalls{0}; + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + Client.AsyncPut( + "/api/async-test/echo", + Size, + [&](uint8_t* Dst, size_t MaxBytes, uint64_t Offset) -> size_t { + SourceCalls.fetch_add(1, std::memory_order_relaxed); + const size_t Remaining = Size > Offset ? Size - static_cast<size_t>(Offset) : 0; + const size_t Take = std::min(MaxBytes, Remaining); + if (Take == 0) + { + return 0; + } + std::memcpy(Dst, Source.data() + Offset, Take); + return Take; + }, + [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); + REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready); + HttpClient::Response Resp = F.get(); + REQUIRE(Resp.IsSuccess()); + REQUIRE_EQ(Resp.ResponsePayload.GetSize(), Size); + CHECK(SourceCalls.load() >= 1); + const uint8_t* RecvData = static_cast<const uint8_t*>(Resp.ResponsePayload.GetData()); + CHECK(RecvData[0] == Source[0]); + CHECK(RecvData[Size / 2] == Source[Size / 2]); + CHECK(RecvData[Size - 1] == Source[Size - 1]); + CHECK(RecvData[1234567] == Source[1234567]); + } + + { + const size_t DeclaredSize = 1024u * 1024u; + std::atomic<uint64_t> SourceCalls{0}; + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + Client.AsyncPut( + "/api/async-test/echo", + DeclaredSize, + [&](uint8_t* Dst, size_t MaxBytes, uint64_t /*Offset*/) -> size_t { + const uint64_t Hits = SourceCalls.fetch_add(1, std::memory_order_relaxed); + if (Hits >= 1) + { + return 0; // abort: 0 returned with Offset < TotalSize + } + const size_t Take = std::min<size_t>(MaxBytes, 64u); + std::memset(Dst, 0xAB, Take); + return Take; + }, + [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); + REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready); + HttpClient::Response Resp = F.get(); + CHECK_FALSE(Resp.IsSuccess()); + CHECK(Resp.Error.has_value()); + } + } } -TEST_CASE("asynchttpclient.connection.error") +// Connection-error / retry semantics targeting a dead port. No server fixture +// needed; each scope constructs its own client with bespoke timeout settings. +TEST_CASE("asynchttpclient.connection_errors") { - // Connect to a port where nothing is listening - AsyncHttpClient Client("127.0.0.1:1", HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(500)}); + // connection.error - Get against dead port surfaces connection error. + { + AsyncHttpClient Client("127.0.0.1:1", HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(500)}); + auto Resp = Client.Get("/should-fail").get(); + CHECK_FALSE(Resp.IsSuccess()); + CHECK(Resp.Error.has_value()); + CHECK(Resp.Error->IsConnectionError()); + } + + // retry.respected.on.connection.error - 2 retries adds >=300ms backoff + // (100ms + 200ms accumulated past the initial attempt). + { + HttpClientSettings Settings{ + .ConnectTimeout = std::chrono::milliseconds(50), + .RetryCount = 2, + }; + AsyncHttpClient Client("127.0.0.1:1", Settings); + const auto Start = std::chrono::steady_clock::now(); + auto Resp = Client.Get("/should-fail").get(); + const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start); + CHECK_FALSE(Resp.IsSuccess()); + CHECK(Elapsed.count() >= 300); + } - auto Future = Client.Get("/should-fail"); - auto Resp = Future.get(); + // cancel.in.flight - Cancel mid-connect doesn't hang (regardless of which + // side of the strand wins the cancel-vs-ECONNREFUSED race). + { + HttpClientSettings Settings{ + .ConnectTimeout = std::chrono::milliseconds(60000), + .RetryCount = 0, + }; + AsyncHttpClient Client("127.0.0.1:1", Settings); + std::promise<HttpClient::Response> P; + auto F = P.get_future(); + const auto Start = std::chrono::steady_clock::now(); + AsyncRequestToken Token = Client.AsyncGet("/should-cancel", [&P](HttpClient::Response R) { P.set_value(std::move(R)); }); + Token.Cancel(); + REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready); + const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start); + auto Resp = F.get(); + CHECK_FALSE(Resp.IsSuccess()); + REQUIRE(Resp.Error.has_value()); + CHECK((Resp.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled || + Resp.Error->ErrorCode == HttpClientErrorCode::kConnectionFailure)); + CHECK(Elapsed.count() < 5000); + } - CHECK_FALSE(Resp.IsSuccess()); - CHECK(Resp.Error.has_value()); - CHECK(Resp.Error->IsConnectionError()); + // retry.zero.no.retry - RetryCount=0 returns promptly (< 1s) with no + // extra backoff past ConnectTimeout. + { + HttpClientSettings Settings{ + .ConnectTimeout = std::chrono::milliseconds(50), + .RetryCount = 0, + }; + AsyncHttpClient Client("127.0.0.1:1", Settings); + const auto Start = std::chrono::steady_clock::now(); + auto Resp = Client.Get("/should-fail").get(); + const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start); + CHECK_FALSE(Resp.IsSuccess()); + CHECK(Elapsed.count() < 1000); + } } TEST_SUITE_END(); |