aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients/asynchttpclient.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-09 11:02:41 +0200
committerGitHub Enterprise <[email protected]>2026-04-09 11:02:41 +0200
commit5900f6a6d892fbe582c46063cc399a840e60ef2e (patch)
tree76735ff6de39c2c515a866ecc9d7b4309d63669d /src/zenhttp/clients/asynchttpclient.cpp
parentmigrate from http_parser to llhttp (#929) (diff)
downloadzen-5900f6a6d892fbe582c46063cc399a840e60ef2e.tar.xz
zen-5900f6a6d892fbe582c46063cc399a840e60ef2e.zip
Add async HTTP client (curl_multi + ASIO) (#918)
- Adds `AsyncHttpClient` — an asynchronous HTTP client using `curl_multi_socket_action` integrated with ASIO for event-driven I/O. Supports GET, POST, PUT, DELETE, HEAD with both callback-based and `std::future`-based APIs. - Extracts shared curl helpers (callbacks, URL encoding, header construction, error mapping) into `httpclientcurlhelpers.h`, eliminating duplication between the sync and async implementations. ## Design - All curl_multi state is serialized on an `asio::strand`, safe with multi-threaded io_contexts. - Two construction modes: owned io_context (creates internal thread) or external io_context (caller runs the loop). - Socket readiness is detected via `asio::ip::tcp::socket::async_wait` driven by curl's `CURLMOPT_SOCKETFUNCTION`/`CURLMOPT_TIMERFUNCTION` — no polling, sub-millisecond latency. - Completion callbacks are dispatched off the strand onto the io_context so slow callbacks don't starve the curl event loop. Exceptions in callbacks are caught and logged. ## Files | File | Change | |------|--------| | `zenhttp/include/zenhttp/asynchttpclient.h` | New public header | | `zenhttp/clients/asynchttpclient.cpp` | Implementation (~1000 lines) | | `zenhttp/clients/httpclientcurlhelpers.h` | Shared curl helpers extracted from sync client | | `zenhttp/clients/httpclientcurl.cpp` | Removed duplicated helpers, uses shared header | | `zenhttp/asynchttpclient_test.cpp` | 8 test cases: verbs, payloads, callbacks, concurrency, external io_context, connection errors | | `zenhttp/zenhttp.cpp` | Forcelink registration for new tests |
Diffstat (limited to 'src/zenhttp/clients/asynchttpclient.cpp')
-rw-r--r--src/zenhttp/clients/asynchttpclient.cpp1033
1 files changed, 1033 insertions, 0 deletions
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp
new file mode 100644
index 000000000..bdf7f160c
--- /dev/null
+++ b/src/zenhttp/clients/asynchttpclient.cpp
@@ -0,0 +1,1033 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhttp/asynchttpclient.h>
+
+#include "httpclientcurlhelpers.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/session.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// TransferContext: per-transfer state associated with each CURL easy handle
+
+struct TransferContext
+{
+ AsyncHttpCallback Callback;
+ std::string Body;
+ std::vector<std::pair<std::string, std::string>> ResponseHeaders;
+ CurlWriteCallbackData WriteData;
+ CurlHeaderCallbackData HeaderData;
+ curl_slist* HeaderList = nullptr;
+
+ // For PUT/POST with payload: keep the data alive until transfer completes
+ IoBuffer PayloadBuffer;
+ CurlReadCallbackData ReadData;
+
+ TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback))
+ {
+ WriteData.Body = &Body;
+ HeaderData.Headers = &ResponseHeaders;
+ }
+
+ ~TransferContext()
+ {
+ if (HeaderList)
+ {
+ curl_slist_free_all(HeaderList);
+ }
+ }
+
+ TransferContext(const TransferContext&) = delete;
+ TransferContext& operator=(const TransferContext&) = delete;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient::Impl
+
+struct AsyncHttpClient::Impl
+{
+ Impl(std::string_view BaseUri, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_OwnedIoContext(std::make_unique<asio::io_context>())
+ , m_IoContext(*m_OwnedIoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ m_WorkGuard.emplace(m_IoContext.get_executor());
+ m_IoThread = std::thread([this]() {
+ SetCurrentThreadName("async_http");
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what());
+ }
+ });
+ }
+
+ Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_IoContext(IoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ }
+
+ ~Impl()
+ {
+ // Clean up curl state on the strand where all curl_multi operations
+ // are serialized. Use a promise to block until the cleanup handler
+ // has actually executed — essential for the external io_context case
+ // where we don't own the run loop.
+ std::promise<void> Done;
+ std::future<void> DoneFuture = Done.get_future();
+
+ asio::post(m_Strand, [this, &Done]() {
+ m_ShuttingDown = true;
+ m_Timer.cancel();
+
+ // Release all tracked sockets (don't close — curl owns the fds).
+ for (auto& [Fd, Info] : m_Sockets)
+ {
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.cancel();
+ Info->Socket.release();
+ }
+ }
+ m_Sockets.clear();
+
+ for (auto& [Handle, Ctx] : m_Transfers)
+ {
+ curl_multi_remove_handle(m_Multi, Handle);
+ curl_easy_cleanup(Handle);
+ }
+ m_Transfers.clear();
+
+ for (CURL* Handle : m_HandlePool)
+ {
+ curl_easy_cleanup(Handle);
+ }
+ m_HandlePool.clear();
+
+ Done.set_value();
+ });
+
+ // For owned io_context: release work guard so run() can return after
+ // processing the cleanup handler above.
+ m_WorkGuard.reset();
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ else
+ {
+ // External io_context: wait for the cleanup handler to complete.
+ DoneFuture.wait();
+ }
+
+ if (m_Multi)
+ {
+ curl_multi_cleanup(m_Multi);
+ }
+ }
+
+ LoggerRef Log() { return m_Log; }
+
+ void Init()
+ {
+ m_Multi = curl_multi_init();
+ if (!m_Multi)
+ {
+ throw std::runtime_error("curl_multi_init failed");
+ }
+
+ SetupMultiCallbacks();
+
+ if (m_Settings.SessionId == Oid::Zero)
+ {
+ m_SessionId = std::string(GetSessionIdString());
+ }
+ else
+ {
+ m_SessionId = m_Settings.SessionId.ToString();
+ }
+ }
+
+ // ── Handle pool ─────────────────────────────────────────────────────
+
+ CURL* AllocHandle()
+ {
+ if (!m_HandlePool.empty())
+ {
+ CURL* Handle = m_HandlePool.back();
+ m_HandlePool.pop_back();
+ curl_easy_reset(Handle);
+ return Handle;
+ }
+ CURL* Handle = curl_easy_init();
+ if (!Handle)
+ {
+ throw std::runtime_error("curl_easy_init failed");
+ }
+ return Handle;
+ }
+
+ void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); }
+
+ // ── Configure a handle with common settings ─────────────────────────
+ // Called only from DoAsync* lambdas running on the strand.
+
+ void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters)
+ {
+ // Build URL
+ ExtendableStringBuilder<256> Url;
+ BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str());
+
+ // Unix domain socket
+ if (!m_Settings.UnixSocketPath.empty())
+ {
+ m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath);
+ curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str());
+ }
+
+ // Timeouts
+ if (m_Settings.ConnectTimeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast<long>(m_Settings.ConnectTimeout.count()));
+ }
+ if (m_Settings.Timeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count()));
+ }
+
+ // HTTP/2
+ if (m_Settings.AssumeHttp2)
+ {
+ curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
+ }
+
+ // SSL
+ if (m_Settings.InsecureSsl)
+ {
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ if (!m_Settings.CaBundlePath.empty())
+ {
+ curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str());
+ }
+
+ // Verbose/debug
+ if (m_Settings.Verbose)
+ {
+ curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L);
+ }
+
+ // Thread safety
+ curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
+
+ if (m_Settings.ForbidReuseConnection)
+ {
+ curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L);
+ }
+ }
+
+ // ── Access token ────────────────────────────────────────────────────
+
+ std::optional<std::string> GetAccessToken()
+ {
+ if (!m_Settings.AccessTokenProvider.has_value())
+ {
+ return {};
+ }
+ {
+ RwLock::SharedLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()();
+ if (!NewToken.IsValid())
+ {
+ ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once");
+ NewToken = m_Settings.AccessTokenProvider.value()();
+ }
+ if (NewToken.IsValid())
+ {
+ m_CachedAccessToken = NewToken;
+ return m_CachedAccessToken.GetValue();
+ }
+ ZEN_WARN("AsyncHttpClient: access token provider returned invalid token");
+ return {};
+ }
+
+ // ── Submit a transfer ───────────────────────────────────────────────
+
+ void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer");
+ // Setup write/header callbacks
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData);
+ curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback);
+ curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData);
+
+ m_Transfers[Handle] = std::move(Ctx);
+
+ CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle);
+ if (Mc != CURLM_OK)
+ {
+ auto Stolen = std::move(m_Transfers[Handle]);
+ m_Transfers.erase(Handle);
+ ReleaseHandle(Handle);
+
+ HttpClient::Response ErrorResponse;
+ ErrorResponse.Error =
+ HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))};
+ asio::post(m_IoContext,
+ [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); });
+ return;
+ }
+ }
+
+ // ── Socket-action integration ───────────────────────────────────────
+ //
+ // curl_multi drives I/O via two callbacks:
+ // - SocketCallback: curl tells us which sockets to watch for read/write
+ // - TimerCallback: curl tells us when to fire a timeout
+ //
+ // On each socket event or timeout we call curl_multi_socket_action(),
+ // then drain completed transfers via curl_multi_info_read().
+
+ // Per-socket state: wraps the native fd in an ASIO socket for async_wait.
+ struct SocketInfo
+ {
+ asio::ip::tcp::socket Socket;
+ int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
+
+ explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {}
+ };
+
+ // Static thunks registered with curl_multi ────────────────────────────
+
+ static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr)
+ {
+ ZEN_UNUSED(Easy);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr));
+ return 0;
+ }
+
+ static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr)
+ {
+ ZEN_UNUSED(Multi);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlTimer(TimeoutMs);
+ return 0;
+ }
+
+ void SetupMultiCallbacks()
+ {
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this);
+ }
+
+ // Called by curl when socket watch state changes ──────────────────────
+
+ void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info)
+ {
+ if (Action == CURL_POLL_REMOVE)
+ {
+ if (Info)
+ {
+ // Cancel pending async_wait ops before releasing the fd.
+ // curl owns the fd, so we must release() rather than close().
+ Info->Socket.cancel();
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.release();
+ }
+ m_Sockets.erase(Fd);
+ }
+ return;
+ }
+
+ if (!Info)
+ {
+ // New socket — wrap the native fd in an ASIO socket.
+ auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext));
+ Info = It->second.get();
+
+ asio::error_code Ec;
+ // Determine protocol from the fd (v4 vs v6). Default to v4.
+ Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
+ if (Ec)
+ {
+ // Try v6 as fallback
+ Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
+ }
+ if (Ec)
+ {
+ ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message());
+ m_Sockets.erase(Fd);
+ return;
+ }
+
+ curl_multi_assign(m_Multi, Fd, Info);
+ }
+
+ Info->WatchFlags = Action;
+ SetSocketWatch(Fd, Info);
+ }
+
+ void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info)
+ {
+ // Cancel any pending wait before issuing a new one.
+ Info->Socket.cancel();
+
+ if (Info->WatchFlags & CURL_POLL_IN)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_IN);
+ }));
+ }
+
+ if (Info->WatchFlags & CURL_POLL_OUT)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_OUT);
+ }));
+ }
+ }
+
+ void OnSocketReady(curl_socket_t Fd, int CurlAction)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning);
+ CheckCompleted();
+
+ // Re-arm the watch if the socket is still tracked.
+ auto It = m_Sockets.find(Fd);
+ if (It != m_Sockets.end())
+ {
+ SetSocketWatch(Fd, It->second.get());
+ }
+ }
+
+ // Called by curl when it wants a timeout ──────────────────────────────
+
+ void OnCurlTimer(long TimeoutMs)
+ {
+ m_Timer.cancel();
+
+ if (TimeoutMs < 0)
+ {
+ // curl says "no timeout needed"
+ return;
+ }
+
+ if (TimeoutMs == 0)
+ {
+ // curl wants immediate action — run it directly on the strand.
+ asio::post(m_Strand, [this]() {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ });
+ return;
+ }
+
+ m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs));
+ m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ }));
+ }
+
+ // Drain completed transfers from curl_multi ──────────────────────────
+
+ void CheckCompleted()
+ {
+ int MsgsLeft = 0;
+ CURLMsg* Msg = nullptr;
+ while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr)
+ {
+ if (Msg->msg != CURLMSG_DONE)
+ {
+ continue;
+ }
+
+ CURL* Handle = Msg->easy_handle;
+ CURLcode Result = Msg->data.result;
+
+ curl_multi_remove_handle(m_Multi, Handle);
+
+ auto It = m_Transfers.find(Handle);
+ if (It == m_Transfers.end())
+ {
+ ReleaseHandle(Handle);
+ continue;
+ }
+
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second);
+ m_Transfers.erase(It);
+
+ CompleteTransfer(Handle, Result, std::move(Ctx));
+ }
+ }
+
+ void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer");
+ // Extract result info
+ long StatusCode = 0;
+ curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode);
+
+ double Elapsed = 0;
+ curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed);
+
+ curl_off_t UpBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes);
+
+ curl_off_t DownBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes);
+
+ ReleaseHandle(Handle);
+
+ // Build response
+ HttpClient::Response Response;
+ Response.StatusCode = HttpResponseCode(StatusCode);
+ Response.UploadedBytes = static_cast<int64_t>(UpBytes);
+ Response.DownloadedBytes = static_cast<int64_t>(DownBytes);
+ Response.ElapsedSeconds = Elapsed;
+ Response.Header = BuildHeaderMap(Ctx->ResponseHeaders);
+
+ if (CurlResult != CURLE_OK)
+ {
+ const char* ErrorMsg = curl_easy_strerror(CurlResult);
+
+ if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK)
+ {
+ ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg);
+ }
+
+ if (!Ctx->Body.empty())
+ {
+ Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ }
+
+ Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)};
+ }
+ else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty())
+ {
+ // No payload
+ }
+ else
+ {
+ IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders);
+
+ const HttpResponseCode Code = HttpResponseCode(StatusCode);
+ if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound)
+ {
+ ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri);
+ }
+
+ Response.ResponsePayload = std::move(PayloadBuffer);
+ }
+
+ // Dispatch the user callback off the strand so a slow callback
+ // cannot starve the curl_multi poll loop.
+ asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable {
+ try
+ {
+ Cb(std::move(Response));
+ }
+ catch (const std::exception& Ex)
+ {
+ auto Log = [&]() -> LoggerRef { return LogRef; };
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what());
+ }
+ });
+ }
+
+ // ── Async verb implementations ──────────────────────────────────────
+
+ void DoAsyncGet(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Get");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Head");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Delete");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE");
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPost(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Post");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPostWithPayload(std::string Url,
+ IoBuffer Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ ContentType,
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList =
+ BuildHeaderList(AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ // Set up read callback for payload data
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutWithPayload(std::string Url,
+ IoBuffer Payload,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList = BuildHeaderList(
+ AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+
+ HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}};
+ Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ // ── Members ─────────────────────────────────────────────────────────
+
+ std::string m_BaseUri;
+ HttpClientSettings m_Settings;
+ LoggerRef m_Log;
+ std::string m_SessionId;
+ std::string m_UnixSocketPathUtf8;
+
+ // io_context and strand — all curl_multi operations are serialized on the
+ // strand, making this safe even when the io_context has multiple threads.
+ std::unique_ptr<asio::io_context> m_OwnedIoContext;
+ asio::io_context& m_IoContext;
+ asio::strand<asio::io_context::executor_type> m_Strand;
+ std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard;
+ std::thread m_IoThread;
+
+ // curl_multi and socket-action state
+ CURLM* m_Multi = nullptr;
+ std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers;
+ std::vector<CURL*> m_HandlePool;
+ std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets;
+ asio::steady_timer m_Timer;
+ bool m_ShuttingDown = false;
+
+ // Access token cache
+ RwLock m_AccessTokenLock;
+ HttpClientAccessToken m_CachedAccessToken;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient public API
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, Settings))
+{
+}
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings))
+{
+}
+
+AsyncHttpClient::~AsyncHttpClient() = default;
+
+// ── Callback-based API ──────────────────────────────────────────────────
+
+void
+AsyncHttpClient::AsyncGet(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters);
+}
+
+// ── Future-based API ────────────────────────────────────────────────────
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncGet(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncHead(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncDelete(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ ContentType,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ Parameters);
+ return Future;
+}
+
+} // namespace zen