aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-09 11:02:41 +0200
committerGitHub Enterprise <[email protected]>2026-04-09 11:02:41 +0200
commit5900f6a6d892fbe582c46063cc399a840e60ef2e (patch)
tree76735ff6de39c2c515a866ecc9d7b4309d63669d /src
parentmigrate from http_parser to llhttp (#929) (diff)
downloadzen-5900f6a6d892fbe582c46063cc399a840e60ef2e.tar.xz
zen-5900f6a6d892fbe582c46063cc399a840e60ef2e.zip
Add async HTTP client (curl_multi + ASIO) (#918)
- Adds `AsyncHttpClient` — an asynchronous HTTP client using `curl_multi_socket_action` integrated with ASIO for event-driven I/O. Supports GET, POST, PUT, DELETE, HEAD with both callback-based and `std::future`-based APIs. - Extracts shared curl helpers (callbacks, URL encoding, header construction, error mapping) into `httpclientcurlhelpers.h`, eliminating duplication between the sync and async implementations. ## Design - All curl_multi state is serialized on an `asio::strand`, safe with multi-threaded io_contexts. - Two construction modes: owned io_context (creates internal thread) or external io_context (caller runs the loop). - Socket readiness is detected via `asio::ip::tcp::socket::async_wait` driven by curl's `CURLMOPT_SOCKETFUNCTION`/`CURLMOPT_TIMERFUNCTION` — no polling, sub-millisecond latency. - Completion callbacks are dispatched off the strand onto the io_context so slow callbacks don't starve the curl event loop. Exceptions in callbacks are caught and logged. ## Files | File | Change | |------|--------| | `zenhttp/include/zenhttp/asynchttpclient.h` | New public header | | `zenhttp/clients/asynchttpclient.cpp` | Implementation (~1000 lines) | | `zenhttp/clients/httpclientcurlhelpers.h` | Shared curl helpers extracted from sync client | | `zenhttp/clients/httpclientcurl.cpp` | Removed duplicated helpers, uses shared header | | `zenhttp/asynchttpclient_test.cpp` | 8 test cases: verbs, payloads, callbacks, concurrency, external io_context, connection errors | | `zenhttp/zenhttp.cpp` | Forcelink registration for new tests |
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/asynchttpclient_test.cpp315
-rw-r--r--src/zenhttp/clients/asynchttpclient.cpp1033
-rw-r--r--src/zenhttp/clients/httpclientcurl.cpp279
-rw-r--r--src/zenhttp/clients/httpclientcurlhelpers.h293
-rw-r--r--src/zenhttp/include/zenhttp/asynchttpclient.h123
-rw-r--r--src/zenhttp/zenhttp.cpp2
6 files changed, 1776 insertions, 269 deletions
diff --git a/src/zenhttp/asynchttpclient_test.cpp b/src/zenhttp/asynchttpclient_test.cpp
new file mode 100644
index 000000000..151863370
--- /dev/null
+++ b/src/zenhttp/asynchttpclient_test.cpp
@@ -0,0 +1,315 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhttp/asynchttpclient.h>
+#include <zenhttp/httpserver.h>
+
+#if ZEN_WITH_TESTS
+
+# include <zencore/iobuffer.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+
+# include "servers/httpasio.h"
+
+# include <atomic>
+# include <thread>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+// Reusable test service for async client tests
+
+class AsyncHttpClientTestService : public HttpService
+{
+public:
+ AsyncHttpClientTestService()
+ {
+ m_Router.RegisterRoute(
+ "hello",
+ [](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "hello world"); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "echo",
+ [](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ IoBuffer Body = HttpReq.ReadPayload();
+ HttpContentType CT = HttpReq.RequestContentType();
+ HttpReq.WriteResponse(HttpResponseCode::OK, CT, Body);
+ },
+ HttpVerb::kPost | HttpVerb::kPut);
+
+ m_Router.RegisterRoute(
+ "echo/method",
+ [](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ std::string_view Method = ToString(HttpReq.RequestVerb());
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, Method);
+ },
+ HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete | HttpVerb::kHead);
+
+ m_Router.RegisterRoute(
+ "nocontent",
+ [](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); },
+ HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kPut | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "json",
+ [](HttpRouterRequest& Req) {
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kJSON, "{\"ok\":true}");
+ },
+ HttpVerb::kGet);
+ }
+
+ virtual const char* BaseUri() const override { return "/api/async-test/"; }
+ virtual void HandleRequest(HttpServerRequest& Request) override { m_Router.HandleRequest(Request); }
+
+private:
+ HttpRequestRouter m_Router;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct AsyncTestServerFixture
+{
+ AsyncHttpClientTestService TestService;
+ ScopedTemporaryDirectory TmpDir;
+ Ref<HttpServer> Server;
+ std::thread ServerThread;
+ int Port = -1;
+
+ AsyncTestServerFixture()
+ {
+ Server = CreateHttpAsioServer(AsioConfig{});
+ Port = Server->Initialize(0, TmpDir.Path());
+ ZEN_ASSERT(Port != -1);
+ Server->RegisterService(TestService);
+ ServerThread = std::thread([this]() { Server->Run(false); });
+ }
+
+ ~AsyncTestServerFixture()
+ {
+ Server->RequestExit();
+ if (ServerThread.joinable())
+ {
+ ServerThread.join();
+ }
+ Server->Close();
+ }
+
+ AsyncHttpClient MakeClient(HttpClientSettings Settings = {}) { return AsyncHttpClient(fmt::format("127.0.0.1:{}", Port), Settings); }
+
+ AsyncHttpClient MakeClient(asio::io_context& IoContext, HttpClientSettings Settings = {})
+ {
+ return AsyncHttpClient(fmt::format("127.0.0.1:{}", Port), IoContext, Settings);
+ }
+};
+
+//////////////////////////////////////////////////////////////////////////
+// Tests
+
+TEST_SUITE_BEGIN("http.asynchttpclient");
+
+TEST_CASE("asynchttpclient.future.verbs")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ SUBCASE("GET returns 200 with expected body")
+ {
+ auto Future = Client.Get("/api/async-test/echo/method");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "GET");
+ }
+
+ SUBCASE("POST dispatches correctly")
+ {
+ auto Future = Client.Post("/api/async-test/echo/method");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "POST");
+ }
+
+ SUBCASE("PUT dispatches correctly")
+ {
+ auto Future = Client.Put("/api/async-test/echo/method");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "PUT");
+ }
+
+ SUBCASE("DELETE dispatches correctly")
+ {
+ auto Future = Client.Delete("/api/async-test/echo/method");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "DELETE");
+ }
+
+ SUBCASE("HEAD returns 200 with empty body")
+ {
+ auto Future = Client.Head("/api/async-test/echo/method");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), ""sv);
+ }
+}
+
+TEST_CASE("asynchttpclient.future.get")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ SUBCASE("simple GET with text response")
+ {
+ auto Future = Client.Get("/api/async-test/hello");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.StatusCode, HttpResponseCode::OK);
+ CHECK_EQ(Resp.AsText(), "hello world");
+ }
+
+ SUBCASE("GET returning JSON")
+ {
+ auto Future = Client.Get("/api/async-test/json");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "{\"ok\":true}");
+ }
+
+ SUBCASE("GET 204 NoContent")
+ {
+ auto Future = Client.Get("/api/async-test/nocontent");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.StatusCode, HttpResponseCode::NoContent);
+ }
+}
+
+TEST_CASE("asynchttpclient.future.post.with.payload")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ std::string_view PayloadStr = "async payload data";
+ IoBuffer Payload(IoBuffer::Clone, PayloadStr.data(), PayloadStr.size());
+ Payload.SetContentType(ZenContentType::kText);
+
+ auto Future = Client.Post("/api/async-test/echo", Payload);
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "async payload data");
+}
+
+TEST_CASE("asynchttpclient.future.put.with.payload")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ std::string_view PutStr = "put payload";
+ IoBuffer Payload(IoBuffer::Clone, PutStr.data(), PutStr.size());
+ Payload.SetContentType(ZenContentType::kText);
+
+ auto Future = Client.Put("/api/async-test/echo", Payload);
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "put payload");
+}
+
+TEST_CASE("asynchttpclient.callback")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ std::promise<HttpClient::Response> Promise;
+ auto Future = Promise.get_future();
+
+ Client.AsyncGet("/api/async-test/hello", [&Promise](HttpClient::Response Resp) { Promise.set_value(std::move(Resp)); });
+
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "hello world");
+}
+
+TEST_CASE("asynchttpclient.concurrent.requests")
+{
+ AsyncTestServerFixture Fixture;
+ AsyncHttpClient Client = Fixture.MakeClient();
+
+ // Fire multiple requests concurrently
+ auto Future1 = Client.Get("/api/async-test/hello");
+ auto Future2 = Client.Get("/api/async-test/json");
+ auto Future3 = Client.Post("/api/async-test/echo/method");
+ auto Future4 = Client.Delete("/api/async-test/echo/method");
+
+ auto Resp1 = Future1.get();
+ auto Resp2 = Future2.get();
+ auto Resp3 = Future3.get();
+ auto Resp4 = Future4.get();
+
+ CHECK(Resp1.IsSuccess());
+ CHECK_EQ(Resp1.AsText(), "hello world");
+
+ CHECK(Resp2.IsSuccess());
+ CHECK_EQ(Resp2.AsText(), "{\"ok\":true}");
+
+ CHECK(Resp3.IsSuccess());
+ CHECK_EQ(Resp3.AsText(), "POST");
+
+ CHECK(Resp4.IsSuccess());
+ CHECK_EQ(Resp4.AsText(), "DELETE");
+}
+
+TEST_CASE("asynchttpclient.external.io_context")
+{
+ AsyncTestServerFixture Fixture;
+
+ asio::io_context IoContext;
+ auto WorkGuard = asio::make_work_guard(IoContext);
+ std::thread IoThread([&IoContext]() { IoContext.run(); });
+
+ {
+ AsyncHttpClient Client = Fixture.MakeClient(IoContext);
+
+ auto Future = Client.Get("/api/async-test/hello");
+ auto Resp = Future.get();
+ CHECK(Resp.IsSuccess());
+ CHECK_EQ(Resp.AsText(), "hello world");
+ }
+
+ WorkGuard.reset();
+ IoThread.join();
+}
+
+TEST_CASE("asynchttpclient.connection.error")
+{
+ // Connect to a port where nothing is listening
+ AsyncHttpClient Client("127.0.0.1:1", HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(500)});
+
+ auto Future = Client.Get("/should-fail");
+ auto Resp = Future.get();
+
+ CHECK_FALSE(Resp.IsSuccess());
+ CHECK(Resp.Error.has_value());
+ CHECK(Resp.Error->IsConnectionError());
+}
+
+TEST_SUITE_END();
+
+void
+asynchttpclient_test_forcelink()
+{
+}
+
+} // namespace zen
+
+#endif
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp
new file mode 100644
index 000000000..bdf7f160c
--- /dev/null
+++ b/src/zenhttp/clients/asynchttpclient.cpp
@@ -0,0 +1,1033 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhttp/asynchttpclient.h>
+
+#include "httpclientcurlhelpers.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/session.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// TransferContext: per-transfer state associated with each CURL easy handle
+
+struct TransferContext
+{
+ AsyncHttpCallback Callback;
+ std::string Body;
+ std::vector<std::pair<std::string, std::string>> ResponseHeaders;
+ CurlWriteCallbackData WriteData;
+ CurlHeaderCallbackData HeaderData;
+ curl_slist* HeaderList = nullptr;
+
+ // For PUT/POST with payload: keep the data alive until transfer completes
+ IoBuffer PayloadBuffer;
+ CurlReadCallbackData ReadData;
+
+ TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback))
+ {
+ WriteData.Body = &Body;
+ HeaderData.Headers = &ResponseHeaders;
+ }
+
+ ~TransferContext()
+ {
+ if (HeaderList)
+ {
+ curl_slist_free_all(HeaderList);
+ }
+ }
+
+ TransferContext(const TransferContext&) = delete;
+ TransferContext& operator=(const TransferContext&) = delete;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient::Impl
+
+struct AsyncHttpClient::Impl
+{
+ Impl(std::string_view BaseUri, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_OwnedIoContext(std::make_unique<asio::io_context>())
+ , m_IoContext(*m_OwnedIoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ m_WorkGuard.emplace(m_IoContext.get_executor());
+ m_IoThread = std::thread([this]() {
+ SetCurrentThreadName("async_http");
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what());
+ }
+ });
+ }
+
+ Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_IoContext(IoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ }
+
+ ~Impl()
+ {
+ // Clean up curl state on the strand where all curl_multi operations
+ // are serialized. Use a promise to block until the cleanup handler
+ // has actually executed — essential for the external io_context case
+ // where we don't own the run loop.
+ std::promise<void> Done;
+ std::future<void> DoneFuture = Done.get_future();
+
+ asio::post(m_Strand, [this, &Done]() {
+ m_ShuttingDown = true;
+ m_Timer.cancel();
+
+ // Release all tracked sockets (don't close — curl owns the fds).
+ for (auto& [Fd, Info] : m_Sockets)
+ {
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.cancel();
+ Info->Socket.release();
+ }
+ }
+ m_Sockets.clear();
+
+ for (auto& [Handle, Ctx] : m_Transfers)
+ {
+ curl_multi_remove_handle(m_Multi, Handle);
+ curl_easy_cleanup(Handle);
+ }
+ m_Transfers.clear();
+
+ for (CURL* Handle : m_HandlePool)
+ {
+ curl_easy_cleanup(Handle);
+ }
+ m_HandlePool.clear();
+
+ Done.set_value();
+ });
+
+ // For owned io_context: release work guard so run() can return after
+ // processing the cleanup handler above.
+ m_WorkGuard.reset();
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ else
+ {
+ // External io_context: wait for the cleanup handler to complete.
+ DoneFuture.wait();
+ }
+
+ if (m_Multi)
+ {
+ curl_multi_cleanup(m_Multi);
+ }
+ }
+
+ LoggerRef Log() { return m_Log; }
+
+ void Init()
+ {
+ m_Multi = curl_multi_init();
+ if (!m_Multi)
+ {
+ throw std::runtime_error("curl_multi_init failed");
+ }
+
+ SetupMultiCallbacks();
+
+ if (m_Settings.SessionId == Oid::Zero)
+ {
+ m_SessionId = std::string(GetSessionIdString());
+ }
+ else
+ {
+ m_SessionId = m_Settings.SessionId.ToString();
+ }
+ }
+
+ // ── Handle pool ─────────────────────────────────────────────────────
+
+ CURL* AllocHandle()
+ {
+ if (!m_HandlePool.empty())
+ {
+ CURL* Handle = m_HandlePool.back();
+ m_HandlePool.pop_back();
+ curl_easy_reset(Handle);
+ return Handle;
+ }
+ CURL* Handle = curl_easy_init();
+ if (!Handle)
+ {
+ throw std::runtime_error("curl_easy_init failed");
+ }
+ return Handle;
+ }
+
+ void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); }
+
+ // ── Configure a handle with common settings ─────────────────────────
+ // Called only from DoAsync* lambdas running on the strand.
+
+ void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters)
+ {
+ // Build URL
+ ExtendableStringBuilder<256> Url;
+ BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str());
+
+ // Unix domain socket
+ if (!m_Settings.UnixSocketPath.empty())
+ {
+ m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath);
+ curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str());
+ }
+
+ // Timeouts
+ if (m_Settings.ConnectTimeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast<long>(m_Settings.ConnectTimeout.count()));
+ }
+ if (m_Settings.Timeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count()));
+ }
+
+ // HTTP/2
+ if (m_Settings.AssumeHttp2)
+ {
+ curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
+ }
+
+ // SSL
+ if (m_Settings.InsecureSsl)
+ {
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ if (!m_Settings.CaBundlePath.empty())
+ {
+ curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str());
+ }
+
+ // Verbose/debug
+ if (m_Settings.Verbose)
+ {
+ curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L);
+ }
+
+ // Thread safety
+ curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
+
+ if (m_Settings.ForbidReuseConnection)
+ {
+ curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L);
+ }
+ }
+
+ // ── Access token ────────────────────────────────────────────────────
+
+ std::optional<std::string> GetAccessToken()
+ {
+ if (!m_Settings.AccessTokenProvider.has_value())
+ {
+ return {};
+ }
+ {
+ RwLock::SharedLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()();
+ if (!NewToken.IsValid())
+ {
+ ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once");
+ NewToken = m_Settings.AccessTokenProvider.value()();
+ }
+ if (NewToken.IsValid())
+ {
+ m_CachedAccessToken = NewToken;
+ return m_CachedAccessToken.GetValue();
+ }
+ ZEN_WARN("AsyncHttpClient: access token provider returned invalid token");
+ return {};
+ }
+
+ // ── Submit a transfer ───────────────────────────────────────────────
+
+ void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer");
+ // Setup write/header callbacks
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData);
+ curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback);
+ curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData);
+
+ m_Transfers[Handle] = std::move(Ctx);
+
+ CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle);
+ if (Mc != CURLM_OK)
+ {
+ auto Stolen = std::move(m_Transfers[Handle]);
+ m_Transfers.erase(Handle);
+ ReleaseHandle(Handle);
+
+ HttpClient::Response ErrorResponse;
+ ErrorResponse.Error =
+ HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))};
+ asio::post(m_IoContext,
+ [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); });
+ return;
+ }
+ }
+
+ // ── Socket-action integration ───────────────────────────────────────
+ //
+ // curl_multi drives I/O via two callbacks:
+ // - SocketCallback: curl tells us which sockets to watch for read/write
+ // - TimerCallback: curl tells us when to fire a timeout
+ //
+ // On each socket event or timeout we call curl_multi_socket_action(),
+ // then drain completed transfers via curl_multi_info_read().
+
+ // Per-socket state: wraps the native fd in an ASIO socket for async_wait.
+ struct SocketInfo
+ {
+ asio::ip::tcp::socket Socket;
+ int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
+
+ explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {}
+ };
+
+ // Static thunks registered with curl_multi ────────────────────────────
+
+ static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr)
+ {
+ ZEN_UNUSED(Easy);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr));
+ return 0;
+ }
+
+ static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr)
+ {
+ ZEN_UNUSED(Multi);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlTimer(TimeoutMs);
+ return 0;
+ }
+
+ void SetupMultiCallbacks()
+ {
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this);
+ }
+
+ // Called by curl when socket watch state changes ──────────────────────
+
+ void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info)
+ {
+ if (Action == CURL_POLL_REMOVE)
+ {
+ if (Info)
+ {
+ // Cancel pending async_wait ops before releasing the fd.
+ // curl owns the fd, so we must release() rather than close().
+ Info->Socket.cancel();
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.release();
+ }
+ m_Sockets.erase(Fd);
+ }
+ return;
+ }
+
+ if (!Info)
+ {
+ // New socket — wrap the native fd in an ASIO socket.
+ auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext));
+ Info = It->second.get();
+
+ asio::error_code Ec;
+ // Determine protocol from the fd (v4 vs v6). Default to v4.
+ Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
+ if (Ec)
+ {
+ // Try v6 as fallback
+ Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
+ }
+ if (Ec)
+ {
+ ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message());
+ m_Sockets.erase(Fd);
+ return;
+ }
+
+ curl_multi_assign(m_Multi, Fd, Info);
+ }
+
+ Info->WatchFlags = Action;
+ SetSocketWatch(Fd, Info);
+ }
+
+ void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info)
+ {
+ // Cancel any pending wait before issuing a new one.
+ Info->Socket.cancel();
+
+ if (Info->WatchFlags & CURL_POLL_IN)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_IN);
+ }));
+ }
+
+ if (Info->WatchFlags & CURL_POLL_OUT)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_OUT);
+ }));
+ }
+ }
+
+ void OnSocketReady(curl_socket_t Fd, int CurlAction)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning);
+ CheckCompleted();
+
+ // Re-arm the watch if the socket is still tracked.
+ auto It = m_Sockets.find(Fd);
+ if (It != m_Sockets.end())
+ {
+ SetSocketWatch(Fd, It->second.get());
+ }
+ }
+
+ // Called by curl when it wants a timeout ──────────────────────────────
+
+ void OnCurlTimer(long TimeoutMs)
+ {
+ m_Timer.cancel();
+
+ if (TimeoutMs < 0)
+ {
+ // curl says "no timeout needed"
+ return;
+ }
+
+ if (TimeoutMs == 0)
+ {
+ // curl wants immediate action — run it directly on the strand.
+ asio::post(m_Strand, [this]() {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ });
+ return;
+ }
+
+ m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs));
+ m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ }));
+ }
+
+ // Drain completed transfers from curl_multi ──────────────────────────
+
+ void CheckCompleted()
+ {
+ int MsgsLeft = 0;
+ CURLMsg* Msg = nullptr;
+ while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr)
+ {
+ if (Msg->msg != CURLMSG_DONE)
+ {
+ continue;
+ }
+
+ CURL* Handle = Msg->easy_handle;
+ CURLcode Result = Msg->data.result;
+
+ curl_multi_remove_handle(m_Multi, Handle);
+
+ auto It = m_Transfers.find(Handle);
+ if (It == m_Transfers.end())
+ {
+ ReleaseHandle(Handle);
+ continue;
+ }
+
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second);
+ m_Transfers.erase(It);
+
+ CompleteTransfer(Handle, Result, std::move(Ctx));
+ }
+ }
+
+ void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer");
+ // Extract result info
+ long StatusCode = 0;
+ curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode);
+
+ double Elapsed = 0;
+ curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed);
+
+ curl_off_t UpBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes);
+
+ curl_off_t DownBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes);
+
+ ReleaseHandle(Handle);
+
+ // Build response
+ HttpClient::Response Response;
+ Response.StatusCode = HttpResponseCode(StatusCode);
+ Response.UploadedBytes = static_cast<int64_t>(UpBytes);
+ Response.DownloadedBytes = static_cast<int64_t>(DownBytes);
+ Response.ElapsedSeconds = Elapsed;
+ Response.Header = BuildHeaderMap(Ctx->ResponseHeaders);
+
+ if (CurlResult != CURLE_OK)
+ {
+ const char* ErrorMsg = curl_easy_strerror(CurlResult);
+
+ if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK)
+ {
+ ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg);
+ }
+
+ if (!Ctx->Body.empty())
+ {
+ Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ }
+
+ Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)};
+ }
+ else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty())
+ {
+ // No payload
+ }
+ else
+ {
+ IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders);
+
+ const HttpResponseCode Code = HttpResponseCode(StatusCode);
+ if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound)
+ {
+ ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri);
+ }
+
+ Response.ResponsePayload = std::move(PayloadBuffer);
+ }
+
+ // Dispatch the user callback off the strand so a slow callback
+ // cannot starve the curl_multi poll loop.
+ asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable {
+ try
+ {
+ Cb(std::move(Response));
+ }
+ catch (const std::exception& Ex)
+ {
+ auto Log = [&]() -> LoggerRef { return LogRef; };
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what());
+ }
+ });
+ }
+
+ // ── Async verb implementations ──────────────────────────────────────
+
+ void DoAsyncGet(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Get");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Head");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Delete");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE");
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPost(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Post");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPostWithPayload(std::string Url,
+ IoBuffer Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ ContentType,
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList =
+ BuildHeaderList(AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ // Set up read callback for payload data
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutWithPayload(std::string Url,
+ IoBuffer Payload,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList = BuildHeaderList(
+ AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+
+ HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}};
+ Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ // ── Members ─────────────────────────────────────────────────────────
+
+ std::string m_BaseUri;
+ HttpClientSettings m_Settings;
+ LoggerRef m_Log;
+ std::string m_SessionId;
+ std::string m_UnixSocketPathUtf8;
+
+ // io_context and strand — all curl_multi operations are serialized on the
+ // strand, making this safe even when the io_context has multiple threads.
+ std::unique_ptr<asio::io_context> m_OwnedIoContext;
+ asio::io_context& m_IoContext;
+ asio::strand<asio::io_context::executor_type> m_Strand;
+ std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard;
+ std::thread m_IoThread;
+
+ // curl_multi and socket-action state
+ CURLM* m_Multi = nullptr;
+ std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers;
+ std::vector<CURL*> m_HandlePool;
+ std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets;
+ asio::steady_timer m_Timer;
+ bool m_ShuttingDown = false;
+
+ // Access token cache
+ RwLock m_AccessTokenLock;
+ HttpClientAccessToken m_CachedAccessToken;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient public API
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, Settings))
+{
+}
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings))
+{
+}
+
+AsyncHttpClient::~AsyncHttpClient() = default;
+
+// ── Callback-based API ──────────────────────────────────────────────────
+
+void
+AsyncHttpClient::AsyncGet(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters);
+}
+
+// ── Future-based API ────────────────────────────────────────────────────
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncGet(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncHead(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncDelete(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ ContentType,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ Parameters);
+ return Future;
+}
+
+} // namespace zen
diff --git a/src/zenhttp/clients/httpclientcurl.cpp b/src/zenhttp/clients/httpclientcurl.cpp
index d150b44c6..b9af9bd52 100644
--- a/src/zenhttp/clients/httpclientcurl.cpp
+++ b/src/zenhttp/clients/httpclientcurl.cpp
@@ -1,6 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "httpclientcurl.h"
+#include "httpclientcurlhelpers.h"
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
@@ -29,153 +30,7 @@ static std::atomic<uint32_t> CurlHttpClientRequestIdCounter{0};
//////////////////////////////////////////////////////////////////////////
-static HttpClientErrorCode
-MapCurlError(CURLcode Code)
-{
- switch (Code)
- {
- case CURLE_OK:
- return HttpClientErrorCode::kOK;
- case CURLE_COULDNT_CONNECT:
- return HttpClientErrorCode::kConnectionFailure;
- case CURLE_COULDNT_RESOLVE_HOST:
- return HttpClientErrorCode::kHostResolutionFailure;
- case CURLE_COULDNT_RESOLVE_PROXY:
- return HttpClientErrorCode::kProxyResolutionFailure;
- case CURLE_RECV_ERROR:
- return HttpClientErrorCode::kNetworkReceiveError;
- case CURLE_SEND_ERROR:
- return HttpClientErrorCode::kNetworkSendFailure;
- case CURLE_OPERATION_TIMEDOUT:
- return HttpClientErrorCode::kOperationTimedOut;
- case CURLE_SSL_CONNECT_ERROR:
- return HttpClientErrorCode::kSSLConnectError;
- case CURLE_SSL_CERTPROBLEM:
- return HttpClientErrorCode::kSSLCertificateError;
- case CURLE_PEER_FAILED_VERIFICATION:
- return HttpClientErrorCode::kSSLCACertError;
- case CURLE_SSL_CIPHER:
- case CURLE_SSL_ENGINE_NOTFOUND:
- case CURLE_SSL_ENGINE_SETFAILED:
- return HttpClientErrorCode::kGenericSSLError;
- case CURLE_ABORTED_BY_CALLBACK:
- return HttpClientErrorCode::kRequestCancelled;
- default:
- return HttpClientErrorCode::kOtherError;
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-//
-// Curl callback helpers
-
-struct WriteCallbackData
-{
- std::string* Body = nullptr;
- std::function<bool()>* CheckIfAbortFunction = nullptr;
-};
-
-static size_t
-CurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<WriteCallbackData*>(UserData);
- size_t TotalBytes = Size * Nmemb;
-
- if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
- {
- return 0; // Signal abort to curl
- }
-
- Data->Body->append(Ptr, TotalBytes);
- return TotalBytes;
-}
-
-struct HeaderCallbackData
-{
- std::vector<std::pair<std::string, std::string>>* Headers = nullptr;
-};
-
-// Trims trailing CRLF, splits on the first colon, and trims whitespace from key and value.
-// Returns nullopt for blank lines or lines without a colon (e.g. HTTP status lines).
-static std::optional<std::pair<std::string_view, std::string_view>>
-ParseHeaderLine(std::string_view Line)
-{
- while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
- {
- Line.remove_suffix(1);
- }
-
- if (Line.empty())
- {
- return std::nullopt;
- }
-
- size_t ColonPos = Line.find(':');
- if (ColonPos == std::string_view::npos)
- {
- return std::nullopt;
- }
-
- std::string_view Key = Line.substr(0, ColonPos);
- std::string_view Value = Line.substr(ColonPos + 1);
-
- while (!Key.empty() && Key.back() == ' ')
- {
- Key.remove_suffix(1);
- }
- while (!Value.empty() && Value.front() == ' ')
- {
- Value.remove_prefix(1);
- }
-
- return std::pair{Key, Value};
-}
-
-static size_t
-CurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<HeaderCallbackData*>(UserData);
- size_t TotalBytes = Size * Nmemb;
-
- if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes)))
- {
- auto& [Key, Value] = *Header;
- Data->Headers->emplace_back(std::string(Key), std::string(Value));
- }
-
- return TotalBytes;
-}
-
-struct ReadCallbackData
-{
- const uint8_t* DataPtr = nullptr;
- size_t DataSize = 0;
- size_t Offset = 0;
- std::function<bool()>* CheckIfAbortFunction = nullptr;
-};
-
-static size_t
-CurlReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<ReadCallbackData*>(UserData);
- size_t MaxRead = Size * Nmemb;
-
- if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
- {
- return CURL_READFUNC_ABORT;
- }
-
- size_t Remaining = Data->DataSize - Data->Offset;
- size_t ToRead = std::min(MaxRead, Remaining);
-
- if (ToRead > 0)
- {
- memcpy(Buffer, Data->DataPtr + Data->Offset, ToRead);
- Data->Offset += ToRead;
- }
-
- return ToRead;
-}
+// Curl callback helpers and shared utilities are in httpclientcurlhelpers.h
struct StreamReadCallbackData
{
@@ -281,120 +136,6 @@ CurlDebugCallback(CURL* Handle, curl_infotype Type, char* Data, size_t Size, voi
//////////////////////////////////////////////////////////////////////////
-static std::pair<std::string, std::string>
-HeaderContentType(ZenContentType ContentType)
-{
- return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
-}
-
-static curl_slist*
-BuildHeaderList(const HttpClient::KeyValueMap& AdditionalHeader,
- std::string_view SessionId,
- const std::optional<std::string>& AccessToken,
- const std::vector<std::pair<std::string, std::string>>& ExtraHeaders = {})
-{
- curl_slist* Headers = nullptr;
-
- for (const auto& [Key, Value] : *AdditionalHeader)
- {
- ExtendableStringBuilder<64> HeaderLine;
- HeaderLine << Key << ": " << Value;
- Headers = curl_slist_append(Headers, HeaderLine.c_str());
- }
-
- if (!SessionId.empty())
- {
- ExtendableStringBuilder<64> SessionHeader;
- SessionHeader << "UE-Session: " << SessionId;
- Headers = curl_slist_append(Headers, SessionHeader.c_str());
- }
-
- if (AccessToken.has_value())
- {
- ExtendableStringBuilder<128> AuthHeader;
- AuthHeader << "Authorization: " << AccessToken.value();
- Headers = curl_slist_append(Headers, AuthHeader.c_str());
- }
-
- for (const auto& [Key, Value] : ExtraHeaders)
- {
- ExtendableStringBuilder<128> HeaderLine;
- HeaderLine << Key << ": " << Value;
- Headers = curl_slist_append(Headers, HeaderLine.c_str());
- }
-
- return Headers;
-}
-
-static HttpClient::KeyValueMap
-BuildHeaderMap(const std::vector<std::pair<std::string, std::string>>& Headers)
-{
- HttpClient::KeyValueMap HeaderMap;
- for (const auto& [Key, Value] : Headers)
- {
- HeaderMap->insert_or_assign(Key, Value);
- }
- return HeaderMap;
-}
-
-// Scans response headers for Content-Type and applies it to the buffer.
-static void
-ApplyContentTypeFromHeaders(IoBuffer& Buffer, const std::vector<std::pair<std::string, std::string>>& Headers)
-{
- for (const auto& [Key, Value] : Headers)
- {
- if (StrCaseCompare(Key, "Content-Type") == 0)
- {
- Buffer.SetContentType(ParseContentType(Value));
- break;
- }
- }
-}
-
-static void
-AppendUrlEncoded(StringBuilderBase& Out, std::string_view Input)
-{
- static constexpr char HexDigits[] = "0123456789ABCDEF";
- static constexpr AsciiSet Unreserved("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~");
-
- for (char C : Input)
- {
- if (Unreserved.Contains(C))
- {
- Out.Append(C);
- }
- else
- {
- uint8_t Byte = static_cast<uint8_t>(C);
- char Encoded[3] = {'%', HexDigits[Byte >> 4], HexDigits[Byte & 0x0F]};
- Out.Append(std::string_view(Encoded, 3));
- }
- }
-}
-
-static void
-BuildUrlWithParameters(StringBuilderBase& Url,
- std::string_view BaseUrl,
- std::string_view ResourcePath,
- const HttpClient::KeyValueMap& Parameters)
-{
- Url.Append(BaseUrl);
- Url.Append(ResourcePath);
-
- if (!Parameters->empty())
- {
- char Separator = '?';
- for (const auto& [Key, Value] : *Parameters)
- {
- Url.Append(Separator);
- AppendUrlEncoded(Url, Key);
- Url.Append('=');
- AppendUrlEncoded(Url, Value);
- Separator = '&';
- }
- }
-}
-
//////////////////////////////////////////////////////////////////////////
CurlHttpClient::CurlHttpClient(std::string_view BaseUri,
@@ -440,9 +181,9 @@ CurlHttpClient::CurlResult
CurlHttpClient::Session::PerformWithResponseCallbacks()
{
std::string Body;
- WriteCallbackData WriteData{.Body = &Body,
+ CurlWriteCallbackData WriteData{.Body = &Body,
.CheckIfAbortFunction = Outer->m_CheckIfAbortFunction ? &Outer->m_CheckIfAbortFunction : nullptr};
- HeaderCallbackData HdrData{};
+ CurlHeaderCallbackData HdrData{};
std::vector<std::pair<std::string, std::string>> ResponseHeaders;
HdrData.Headers = &ResponseHeaders;
@@ -998,9 +739,9 @@ CurlHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValu
curl_easy_setopt(H, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Payload.GetSize()));
- ReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
- .DataSize = Payload.GetSize(),
- .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
+ CurlReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
+ .DataSize = Payload.GetSize(),
+ .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback);
curl_easy_setopt(H, CURLOPT_READDATA, &ReadData);
@@ -1367,9 +1108,9 @@ CurlHttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyV
return Sess.PerformWithResponseCallbacks();
}
- ReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
- .DataSize = Payload.GetSize(),
- .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
+ CurlReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
+ .DataSize = Payload.GetSize(),
+ .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback);
curl_easy_setopt(H, CURLOPT_READDATA, &ReadData);
diff --git a/src/zenhttp/clients/httpclientcurlhelpers.h b/src/zenhttp/clients/httpclientcurlhelpers.h
new file mode 100644
index 000000000..0605a30f6
--- /dev/null
+++ b/src/zenhttp/clients/httpclientcurlhelpers.h
@@ -0,0 +1,293 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+// Shared helpers for curl-based HTTP client implementations (sync and async).
+// This is an internal header, not part of the public API.
+
+#include <zencore/string.h>
+
+#include <zenhttp/httpclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <curl/curl.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Error mapping
+
+inline HttpClientErrorCode
+MapCurlError(CURLcode Code)
+{
+ switch (Code)
+ {
+ case CURLE_OK:
+ return HttpClientErrorCode::kOK;
+ case CURLE_COULDNT_CONNECT:
+ return HttpClientErrorCode::kConnectionFailure;
+ case CURLE_COULDNT_RESOLVE_HOST:
+ return HttpClientErrorCode::kHostResolutionFailure;
+ case CURLE_COULDNT_RESOLVE_PROXY:
+ return HttpClientErrorCode::kProxyResolutionFailure;
+ case CURLE_RECV_ERROR:
+ return HttpClientErrorCode::kNetworkReceiveError;
+ case CURLE_SEND_ERROR:
+ return HttpClientErrorCode::kNetworkSendFailure;
+ case CURLE_OPERATION_TIMEDOUT:
+ return HttpClientErrorCode::kOperationTimedOut;
+ case CURLE_SSL_CONNECT_ERROR:
+ return HttpClientErrorCode::kSSLConnectError;
+ case CURLE_SSL_CERTPROBLEM:
+ return HttpClientErrorCode::kSSLCertificateError;
+ case CURLE_PEER_FAILED_VERIFICATION:
+ return HttpClientErrorCode::kSSLCACertError;
+ case CURLE_SSL_CIPHER:
+ case CURLE_SSL_ENGINE_NOTFOUND:
+ case CURLE_SSL_ENGINE_SETFAILED:
+ return HttpClientErrorCode::kGenericSSLError;
+ case CURLE_ABORTED_BY_CALLBACK:
+ return HttpClientErrorCode::kRequestCancelled;
+ default:
+ return HttpClientErrorCode::kOtherError;
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Curl callback data structures and callbacks
+
+struct CurlWriteCallbackData
+{
+ std::string* Body = nullptr;
+ std::function<bool()>* CheckIfAbortFunction = nullptr;
+};
+
+inline size_t
+CurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlWriteCallbackData*>(UserData);
+ size_t TotalBytes = Size * Nmemb;
+
+ if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
+ {
+ return 0; // Signal abort to curl
+ }
+
+ Data->Body->append(Ptr, TotalBytes);
+ return TotalBytes;
+}
+
+struct CurlHeaderCallbackData
+{
+ std::vector<std::pair<std::string, std::string>>* Headers = nullptr;
+};
+
+// Trims trailing CRLF, splits on the first colon, and trims whitespace from key and value.
+// Returns nullopt for blank lines or lines without a colon (e.g. HTTP status lines).
+inline std::optional<std::pair<std::string_view, std::string_view>>
+ParseHeaderLine(std::string_view Line)
+{
+ while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
+ {
+ Line.remove_suffix(1);
+ }
+
+ if (Line.empty())
+ {
+ return std::nullopt;
+ }
+
+ size_t ColonPos = Line.find(':');
+ if (ColonPos == std::string_view::npos)
+ {
+ return std::nullopt;
+ }
+
+ std::string_view Key = Line.substr(0, ColonPos);
+ std::string_view Value = Line.substr(ColonPos + 1);
+
+ while (!Key.empty() && Key.back() == ' ')
+ {
+ Key.remove_suffix(1);
+ }
+ while (!Value.empty() && Value.front() == ' ')
+ {
+ Value.remove_prefix(1);
+ }
+
+ return std::pair{Key, Value};
+}
+
+inline size_t
+CurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlHeaderCallbackData*>(UserData);
+ size_t TotalBytes = Size * Nmemb;
+
+ if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes)))
+ {
+ auto& [Key, Value] = *Header;
+ Data->Headers->emplace_back(std::string(Key), std::string(Value));
+ }
+
+ return TotalBytes;
+}
+
+struct CurlReadCallbackData
+{
+ const uint8_t* DataPtr = nullptr;
+ size_t DataSize = 0;
+ size_t Offset = 0;
+ std::function<bool()>* CheckIfAbortFunction = nullptr;
+};
+
+inline size_t
+CurlReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlReadCallbackData*>(UserData);
+ size_t MaxRead = Size * Nmemb;
+
+ if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
+ {
+ return CURL_READFUNC_ABORT;
+ }
+
+ size_t Remaining = Data->DataSize - Data->Offset;
+ size_t ToRead = std::min(MaxRead, Remaining);
+
+ if (ToRead > 0)
+ {
+ memcpy(Buffer, Data->DataPtr + Data->Offset, ToRead);
+ Data->Offset += ToRead;
+ }
+
+ return ToRead;
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// URL and header construction
+
+inline void
+AppendUrlEncoded(StringBuilderBase& Out, std::string_view Input)
+{
+ static constexpr char HexDigits[] = "0123456789ABCDEF";
+ static constexpr AsciiSet Unreserved("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~");
+
+ for (char C : Input)
+ {
+ if (Unreserved.Contains(C))
+ {
+ Out.Append(C);
+ }
+ else
+ {
+ uint8_t Byte = static_cast<uint8_t>(C);
+ char Encoded[3] = {'%', HexDigits[Byte >> 4], HexDigits[Byte & 0x0F]};
+ Out.Append(std::string_view(Encoded, 3));
+ }
+ }
+}
+
+inline void
+BuildUrlWithParameters(StringBuilderBase& Url,
+ std::string_view BaseUrl,
+ std::string_view ResourcePath,
+ const HttpClient::KeyValueMap& Parameters)
+{
+ Url.Append(BaseUrl);
+ Url.Append(ResourcePath);
+
+ if (!Parameters->empty())
+ {
+ char Separator = '?';
+ for (const auto& [Key, Value] : *Parameters)
+ {
+ Url.Append(Separator);
+ AppendUrlEncoded(Url, Key);
+ Url.Append('=');
+ AppendUrlEncoded(Url, Value);
+ Separator = '&';
+ }
+ }
+}
+
+inline std::pair<std::string, std::string>
+HeaderContentType(ZenContentType ContentType)
+{
+ return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
+}
+
+inline curl_slist*
+BuildHeaderList(const HttpClient::KeyValueMap& AdditionalHeader,
+ std::string_view SessionId,
+ const std::optional<std::string>& AccessToken,
+ const std::vector<std::pair<std::string, std::string>>& ExtraHeaders = {})
+{
+ curl_slist* Headers = nullptr;
+
+ for (const auto& [Key, Value] : *AdditionalHeader)
+ {
+ ExtendableStringBuilder<64> HeaderLine;
+ HeaderLine << Key << ": " << Value;
+ Headers = curl_slist_append(Headers, HeaderLine.c_str());
+ }
+
+ if (!SessionId.empty())
+ {
+ ExtendableStringBuilder<64> SessionHeader;
+ SessionHeader << "UE-Session: " << SessionId;
+ Headers = curl_slist_append(Headers, SessionHeader.c_str());
+ }
+
+ if (AccessToken.has_value())
+ {
+ ExtendableStringBuilder<128> AuthHeader;
+ AuthHeader << "Authorization: " << AccessToken.value();
+ Headers = curl_slist_append(Headers, AuthHeader.c_str());
+ }
+
+ for (const auto& [Key, Value] : ExtraHeaders)
+ {
+ ExtendableStringBuilder<128> HeaderLine;
+ HeaderLine << Key << ": " << Value;
+ Headers = curl_slist_append(Headers, HeaderLine.c_str());
+ }
+
+ return Headers;
+}
+
+inline HttpClient::KeyValueMap
+BuildHeaderMap(const std::vector<std::pair<std::string, std::string>>& Headers)
+{
+ HttpClient::KeyValueMap HeaderMap;
+ for (const auto& [Key, Value] : Headers)
+ {
+ HeaderMap->insert_or_assign(Key, Value);
+ }
+ return HeaderMap;
+}
+
+// Scans response headers for Content-Type and applies it to the buffer.
+inline void
+ApplyContentTypeFromHeaders(IoBuffer& Buffer, const std::vector<std::pair<std::string, std::string>>& Headers)
+{
+ for (const auto& [Key, Value] : Headers)
+ {
+ if (StrCaseCompare(Key, "Content-Type") == 0)
+ {
+ Buffer.SetContentType(ParseContentType(Value));
+ break;
+ }
+ }
+}
+
+} // namespace zen
diff --git a/src/zenhttp/include/zenhttp/asynchttpclient.h b/src/zenhttp/include/zenhttp/asynchttpclient.h
new file mode 100644
index 000000000..58429349d
--- /dev/null
+++ b/src/zenhttp/include/zenhttp/asynchttpclient.h
@@ -0,0 +1,123 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenhttp.h"
+
+#include <zenhttp/httpclient.h>
+
+#include <functional>
+#include <future>
+#include <memory>
+
+namespace asio {
+class io_context;
+}
+
+namespace zen {
+
+/// Completion callback for async HTTP operations.
+using AsyncHttpCallback = std::function<void(HttpClient::Response)>;
+
+/** Asynchronous HTTP client backed by curl_multi and ASIO.
+ *
+ * Uses curl_multi_perform() driven by an ASIO steady_timer to process
+ * transfers without blocking the caller. All curl_multi operations are
+ * serialized on an internal strand; callers may issue requests from any
+ * thread, and the io_context may have multiple threads.
+ *
+ * Two construction modes:
+ * - Owned io_context: creates an internal thread (self-contained).
+ * - External io_context: caller runs the event loop.
+ *
+ * Completion callbacks are dispatched on the io_context (not the internal
+ * strand), so a slow callback will not block the curl poll loop. Future-
+ * based wrappers (Get, Post, ...) return a std::future<Response> for
+ * callers that prefer blocking on a result.
+ */
+class AsyncHttpClient
+{
+public:
+ using Response = HttpClient::Response;
+ using KeyValueMap = HttpClient::KeyValueMap;
+
+ /// Construct with an internally-owned io_context and thread.
+ explicit AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings = {});
+
+ /// Construct with an externally-managed io_context. The io_context must
+ /// outlive this client and must be running (via run()) on at least one thread.
+ AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings = {});
+
+ ~AsyncHttpClient();
+
+ AsyncHttpClient(const AsyncHttpClient&) = delete;
+ AsyncHttpClient& operator=(const AsyncHttpClient&) = delete;
+
+ // ── Callback-based API ──────────────────────────────────────────────
+
+ void AsyncGet(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ void AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+ void AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+ void AsyncPost(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ void AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+ void AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {});
+
+ void AsyncPut(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ void AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters = {});
+
+ // ── Future-based API ────────────────────────────────────────────────
+
+ [[nodiscard]] std::future<Response> Get(std::string_view Url,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ [[nodiscard]] std::future<Response> Head(std::string_view Url, const KeyValueMap& AdditionalHeader = {});
+
+ [[nodiscard]] std::future<Response> Delete(std::string_view Url, const KeyValueMap& AdditionalHeader = {});
+
+ [[nodiscard]] std::future<Response> Post(std::string_view Url,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ [[nodiscard]] std::future<Response> Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {});
+
+ [[nodiscard]] std::future<Response> Post(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ const KeyValueMap& AdditionalHeader = {});
+
+ [[nodiscard]] std::future<Response> Put(std::string_view Url,
+ const IoBuffer& Payload,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ [[nodiscard]] std::future<Response> Put(std::string_view Url, const KeyValueMap& Parameters = {});
+
+private:
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
+};
+
+void asynchttpclient_test_forcelink(); // internal
+
+} // namespace zen
diff --git a/src/zenhttp/zenhttp.cpp b/src/zenhttp/zenhttp.cpp
index 0b2a7ca7c..e15aa4d30 100644
--- a/src/zenhttp/zenhttp.cpp
+++ b/src/zenhttp/zenhttp.cpp
@@ -4,6 +4,7 @@
#if ZEN_WITH_TESTS
+# include <zenhttp/asynchttpclient.h>
# include <zenhttp/httpclient.h>
# include <zenhttp/httpserver.h>
# include <zenhttp/packageformat.h>
@@ -18,6 +19,7 @@ zenhttp_forcelinktests()
httpclient_forcelink();
httpparser_forcelink();
httpclient_test_forcelink();
+ asynchttpclient_test_forcelink();
forcelink_packageformat();
passwordsecurity_forcelink();
websocket_forcelink();