aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/asynchttpclient_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/asynchttpclient_test.cpp')
-rw-r--r--src/zenhttp/asynchttpclient_test.cpp683
1 files changed, 571 insertions, 112 deletions
diff --git a/src/zenhttp/asynchttpclient_test.cpp b/src/zenhttp/asynchttpclient_test.cpp
index 151863370..0b6877c7b 100644
--- a/src/zenhttp/asynchttpclient_test.cpp
+++ b/src/zenhttp/asynchttpclient_test.cpp
@@ -5,21 +5,24 @@
#if ZEN_WITH_TESTS
+# include <zencore/basicfile.h>
# include <zencore/iobuffer.h>
# include <zencore/logging.h>
# include <zencore/scopeguard.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/thread.h>
# include "servers/httpasio.h"
-# include <atomic>
-# include <thread>
-
ZEN_THIRD_PARTY_INCLUDES_START
# include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+# include <atomic>
+# include <cstring>
+# include <thread>
+
namespace zen {
using namespace std::literals;
@@ -67,13 +70,65 @@ public:
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kJSON, "{\"ok\":true}");
},
HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "large",
+ [](HttpRouterRequest& Req) {
+ // 4 MB body so the response exercises chunked write callbacks.
+ IoBuffer Body(4u * 1024u * 1024u);
+ uint8_t* Data = static_cast<uint8_t*>(Body.MutableData());
+ for (size_t I = 0; I < Body.GetSize(); ++I)
+ {
+ Data[I] = static_cast<uint8_t>(I & 0xFF);
+ }
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Body);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "slow",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ std::string_view MsStr = Params.GetValue("ms");
+ int Ms = MsStr.empty() ? 100 : std::atoi(std::string(MsStr).c_str());
+ m_SlowHits.fetch_add(1, std::memory_order_relaxed);
+ zen::Sleep(Ms);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "slow ok");
+ },
+ HttpVerb::kGet);
+
+ // Returns 503 for the first ?fail=N requests, then 200 for the rest.
+ m_Router.RegisterRoute(
+ "flaky",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ std::string_view FailStr = Params.GetValue("fail");
+ const int FailN = FailStr.empty() ? 0 : std::atoi(std::string(FailStr).c_str());
+ const int Hit = m_FlakyHits.fetch_add(1, std::memory_order_relaxed) + 1;
+ if (Hit <= FailN)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable, HttpContentType::kText, "fail");
+ }
+ else
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+ }
+ },
+ HttpVerb::kGet);
}
+ std::atomic<int>& SlowHits() { return m_SlowHits; }
+ std::atomic<int>& FlakyHits() { return m_FlakyHits; }
+
virtual const char* BaseUri() const override { return "/api/async-test/"; }
virtual void HandleRequest(HttpServerRequest& Request) override { m_Router.HandleRequest(Request); }
private:
HttpRequestRouter m_Router;
+ std::atomic<int> m_SlowHits{0};
+ std::atomic<int> m_FlakyHits{0};
};
//////////////////////////////////////////////////////////////////////////
@@ -118,189 +173,593 @@ struct AsyncTestServerFixture
TEST_SUITE_BEGIN("http.asynchttpclient");
-TEST_CASE("asynchttpclient.future.verbs")
+// Future API + callback API + verb dispatch + payload echo + lifecycle. All
+// scopes share one fixture and one default-settings client. Per-scope sets
+// up its own promises/futures.
+TEST_CASE("asynchttpclient.basic")
{
AsyncTestServerFixture Fixture;
AsyncHttpClient Client = Fixture.MakeClient();
- SUBCASE("GET returns 200 with expected body")
+ // future.verbs - GET / POST / PUT / DELETE / HEAD echo the verb.
{
- auto Future = Client.Get("/api/async-test/echo/method");
- auto Resp = Future.get();
+ auto Resp = Client.Get("/api/async-test/echo/method").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), "GET");
}
-
- SUBCASE("POST dispatches correctly")
{
- auto Future = Client.Post("/api/async-test/echo/method");
- auto Resp = Future.get();
+ auto Resp = Client.Post("/api/async-test/echo/method").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), "POST");
}
-
- SUBCASE("PUT dispatches correctly")
{
- auto Future = Client.Put("/api/async-test/echo/method");
- auto Resp = Future.get();
+ auto Resp = Client.Put("/api/async-test/echo/method").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), "PUT");
}
-
- SUBCASE("DELETE dispatches correctly")
{
- auto Future = Client.Delete("/api/async-test/echo/method");
- auto Resp = Future.get();
+ auto Resp = Client.Delete("/api/async-test/echo/method").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), "DELETE");
}
-
- SUBCASE("HEAD returns 200 with empty body")
{
- auto Future = Client.Head("/api/async-test/echo/method");
- auto Resp = Future.get();
+ auto Resp = Client.Head("/api/async-test/echo/method").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), ""sv);
}
-}
-TEST_CASE("asynchttpclient.future.get")
-{
- AsyncTestServerFixture Fixture;
- AsyncHttpClient Client = Fixture.MakeClient();
-
- SUBCASE("simple GET with text response")
+ // future.get - text body, JSON body, 204 NoContent.
{
- auto Future = Client.Get("/api/async-test/hello");
- auto Resp = Future.get();
+ auto Resp = Client.Get("/api/async-test/hello").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.StatusCode, HttpResponseCode::OK);
CHECK_EQ(Resp.AsText(), "hello world");
}
-
- SUBCASE("GET returning JSON")
{
- auto Future = Client.Get("/api/async-test/json");
- auto Resp = Future.get();
+ auto Resp = Client.Get("/api/async-test/json").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.AsText(), "{\"ok\":true}");
}
-
- SUBCASE("GET 204 NoContent")
{
- auto Future = Client.Get("/api/async-test/nocontent");
- auto Resp = Future.get();
+ auto Resp = Client.Get("/api/async-test/nocontent").get();
CHECK(Resp.IsSuccess());
CHECK_EQ(Resp.StatusCode, HttpResponseCode::NoContent);
}
-}
-
-TEST_CASE("asynchttpclient.future.post.with.payload")
-{
- AsyncTestServerFixture Fixture;
- AsyncHttpClient Client = Fixture.MakeClient();
- std::string_view PayloadStr = "async payload data";
- IoBuffer Payload(IoBuffer::Clone, PayloadStr.data(), PayloadStr.size());
- Payload.SetContentType(ZenContentType::kText);
+ // future.post.with.payload + future.put.with.payload - echo round-trips.
+ {
+ std::string_view Str = "async payload data";
+ IoBuffer Payload(IoBuffer::Clone, Str.data(), Str.size());
+ Payload.SetContentType(ZenContentType::kText);
+ auto Resp = Client.Post("/api/async-test/echo", Payload).get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "async payload data");
+ }
+ {
+ std::string_view Str = "put payload";
+ IoBuffer Payload(IoBuffer::Clone, Str.data(), Str.size());
+ Payload.SetContentType(ZenContentType::kText);
+ auto Resp = Client.Put("/api/async-test/echo", Payload).get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "put payload");
+ }
- auto Future = Client.Post("/api/async-test/echo", Payload);
- auto Resp = Future.get();
- CHECK(Resp.IsSuccess());
- CHECK_EQ(Resp.AsText(), "async payload data");
-}
+ // callback - AsyncGet completion fires the callback.
+ {
+ std::promise<HttpClient::Response> Promise;
+ auto Future = Promise.get_future();
+ Client.AsyncGet("/api/async-test/hello", [&Promise](HttpClient::Response Resp) { Promise.set_value(std::move(Resp)); });
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "hello world");
+ }
-TEST_CASE("asynchttpclient.future.put.with.payload")
-{
- AsyncTestServerFixture Fixture;
- AsyncHttpClient Client = Fixture.MakeClient();
+ // concurrent.requests - multiple verbs in flight at once.
+ {
+ auto F1 = Client.Get("/api/async-test/hello");
+ auto F2 = Client.Get("/api/async-test/json");
+ auto F3 = Client.Post("/api/async-test/echo/method");
+ auto F4 = Client.Delete("/api/async-test/echo/method");
+ auto R1 = F1.get();
+ auto R2 = F2.get();
+ auto R3 = F3.get();
+ auto R4 = F4.get();
+ CHECK(R1.IsSuccess());
+ CHECK_EQ(R1.AsText(), "hello world");
+ CHECK(R2.IsSuccess());
+ CHECK_EQ(R2.AsText(), "{\"ok\":true}");
+ CHECK(R3.IsSuccess());
+ CHECK_EQ(R3.AsText(), "POST");
+ CHECK(R4.IsSuccess());
+ CHECK_EQ(R4.AsText(), "DELETE");
+ }
- std::string_view PutStr = "put payload";
- IoBuffer Payload(IoBuffer::Clone, PutStr.data(), PutStr.size());
- Payload.SetContentType(ZenContentType::kText);
+ // cancel.after.completion.is.noop - late Cancel must be quiet.
+ {
+ auto Resp = Client.Get("/api/async-test/hello").get();
+ REQUIRE(Resp.IsSuccess());
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+ AsyncRequestToken Token = Client.AsyncGet("/api/async-test/hello", [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
+ auto Resp2 = F.get();
+ REQUIRE(Resp2.IsSuccess());
+ Token.Cancel(); // no-op; must not crash
+ }
- auto Future = Client.Put("/api/async-test/echo", Payload);
- auto Resp = Future.get();
- CHECK(Resp.IsSuccess());
- CHECK_EQ(Resp.AsText(), "put payload");
+ // lifecycle.repeated.construct.destroy - 8 fresh clients against the same
+ // server. Catches io thread / curl_multi leaks across construct/destroy.
+ for (int I = 0; I < 8; ++I)
+ {
+ AsyncHttpClient Local = Fixture.MakeClient();
+ auto Resp = Local.Get("/api/async-test/hello").get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "hello world");
+ }
}
-TEST_CASE("asynchttpclient.callback")
+// Submit-side behavior: external io_context, shutdown cancel, no-queue
+// contract, cross-thread cancel-before-submit race, unlimited fan-out.
+//
+// MaxConcurrentRequests is applied as curl connection caps only; cap-level
+// fan-out throttling lives in the storage layer (see
+// server.s3asyncstorage.admission.fanout).
+TEST_CASE("asynchttpclient.submit_and_shutdown")
{
AsyncTestServerFixture Fixture;
- AsyncHttpClient Client = Fixture.MakeClient();
- std::promise<HttpClient::Response> Promise;
- auto Future = Promise.get_future();
+ // external.io_context - caller drives the run loop. Verifies the
+ // Cleanup-via-promise path in the dtor.
+ {
+ asio::io_context IoContext;
+ auto WorkGuard = asio::make_work_guard(IoContext);
+ std::thread IoThread([&IoContext]() { IoContext.run(); });
+ {
+ AsyncHttpClient Client = Fixture.MakeClient(IoContext);
+ auto Resp = Client.Get("/api/async-test/echo/method").get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "GET");
+ }
+ WorkGuard.reset();
+ IoThread.join();
+ }
+
+ // shutdown.cancels.in.flight - dtor synthesizes cancel for all in-flight.
+ {
+ const int N = 6;
+ std::vector<std::promise<HttpClient::Response>> Promises(N);
+ std::vector<std::future<HttpClient::Response>> Futures;
+ for (auto& P : Promises)
+ {
+ Futures.push_back(P.get_future());
+ }
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ std::vector<AsyncRequestToken> Tokens;
+ for (int I = 0; I < N; ++I)
+ {
+ Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=2000",
+ [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); }));
+ }
+ Sleep(50); // let requests actually start before client teardown
+ }
+ int CancelCount = 0;
+ for (auto& F : Futures)
+ {
+ REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready);
+ HttpClient::Response R = F.get();
+ if (R.Error.has_value() && R.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled)
+ {
+ ++CancelCount;
+ }
+ }
+ CHECK(CancelCount == N);
+ }
+
+ // contract.no.queue - all submissions reach the network despite cap=4.
+ // Fan-out gating is the storage layer's responsibility.
+ {
+ HttpClientSettings Settings;
+ Settings.MaxConcurrentRequests = 4;
+ AsyncHttpClient Client = Fixture.MakeClient(Settings);
+
+ const int N = 100;
+ std::vector<std::promise<HttpClient::Response>> Promises(N);
+ std::vector<std::future<HttpClient::Response>> Futures;
+ std::vector<AsyncRequestToken> Tokens;
+ for (auto& P : Promises)
+ {
+ Futures.push_back(P.get_future());
+ }
+ for (int I = 0; I < N; ++I)
+ {
+ Tokens.push_back(
+ Client.AsyncGet("/api/async-test/hello", [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); }));
+ }
+ for (auto& F : Futures)
+ {
+ REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready);
+ CHECK(F.get().IsSuccess());
+ }
+ }
- Client.AsyncGet("/api/async-test/hello", [&Promise](HttpClient::Response Resp) { Promise.set_value(std::move(Resp)); });
+ // cancel.before.submit - cross-thread race: SubmitFromSpec posted, Cancel
+ // from another thread fires before/after submit handler runs. All callbacks
+ // must surface kRequestCancelled exactly once.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ const int N = 16;
+ std::vector<std::promise<HttpClient::Response>> Promises(N);
+ std::vector<std::future<HttpClient::Response>> Futures;
+ std::vector<AsyncRequestToken> Tokens;
+ for (auto& P : Promises)
+ {
+ Futures.push_back(P.get_future());
+ }
+ for (int I = 0; I < N; ++I)
+ {
+ Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=2000",
+ [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); }));
+ }
+ std::thread CancelThread([&]() {
+ for (auto& T : Tokens)
+ {
+ T.Cancel();
+ }
+ });
+ for (auto& F : Futures)
+ {
+ REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready);
+ HttpClient::Response R = F.get();
+ REQUIRE(R.Error.has_value());
+ CHECK(R.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled);
+ }
+ CancelThread.join();
+ }
- auto Resp = Future.get();
- CHECK(Resp.IsSuccess());
- CHECK_EQ(Resp.AsText(), "hello world");
+ // unlimited.parallel.fanout - 8 parallel 100ms requests with default
+ // settings (no cap) finish well under the 800ms serial floor. Sized to one
+ // batch on the asio server's 8-thread pool so per-request setup overhead on
+ // slow CI agents does not dominate; threshold leaves >=2x margin over the
+ // ~100ms parallel ideal.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ const int N = 8;
+ std::vector<std::promise<HttpClient::Response>> Promises(N);
+ std::vector<std::future<HttpClient::Response>> Futures;
+ std::vector<AsyncRequestToken> Tokens;
+ for (auto& P : Promises)
+ {
+ Futures.push_back(P.get_future());
+ }
+ const auto Start = std::chrono::steady_clock::now();
+ for (int I = 0; I < N; ++I)
+ {
+ Tokens.push_back(Client.AsyncGet("/api/async-test/slow?ms=100",
+ [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); }));
+ }
+ for (auto& F : Futures)
+ {
+ REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready);
+ CHECK(F.get().IsSuccess());
+ }
+ const auto ElapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start).count();
+ CHECK(ElapsedMs < 600);
+ }
}
-TEST_CASE("asynchttpclient.concurrent.requests")
+// AsyncStream coverage: chunk delivery, OnData abort, mid-flight cancel.
+TEST_CASE("asynchttpclient.stream")
{
AsyncTestServerFixture Fixture;
- AsyncHttpClient Client = Fixture.MakeClient();
-
- // Fire multiple requests concurrently
- auto Future1 = Client.Get("/api/async-test/hello");
- auto Future2 = Client.Get("/api/async-test/json");
- auto Future3 = Client.Post("/api/async-test/echo/method");
- auto Future4 = Client.Delete("/api/async-test/echo/method");
- auto Resp1 = Future1.get();
- auto Resp2 = Future2.get();
- auto Resp3 = Future3.get();
- auto Resp4 = Future4.get();
+ // stream.basic - 4 MiB stream completes; bytes accounted; no body buffering.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ std::atomic<uint64_t> TotalReceived{0};
+ std::atomic<uint32_t> ChunkCount{0};
+ std::atomic<uint64_t> TotalSizeSeen{0};
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+
+ auto Token = Client.AsyncStream(
+ "/api/async-test/large",
+ [&](const uint8_t* /*Data*/, size_t Size, uint64_t TotalSize) -> bool {
+ TotalReceived.fetch_add(Size, std::memory_order_relaxed);
+ ChunkCount.fetch_add(1, std::memory_order_relaxed);
+ if (TotalSize != 0)
+ {
+ TotalSizeSeen.store(TotalSize, std::memory_order_relaxed);
+ }
+ return true;
+ },
+ [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
- CHECK(Resp1.IsSuccess());
- CHECK_EQ(Resp1.AsText(), "hello world");
+ REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready);
+ auto Resp = F.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(TotalReceived.load(), 4u * 1024u * 1024u);
+ CHECK_EQ(TotalSizeSeen.load(), 4u * 1024u * 1024u);
+ CHECK(ChunkCount.load() >= 1);
+ CHECK_EQ(Resp.ResponsePayload.GetSize(), 0u);
+ }
- CHECK(Resp2.IsSuccess());
- CHECK_EQ(Resp2.AsText(), "{\"ok\":true}");
+ // stream.ondata.abort - returning false from OnData stops the transfer.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ std::atomic<uint32_t> ChunkCount{0};
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+
+ Client.AsyncStream(
+ "/api/async-test/large",
+ [&](const uint8_t*, size_t, uint64_t) -> bool {
+ ChunkCount.fetch_add(1, std::memory_order_relaxed);
+ return false;
+ },
+ [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
- CHECK(Resp3.IsSuccess());
- CHECK_EQ(Resp3.AsText(), "POST");
+ REQUIRE(F.wait_for(std::chrono::seconds(10)) == std::future_status::ready);
+ auto Resp = F.get();
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Resp.Error.has_value());
+ CHECK(ChunkCount.load() <= 1);
+ }
- CHECK(Resp4.IsSuccess());
- CHECK_EQ(Resp4.AsText(), "DELETE");
+ // stream.cancel.mid.flight - Cancel during long stream surfaces
+ // kRequestCancelled. RetryCount=0 so no retry layer masks the error code.
+ {
+ HttpClientSettings Settings;
+ Settings.RetryCount = 0;
+ AsyncHttpClient Client = Fixture.MakeClient(Settings);
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+ auto Token = Client.AsyncStream(
+ "/api/async-test/slow?ms=2000",
+ [](const uint8_t*, size_t, uint64_t) -> bool { return true; },
+ [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
+ Sleep(50);
+ Token.Cancel();
+ REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready);
+ auto Resp = F.get();
+ CHECK_FALSE(Resp.IsSuccess());
+ REQUIRE(Resp.Error.has_value());
+ CHECK(Resp.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled);
+ }
}
-TEST_CASE("asynchttpclient.external.io_context")
+// High-fanout, mixed verbs, large payload, streaming-source PUT.
+TEST_CASE("asynchttpclient.stress")
{
AsyncTestServerFixture Fixture;
- asio::io_context IoContext;
- auto WorkGuard = asio::make_work_guard(IoContext);
- std::thread IoThread([&IoContext]() { IoContext.run(); });
+ // high.fanout - 32 unlimited parallel GETs all succeed.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ const int N = 32;
+ std::vector<std::promise<HttpClient::Response>> Promises(N);
+ std::vector<std::future<HttpClient::Response>> Futures;
+ std::vector<AsyncRequestToken> Tokens;
+ for (auto& Pr : Promises)
+ {
+ Futures.push_back(Pr.get_future());
+ }
+ for (int I = 0; I < N; ++I)
+ {
+ Tokens.push_back(
+ Client.AsyncGet("/api/async-test/hello", [&Promises, I](HttpClient::Response R) { Promises[I].set_value(std::move(R)); }));
+ }
+ for (int I = 0; I < N; ++I)
+ {
+ REQUIRE(Futures[I].wait_for(std::chrono::seconds(15)) == std::future_status::ready);
+ auto Resp = Futures[I].get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "hello world");
+ }
+ }
+ // mixed.verbs.concurrent - GET/POST/PUT/DELETE/Stream concurrently under cap=4.
{
- AsyncHttpClient Client = Fixture.MakeClient(IoContext);
+ HttpClientSettings Settings;
+ Settings.MaxConcurrentRequests = 4;
+ AsyncHttpClient Client = Fixture.MakeClient(Settings);
+
+ IoBuffer Payload(64);
+ std::memset(Payload.MutableData(), 0xAB, Payload.GetSize());
+
+ auto FGet = Client.Get("/api/async-test/hello");
+ auto FPost = Client.Post("/api/async-test/echo", Payload);
+ auto FPut = Client.Put("/api/async-test/echo", Payload);
+ auto FDelete = Client.Delete("/api/async-test/echo/method");
+ auto FJson = Client.Get("/api/async-test/json");
+
+ std::atomic<uint64_t> StreamBytes{0};
+ std::promise<HttpClient::Response> StreamP;
+ auto StreamF = StreamP.get_future();
+ Client.AsyncStream(
+ "/api/async-test/large",
+ [&](const uint8_t*, size_t Size, uint64_t) -> bool {
+ StreamBytes.fetch_add(Size, std::memory_order_relaxed);
+ return true;
+ },
+ [&StreamP](HttpClient::Response R) { StreamP.set_value(std::move(R)); });
+
+ auto Get = FGet.get();
+ auto Post = FPost.get();
+ auto Put = FPut.get();
+ auto Delete = FDelete.get();
+ auto Json = FJson.get();
+ REQUIRE(StreamF.wait_for(std::chrono::seconds(10)) == std::future_status::ready);
+ auto Stream = StreamF.get();
+
+ CHECK(Get.IsSuccess());
+ CHECK_EQ(Get.AsText(), "hello world");
+ CHECK(Post.IsSuccess());
+ CHECK_EQ(Post.ResponsePayload.GetSize(), Payload.GetSize());
+ CHECK(Put.IsSuccess());
+ CHECK_EQ(Put.ResponsePayload.GetSize(), Payload.GetSize());
+ CHECK(Delete.IsSuccess());
+ CHECK_EQ(Delete.AsText(), "DELETE");
+ CHECK(Json.IsSuccess());
+ CHECK_EQ(Json.AsText(), "{\"ok\":true}");
+ CHECK(Stream.IsSuccess());
+ CHECK_EQ(StreamBytes.load(), 4u * 1024u * 1024u);
+ }
- auto Future = Client.Get("/api/async-test/hello");
- auto Resp = Future.get();
- CHECK(Resp.IsSuccess());
- CHECK_EQ(Resp.AsText(), "hello world");
+ // large.put.roundtrip - 4 MiB PUT echoed back; spot-check positions.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+ const size_t Size = 4u * 1024u * 1024u;
+ IoBuffer Payload(Size);
+ uint8_t* Data = static_cast<uint8_t*>(Payload.MutableData());
+ for (size_t I = 0; I < Size; ++I)
+ {
+ Data[I] = static_cast<uint8_t>((I * 31u) & 0xFF);
+ }
+ auto Resp = Client.Put("/api/async-test/echo", Payload).get();
+ REQUIRE(Resp.IsSuccess());
+ REQUIRE_EQ(Resp.ResponsePayload.GetSize(), Size);
+ const uint8_t* RecvData = static_cast<const uint8_t*>(Resp.ResponsePayload.GetData());
+ CHECK(RecvData[0] == Data[0]);
+ CHECK(RecvData[1u << 10] == Data[1u << 10]);
+ CHECK(RecvData[Size / 2] == Data[Size / 2]);
+ CHECK(RecvData[Size - 1] == Data[Size - 1]);
}
- WorkGuard.reset();
- IoThread.join();
+ // streaming.put.source - AsyncPut(url, size, source, callback).
+ // Part 1: 2 MiB echo round-trip.
+ // Part 2: source returning 0 with offset < TotalSize aborts via CURL_READFUNC_ABORT.
+ {
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ {
+ const size_t Size = 2u * 1024u * 1024u;
+ std::vector<uint8_t> Source(Size);
+ for (size_t I = 0; I < Size; ++I)
+ {
+ Source[I] = static_cast<uint8_t>((I * 17u + 5u) & 0xFF);
+ }
+ std::atomic<uint64_t> SourceCalls{0};
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+ Client.AsyncPut(
+ "/api/async-test/echo",
+ Size,
+ [&](uint8_t* Dst, size_t MaxBytes, uint64_t Offset) -> size_t {
+ SourceCalls.fetch_add(1, std::memory_order_relaxed);
+ const size_t Remaining = Size > Offset ? Size - static_cast<size_t>(Offset) : 0;
+ const size_t Take = std::min(MaxBytes, Remaining);
+ if (Take == 0)
+ {
+ return 0;
+ }
+ std::memcpy(Dst, Source.data() + Offset, Take);
+ return Take;
+ },
+ [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
+ REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready);
+ HttpClient::Response Resp = F.get();
+ REQUIRE(Resp.IsSuccess());
+ REQUIRE_EQ(Resp.ResponsePayload.GetSize(), Size);
+ CHECK(SourceCalls.load() >= 1);
+ const uint8_t* RecvData = static_cast<const uint8_t*>(Resp.ResponsePayload.GetData());
+ CHECK(RecvData[0] == Source[0]);
+ CHECK(RecvData[Size / 2] == Source[Size / 2]);
+ CHECK(RecvData[Size - 1] == Source[Size - 1]);
+ CHECK(RecvData[1234567] == Source[1234567]);
+ }
+
+ {
+ const size_t DeclaredSize = 1024u * 1024u;
+ std::atomic<uint64_t> SourceCalls{0};
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+ Client.AsyncPut(
+ "/api/async-test/echo",
+ DeclaredSize,
+ [&](uint8_t* Dst, size_t MaxBytes, uint64_t /*Offset*/) -> size_t {
+ const uint64_t Hits = SourceCalls.fetch_add(1, std::memory_order_relaxed);
+ if (Hits >= 1)
+ {
+ return 0; // abort: 0 returned with Offset < TotalSize
+ }
+ const size_t Take = std::min<size_t>(MaxBytes, 64u);
+ std::memset(Dst, 0xAB, Take);
+ return Take;
+ },
+ [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
+ REQUIRE(F.wait_for(std::chrono::seconds(15)) == std::future_status::ready);
+ HttpClient::Response Resp = F.get();
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Resp.Error.has_value());
+ }
+ }
}
-TEST_CASE("asynchttpclient.connection.error")
+// Connection-error / retry semantics targeting a dead port. No server fixture
+// needed; each scope constructs its own client with bespoke timeout settings.
+TEST_CASE("asynchttpclient.connection_errors")
{
- // Connect to a port where nothing is listening
- AsyncHttpClient Client("127.0.0.1:1", HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(500)});
+ // connection.error - Get against dead port surfaces connection error.
+ {
+ AsyncHttpClient Client("127.0.0.1:1", HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(500)});
+ auto Resp = Client.Get("/should-fail").get();
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Resp.Error.has_value());
+ CHECK(Resp.Error->IsConnectionError());
+ }
+
+ // retry.respected.on.connection.error - 2 retries adds >=300ms backoff
+ // (100ms + 200ms accumulated past the initial attempt).
+ {
+ HttpClientSettings Settings{
+ .ConnectTimeout = std::chrono::milliseconds(50),
+ .RetryCount = 2,
+ };
+ AsyncHttpClient Client("127.0.0.1:1", Settings);
+ const auto Start = std::chrono::steady_clock::now();
+ auto Resp = Client.Get("/should-fail").get();
+ const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start);
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Elapsed.count() >= 300);
+ }
- auto Future = Client.Get("/should-fail");
- auto Resp = Future.get();
+ // cancel.in.flight - Cancel mid-connect doesn't hang (regardless of which
+ // side of the strand wins the cancel-vs-ECONNREFUSED race).
+ {
+ HttpClientSettings Settings{
+ .ConnectTimeout = std::chrono::milliseconds(60000),
+ .RetryCount = 0,
+ };
+ AsyncHttpClient Client("127.0.0.1:1", Settings);
+ std::promise<HttpClient::Response> P;
+ auto F = P.get_future();
+ const auto Start = std::chrono::steady_clock::now();
+ AsyncRequestToken Token = Client.AsyncGet("/should-cancel", [&P](HttpClient::Response R) { P.set_value(std::move(R)); });
+ Token.Cancel();
+ REQUIRE(F.wait_for(std::chrono::seconds(5)) == std::future_status::ready);
+ const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start);
+ auto Resp = F.get();
+ CHECK_FALSE(Resp.IsSuccess());
+ REQUIRE(Resp.Error.has_value());
+ CHECK((Resp.Error->ErrorCode == HttpClientErrorCode::kRequestCancelled ||
+ Resp.Error->ErrorCode == HttpClientErrorCode::kConnectionFailure));
+ CHECK(Elapsed.count() < 5000);
+ }
- CHECK_FALSE(Resp.IsSuccess());
- CHECK(Resp.Error.has_value());
- CHECK(Resp.Error->IsConnectionError());
+ // retry.zero.no.retry - RetryCount=0 returns promptly (< 1s) with no
+ // extra backoff past ConnectTimeout.
+ {
+ HttpClientSettings Settings{
+ .ConnectTimeout = std::chrono::milliseconds(50),
+ .RetryCount = 0,
+ };
+ AsyncHttpClient Client("127.0.0.1:1", Settings);
+ const auto Start = std::chrono::steady_clock::now();
+ auto Resp = Client.Get("/should-fail").get();
+ const auto Elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - Start);
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Elapsed.count() < 1000);
+ }
}
TEST_SUITE_END();