aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/clients')
-rw-r--r--src/zenhttp/clients/asynchttpclient.cpp1033
-rw-r--r--src/zenhttp/clients/httpclientcurl.cpp310
-rw-r--r--src/zenhttp/clients/httpclientcurl.h1
-rw-r--r--src/zenhttp/clients/httpclientcurlhelpers.h298
4 files changed, 1366 insertions, 276 deletions
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp
new file mode 100644
index 000000000..ea88fc783
--- /dev/null
+++ b/src/zenhttp/clients/asynchttpclient.cpp
@@ -0,0 +1,1033 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenhttp/asynchttpclient.h>
+
+#include "httpclientcurlhelpers.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/session.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// TransferContext: per-transfer state associated with each CURL easy handle
+
+struct TransferContext
+{
+ AsyncHttpCallback Callback;
+ std::string Body;
+ std::vector<std::pair<std::string, std::string>> ResponseHeaders;
+ CurlWriteCallbackData WriteData;
+ CurlHeaderCallbackData HeaderData;
+ curl_slist* HeaderList = nullptr;
+
+ // For PUT/POST with payload: keep the data alive until transfer completes
+ IoBuffer PayloadBuffer;
+ CurlReadCallbackData ReadData;
+
+ TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback))
+ {
+ WriteData.Body = &Body;
+ HeaderData.Headers = &ResponseHeaders;
+ }
+
+ ~TransferContext()
+ {
+ if (HeaderList)
+ {
+ curl_slist_free_all(HeaderList);
+ }
+ }
+
+ TransferContext(const TransferContext&) = delete;
+ TransferContext& operator=(const TransferContext&) = delete;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient::Impl
+
+struct AsyncHttpClient::Impl
+{
+ Impl(std::string_view BaseUri, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_OwnedIoContext(std::make_unique<asio::io_context>())
+ , m_IoContext(*m_OwnedIoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ m_WorkGuard.emplace(m_IoContext.get_executor());
+ m_IoThread = std::thread([this]() {
+ SetCurrentThreadName("async_http");
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what());
+ }
+ });
+ }
+
+ Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+ : m_BaseUri(BaseUri)
+ , m_Settings(Settings)
+ , m_Log(logging::Get(Settings.LogCategory))
+ , m_IoContext(IoContext)
+ , m_Strand(asio::make_strand(m_IoContext))
+ , m_Timer(m_Strand)
+ {
+ Init();
+ }
+
+ ~Impl()
+ {
+ // Clean up curl state on the strand where all curl_multi operations
+ // are serialized. Use a promise to block until the cleanup handler
+ // has actually executed - essential for the external io_context case
+ // where we don't own the run loop.
+ std::promise<void> Done;
+ std::future<void> DoneFuture = Done.get_future();
+
+ asio::post(m_Strand, [this, &Done]() {
+ m_ShuttingDown = true;
+ m_Timer.cancel();
+
+ // Release all tracked sockets (don't close - curl owns the fds).
+ for (auto& [Fd, Info] : m_Sockets)
+ {
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.cancel();
+ Info->Socket.release();
+ }
+ }
+ m_Sockets.clear();
+
+ for (auto& [Handle, Ctx] : m_Transfers)
+ {
+ curl_multi_remove_handle(m_Multi, Handle);
+ curl_easy_cleanup(Handle);
+ }
+ m_Transfers.clear();
+
+ for (CURL* Handle : m_HandlePool)
+ {
+ curl_easy_cleanup(Handle);
+ }
+ m_HandlePool.clear();
+
+ Done.set_value();
+ });
+
+ // For owned io_context: release work guard so run() can return after
+ // processing the cleanup handler above.
+ m_WorkGuard.reset();
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+ else
+ {
+ // External io_context: wait for the cleanup handler to complete.
+ DoneFuture.wait();
+ }
+
+ if (m_Multi)
+ {
+ curl_multi_cleanup(m_Multi);
+ }
+ }
+
+ LoggerRef Log() { return m_Log; }
+
+ void Init()
+ {
+ m_Multi = curl_multi_init();
+ if (!m_Multi)
+ {
+ throw std::runtime_error("curl_multi_init failed");
+ }
+
+ SetupMultiCallbacks();
+
+ if (m_Settings.SessionId == Oid::Zero)
+ {
+ m_SessionId = std::string(GetSessionIdString());
+ }
+ else
+ {
+ m_SessionId = m_Settings.SessionId.ToString();
+ }
+ }
+
+ // -- Handle pool -----------------------------------------------------
+
+ CURL* AllocHandle()
+ {
+ if (!m_HandlePool.empty())
+ {
+ CURL* Handle = m_HandlePool.back();
+ m_HandlePool.pop_back();
+ curl_easy_reset(Handle);
+ return Handle;
+ }
+ CURL* Handle = curl_easy_init();
+ if (!Handle)
+ {
+ throw std::runtime_error("curl_easy_init failed");
+ }
+ return Handle;
+ }
+
+ void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); }
+
+ // -- Configure a handle with common settings -------------------------
+ // Called only from DoAsync* lambdas running on the strand.
+
+ void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters)
+ {
+ // Build URL
+ ExtendableStringBuilder<256> Url;
+ BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str());
+
+ // Unix domain socket
+ if (!m_Settings.UnixSocketPath.empty())
+ {
+ m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath);
+ curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str());
+ }
+
+ // Timeouts
+ if (m_Settings.ConnectTimeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast<long>(m_Settings.ConnectTimeout.count()));
+ }
+ if (m_Settings.Timeout.count() > 0)
+ {
+ curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count()));
+ }
+
+ // HTTP/2
+ if (m_Settings.AssumeHttp2)
+ {
+ curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
+ }
+
+ // SSL
+ if (m_Settings.InsecureSsl)
+ {
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L);
+ }
+ if (!m_Settings.CaBundlePath.empty())
+ {
+ curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str());
+ }
+
+ // Verbose/debug
+ if (m_Settings.Verbose)
+ {
+ curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L);
+ }
+
+ // Thread safety
+ curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
+
+ if (m_Settings.ForbidReuseConnection)
+ {
+ curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L);
+ }
+ }
+
+ // -- Access token ----------------------------------------------------
+
+ std::optional<std::string> GetAccessToken()
+ {
+ if (!m_Settings.AccessTokenProvider.has_value())
+ {
+ return {};
+ }
+ {
+ RwLock::SharedLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_AccessTokenLock);
+ if (!m_CachedAccessToken.NeedsRefresh())
+ {
+ return m_CachedAccessToken.GetValue();
+ }
+ HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()();
+ if (!NewToken.IsValid())
+ {
+ ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once");
+ NewToken = m_Settings.AccessTokenProvider.value()();
+ }
+ if (NewToken.IsValid())
+ {
+ m_CachedAccessToken = NewToken;
+ return m_CachedAccessToken.GetValue();
+ }
+ ZEN_WARN("AsyncHttpClient: access token provider returned invalid token");
+ return {};
+ }
+
+ // -- Submit a transfer -----------------------------------------------
+
+ void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer");
+ // Setup write/header callbacks
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData);
+ curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback);
+ curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData);
+
+ m_Transfers[Handle] = std::move(Ctx);
+
+ CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle);
+ if (Mc != CURLM_OK)
+ {
+ auto Stolen = std::move(m_Transfers[Handle]);
+ m_Transfers.erase(Handle);
+ ReleaseHandle(Handle);
+
+ HttpClient::Response ErrorResponse;
+ ErrorResponse.Error =
+ HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))};
+ asio::post(m_IoContext,
+ [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); });
+ return;
+ }
+ }
+
+ // -- Socket-action integration ---------------------------------------
+ //
+ // curl_multi drives I/O via two callbacks:
+ // - SocketCallback: curl tells us which sockets to watch for read/write
+ // - TimerCallback: curl tells us when to fire a timeout
+ //
+ // On each socket event or timeout we call curl_multi_socket_action(),
+ // then drain completed transfers via curl_multi_info_read().
+
+ // Per-socket state: wraps the native fd in an ASIO socket for async_wait.
+ struct SocketInfo
+ {
+ asio::ip::tcp::socket Socket;
+ int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
+
+ explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {}
+ };
+
+ // Static thunks registered with curl_multi ----------------------------
+
+ static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr)
+ {
+ ZEN_UNUSED(Easy);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr));
+ return 0;
+ }
+
+ static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr)
+ {
+ ZEN_UNUSED(Multi);
+ auto* Self = static_cast<Impl*>(UserPtr);
+ Self->OnCurlTimer(TimeoutMs);
+ return 0;
+ }
+
+ void SetupMultiCallbacks()
+ {
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback);
+ curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this);
+ }
+
+ // Called by curl when socket watch state changes ---------------------
+
+ void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info)
+ {
+ if (Action == CURL_POLL_REMOVE)
+ {
+ if (Info)
+ {
+ // Cancel pending async_wait ops before releasing the fd.
+ // curl owns the fd, so we must release() rather than close().
+ Info->Socket.cancel();
+ if (Info->Socket.is_open())
+ {
+ Info->Socket.release();
+ }
+ m_Sockets.erase(Fd);
+ }
+ return;
+ }
+
+ if (!Info)
+ {
+ // New socket - wrap the native fd in an ASIO socket.
+ auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext));
+ Info = It->second.get();
+
+ asio::error_code Ec;
+ // Determine protocol from the fd (v4 vs v6). Default to v4.
+ Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
+ if (Ec)
+ {
+ // Try v6 as fallback
+ Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
+ }
+ if (Ec)
+ {
+ ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message());
+ m_Sockets.erase(Fd);
+ return;
+ }
+
+ curl_multi_assign(m_Multi, Fd, Info);
+ }
+
+ Info->WatchFlags = Action;
+ SetSocketWatch(Fd, Info);
+ }
+
+ void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info)
+ {
+ // Cancel any pending wait before issuing a new one.
+ Info->Socket.cancel();
+
+ if (Info->WatchFlags & CURL_POLL_IN)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_IN);
+ }));
+ }
+
+ if (Info->WatchFlags & CURL_POLL_OUT)
+ {
+ Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_OUT);
+ }));
+ }
+ }
+
+ void OnSocketReady(curl_socket_t Fd, int CurlAction)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning);
+ CheckCompleted();
+
+ // Re-arm the watch if the socket is still tracked.
+ auto It = m_Sockets.find(Fd);
+ if (It != m_Sockets.end())
+ {
+ SetSocketWatch(Fd, It->second.get());
+ }
+ }
+
+ // Called by curl when it wants a timeout ------------------------------
+
+ void OnCurlTimer(long TimeoutMs)
+ {
+ m_Timer.cancel();
+
+ if (TimeoutMs < 0)
+ {
+ // curl says "no timeout needed"
+ return;
+ }
+
+ if (TimeoutMs == 0)
+ {
+ // curl wants immediate action - run it directly on the strand.
+ asio::post(m_Strand, [this]() {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ });
+ return;
+ }
+
+ m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs));
+ m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) {
+ if (Ec || m_ShuttingDown)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout");
+ int StillRunning = 0;
+ curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
+ CheckCompleted();
+ }));
+ }
+
+ // Drain completed transfers from curl_multi --------------------------
+
+ void CheckCompleted()
+ {
+ int MsgsLeft = 0;
+ CURLMsg* Msg = nullptr;
+ while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr)
+ {
+ if (Msg->msg != CURLMSG_DONE)
+ {
+ continue;
+ }
+
+ CURL* Handle = Msg->easy_handle;
+ CURLcode Result = Msg->data.result;
+
+ curl_multi_remove_handle(m_Multi, Handle);
+
+ auto It = m_Transfers.find(Handle);
+ if (It == m_Transfers.end())
+ {
+ ReleaseHandle(Handle);
+ continue;
+ }
+
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second);
+ m_Transfers.erase(It);
+
+ CompleteTransfer(Handle, Result, std::move(Ctx));
+ }
+ }
+
+ void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx)
+ {
+ ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer");
+ // Extract result info
+ long StatusCode = 0;
+ curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode);
+
+ double Elapsed = 0;
+ curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed);
+
+ curl_off_t UpBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes);
+
+ curl_off_t DownBytes = 0;
+ curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes);
+
+ ReleaseHandle(Handle);
+
+ // Build response
+ HttpClient::Response Response;
+ Response.StatusCode = HttpResponseCode(StatusCode);
+ Response.UploadedBytes = static_cast<int64_t>(UpBytes);
+ Response.DownloadedBytes = static_cast<int64_t>(DownBytes);
+ Response.ElapsedSeconds = Elapsed;
+ Response.Header = BuildHeaderMap(Ctx->ResponseHeaders);
+
+ if (CurlResult != CURLE_OK)
+ {
+ const char* ErrorMsg = curl_easy_strerror(CurlResult);
+
+ if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK)
+ {
+ ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg);
+ }
+
+ if (!Ctx->Body.empty())
+ {
+ Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ }
+
+ Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)};
+ }
+ else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty())
+ {
+ // No payload
+ }
+ else
+ {
+ IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders);
+
+ const HttpResponseCode Code = HttpResponseCode(StatusCode);
+ if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound)
+ {
+ ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri);
+ }
+
+ Response.ResponsePayload = std::move(PayloadBuffer);
+ }
+
+ // Dispatch the user callback off the strand so a slow callback
+ // cannot starve the curl_multi poll loop.
+ asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable {
+ try
+ {
+ Cb(std::move(Response));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_SCOPED_LOG(LogRef);
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what());
+ }
+ });
+ }
+
+ // -- Async verb implementations --------------------------------------
+
+ void DoAsyncGet(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Get");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Head");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Delete");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE");
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPost(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Post");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPostWithPayload(std::string Url,
+ IoBuffer Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ ContentType,
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, {});
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList =
+ BuildHeaderList(AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ // Set up read callback for payload data
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutWithPayload(std::string Url,
+ IoBuffer Payload,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand,
+ [this,
+ Url = std::move(Url),
+ Payload = std::move(Payload),
+ Callback = std::move(Callback),
+ AdditionalHeader = std::move(AdditionalHeader),
+ Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->PayloadBuffer = std::move(Payload);
+ Ctx->HeaderList = BuildHeaderList(
+ AdditionalHeader,
+ m_SessionId,
+ GetAccessToken(),
+ {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))});
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters)
+ {
+ asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable {
+ ZEN_TRACE_CPU("AsyncHttpClient::Put");
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ CURL* Handle = AllocHandle();
+ ConfigureHandle(Handle, Url, Parameters);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL);
+
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+
+ HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}};
+ Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken());
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ SubmitTransfer(Handle, std::move(Ctx));
+ });
+ }
+
+ // -- Members ---------------------------------------------------------
+
+ std::string m_BaseUri;
+ HttpClientSettings m_Settings;
+ LoggerRef m_Log;
+ std::string m_SessionId;
+ std::string m_UnixSocketPathUtf8;
+
+ // io_context and strand - all curl_multi operations are serialized on the
+ // strand, making this safe even when the io_context has multiple threads.
+ std::unique_ptr<asio::io_context> m_OwnedIoContext;
+ asio::io_context& m_IoContext;
+ asio::strand<asio::io_context::executor_type> m_Strand;
+ std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard;
+ std::thread m_IoThread;
+
+ // curl_multi and socket-action state
+ CURLM* m_Multi = nullptr;
+ std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers;
+ std::vector<CURL*> m_HandlePool;
+ std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets;
+ asio::steady_timer m_Timer;
+ bool m_ShuttingDown = false;
+
+ // Access token cache
+ RwLock m_AccessTokenLock;
+ HttpClientAccessToken m_CachedAccessToken;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// AsyncHttpClient public API
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, Settings))
+{
+}
+
+AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
+: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings))
+{
+}
+
+AsyncHttpClient::~AsyncHttpClient() = default;
+
+// -- Callback-based API --------------------------------------------------
+
+void
+AsyncHttpClient::AsyncGet(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader)
+{
+ m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
+}
+
+void
+AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters)
+{
+ m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters);
+}
+
+// -- Future-based API ----------------------------------------------------
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncGet(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncHead(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncDelete(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPost(
+ Url,
+ Payload,
+ ContentType,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ Payload,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ AdditionalHeader,
+ Parameters);
+ return Future;
+}
+
+std::future<HttpClient::Response>
+AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters)
+{
+ auto Promise = std::make_shared<std::promise<Response>>();
+ auto Future = Promise->get_future();
+ AsyncPut(
+ Url,
+ [Promise](Response R) { Promise->set_value(std::move(R)); },
+ Parameters);
+ return Future;
+}
+
+} // namespace zen
diff --git a/src/zenhttp/clients/httpclientcurl.cpp b/src/zenhttp/clients/httpclientcurl.cpp
index d150b44c6..eee80c269 100644
--- a/src/zenhttp/clients/httpclientcurl.cpp
+++ b/src/zenhttp/clients/httpclientcurl.cpp
@@ -1,6 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "httpclientcurl.h"
+#include "httpclientcurlhelpers.h"
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
@@ -29,153 +30,7 @@ static std::atomic<uint32_t> CurlHttpClientRequestIdCounter{0};
//////////////////////////////////////////////////////////////////////////
-static HttpClientErrorCode
-MapCurlError(CURLcode Code)
-{
- switch (Code)
- {
- case CURLE_OK:
- return HttpClientErrorCode::kOK;
- case CURLE_COULDNT_CONNECT:
- return HttpClientErrorCode::kConnectionFailure;
- case CURLE_COULDNT_RESOLVE_HOST:
- return HttpClientErrorCode::kHostResolutionFailure;
- case CURLE_COULDNT_RESOLVE_PROXY:
- return HttpClientErrorCode::kProxyResolutionFailure;
- case CURLE_RECV_ERROR:
- return HttpClientErrorCode::kNetworkReceiveError;
- case CURLE_SEND_ERROR:
- return HttpClientErrorCode::kNetworkSendFailure;
- case CURLE_OPERATION_TIMEDOUT:
- return HttpClientErrorCode::kOperationTimedOut;
- case CURLE_SSL_CONNECT_ERROR:
- return HttpClientErrorCode::kSSLConnectError;
- case CURLE_SSL_CERTPROBLEM:
- return HttpClientErrorCode::kSSLCertificateError;
- case CURLE_PEER_FAILED_VERIFICATION:
- return HttpClientErrorCode::kSSLCACertError;
- case CURLE_SSL_CIPHER:
- case CURLE_SSL_ENGINE_NOTFOUND:
- case CURLE_SSL_ENGINE_SETFAILED:
- return HttpClientErrorCode::kGenericSSLError;
- case CURLE_ABORTED_BY_CALLBACK:
- return HttpClientErrorCode::kRequestCancelled;
- default:
- return HttpClientErrorCode::kOtherError;
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-//
-// Curl callback helpers
-
-struct WriteCallbackData
-{
- std::string* Body = nullptr;
- std::function<bool()>* CheckIfAbortFunction = nullptr;
-};
-
-static size_t
-CurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<WriteCallbackData*>(UserData);
- size_t TotalBytes = Size * Nmemb;
-
- if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
- {
- return 0; // Signal abort to curl
- }
-
- Data->Body->append(Ptr, TotalBytes);
- return TotalBytes;
-}
-
-struct HeaderCallbackData
-{
- std::vector<std::pair<std::string, std::string>>* Headers = nullptr;
-};
-
-// Trims trailing CRLF, splits on the first colon, and trims whitespace from key and value.
-// Returns nullopt for blank lines or lines without a colon (e.g. HTTP status lines).
-static std::optional<std::pair<std::string_view, std::string_view>>
-ParseHeaderLine(std::string_view Line)
-{
- while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
- {
- Line.remove_suffix(1);
- }
-
- if (Line.empty())
- {
- return std::nullopt;
- }
-
- size_t ColonPos = Line.find(':');
- if (ColonPos == std::string_view::npos)
- {
- return std::nullopt;
- }
-
- std::string_view Key = Line.substr(0, ColonPos);
- std::string_view Value = Line.substr(ColonPos + 1);
-
- while (!Key.empty() && Key.back() == ' ')
- {
- Key.remove_suffix(1);
- }
- while (!Value.empty() && Value.front() == ' ')
- {
- Value.remove_prefix(1);
- }
-
- return std::pair{Key, Value};
-}
-
-static size_t
-CurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<HeaderCallbackData*>(UserData);
- size_t TotalBytes = Size * Nmemb;
-
- if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes)))
- {
- auto& [Key, Value] = *Header;
- Data->Headers->emplace_back(std::string(Key), std::string(Value));
- }
-
- return TotalBytes;
-}
-
-struct ReadCallbackData
-{
- const uint8_t* DataPtr = nullptr;
- size_t DataSize = 0;
- size_t Offset = 0;
- std::function<bool()>* CheckIfAbortFunction = nullptr;
-};
-
-static size_t
-CurlReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
-{
- auto* Data = static_cast<ReadCallbackData*>(UserData);
- size_t MaxRead = Size * Nmemb;
-
- if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
- {
- return CURL_READFUNC_ABORT;
- }
-
- size_t Remaining = Data->DataSize - Data->Offset;
- size_t ToRead = std::min(MaxRead, Remaining);
-
- if (ToRead > 0)
- {
- memcpy(Buffer, Data->DataPtr + Data->Offset, ToRead);
- Data->Offset += ToRead;
- }
-
- return ToRead;
-}
+// Curl callback helpers and shared utilities are in httpclientcurlhelpers.h
struct StreamReadCallbackData
{
@@ -233,7 +88,7 @@ CurlDebugCallback(CURL* Handle, curl_infotype Type, char* Data, size_t Size, voi
{
ZEN_UNUSED(Handle);
LoggerRef LogRef = *static_cast<LoggerRef*>(UserPtr);
- auto Log = [&]() -> LoggerRef { return LogRef; };
+ ZEN_SCOPED_LOG(LogRef);
std::string_view DataView(Data, Size);
@@ -281,120 +136,6 @@ CurlDebugCallback(CURL* Handle, curl_infotype Type, char* Data, size_t Size, voi
//////////////////////////////////////////////////////////////////////////
-static std::pair<std::string, std::string>
-HeaderContentType(ZenContentType ContentType)
-{
- return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
-}
-
-static curl_slist*
-BuildHeaderList(const HttpClient::KeyValueMap& AdditionalHeader,
- std::string_view SessionId,
- const std::optional<std::string>& AccessToken,
- const std::vector<std::pair<std::string, std::string>>& ExtraHeaders = {})
-{
- curl_slist* Headers = nullptr;
-
- for (const auto& [Key, Value] : *AdditionalHeader)
- {
- ExtendableStringBuilder<64> HeaderLine;
- HeaderLine << Key << ": " << Value;
- Headers = curl_slist_append(Headers, HeaderLine.c_str());
- }
-
- if (!SessionId.empty())
- {
- ExtendableStringBuilder<64> SessionHeader;
- SessionHeader << "UE-Session: " << SessionId;
- Headers = curl_slist_append(Headers, SessionHeader.c_str());
- }
-
- if (AccessToken.has_value())
- {
- ExtendableStringBuilder<128> AuthHeader;
- AuthHeader << "Authorization: " << AccessToken.value();
- Headers = curl_slist_append(Headers, AuthHeader.c_str());
- }
-
- for (const auto& [Key, Value] : ExtraHeaders)
- {
- ExtendableStringBuilder<128> HeaderLine;
- HeaderLine << Key << ": " << Value;
- Headers = curl_slist_append(Headers, HeaderLine.c_str());
- }
-
- return Headers;
-}
-
-static HttpClient::KeyValueMap
-BuildHeaderMap(const std::vector<std::pair<std::string, std::string>>& Headers)
-{
- HttpClient::KeyValueMap HeaderMap;
- for (const auto& [Key, Value] : Headers)
- {
- HeaderMap->insert_or_assign(Key, Value);
- }
- return HeaderMap;
-}
-
-// Scans response headers for Content-Type and applies it to the buffer.
-static void
-ApplyContentTypeFromHeaders(IoBuffer& Buffer, const std::vector<std::pair<std::string, std::string>>& Headers)
-{
- for (const auto& [Key, Value] : Headers)
- {
- if (StrCaseCompare(Key, "Content-Type") == 0)
- {
- Buffer.SetContentType(ParseContentType(Value));
- break;
- }
- }
-}
-
-static void
-AppendUrlEncoded(StringBuilderBase& Out, std::string_view Input)
-{
- static constexpr char HexDigits[] = "0123456789ABCDEF";
- static constexpr AsciiSet Unreserved("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~");
-
- for (char C : Input)
- {
- if (Unreserved.Contains(C))
- {
- Out.Append(C);
- }
- else
- {
- uint8_t Byte = static_cast<uint8_t>(C);
- char Encoded[3] = {'%', HexDigits[Byte >> 4], HexDigits[Byte & 0x0F]};
- Out.Append(std::string_view(Encoded, 3));
- }
- }
-}
-
-static void
-BuildUrlWithParameters(StringBuilderBase& Url,
- std::string_view BaseUrl,
- std::string_view ResourcePath,
- const HttpClient::KeyValueMap& Parameters)
-{
- Url.Append(BaseUrl);
- Url.Append(ResourcePath);
-
- if (!Parameters->empty())
- {
- char Separator = '?';
- for (const auto& [Key, Value] : *Parameters)
- {
- Url.Append(Separator);
- AppendUrlEncoded(Url, Key);
- Url.Append('=');
- AppendUrlEncoded(Url, Value);
- Separator = '&';
- }
- }
-}
-
//////////////////////////////////////////////////////////////////////////
CurlHttpClient::CurlHttpClient(std::string_view BaseUri,
@@ -440,9 +181,9 @@ CurlHttpClient::CurlResult
CurlHttpClient::Session::PerformWithResponseCallbacks()
{
std::string Body;
- WriteCallbackData WriteData{.Body = &Body,
+ CurlWriteCallbackData WriteData{.Body = &Body,
.CheckIfAbortFunction = Outer->m_CheckIfAbortFunction ? &Outer->m_CheckIfAbortFunction : nullptr};
- HeaderCallbackData HdrData{};
+ CurlHeaderCallbackData HdrData{};
std::vector<std::pair<std::string, std::string>> ResponseHeaders;
HdrData.Headers = &ResponseHeaders;
@@ -487,6 +228,13 @@ CurlHttpClient::Session::Perform()
curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes);
Result.DownloadedBytes = static_cast<int64_t>(DownBytes);
+ char* EffectiveUrl = nullptr;
+ curl_easy_getinfo(Handle, CURLINFO_EFFECTIVE_URL, &EffectiveUrl);
+ if (EffectiveUrl)
+ {
+ Result.Url = EffectiveUrl;
+ }
+
return Result;
}
@@ -553,8 +301,9 @@ CurlHttpClient::CommonResponse(std::string_view SessionId,
if (Result.ErrorCode != CURLE_OPERATION_TIMEDOUT && Result.ErrorCode != CURLE_COULDNT_CONNECT &&
Result.ErrorCode != CURLE_ABORTED_BY_CALLBACK)
{
- ZEN_WARN("HttpClient client failure (session: {}): ({}) '{}'",
+ ZEN_WARN("HttpClient client failure (session: {}, url: {}): ({}) '{}'",
SessionId,
+ Result.Url,
static_cast<int>(Result.ErrorCode),
Result.ErrorMessage);
}
@@ -699,9 +448,11 @@ CurlHttpClient::ShouldRetry(const CurlResult& Result)
{
case CURLE_OK:
break;
+ case CURLE_COULDNT_CONNECT:
case CURLE_RECV_ERROR:
case CURLE_SEND_ERROR:
case CURLE_OPERATION_TIMEDOUT:
+ case CURLE_PARTIAL_FILE:
return true;
default:
return false;
@@ -748,10 +499,11 @@ CurlHttpClient::DoWithRetry(std::string_view SessionId, std::function<CurlResult
{
if (Result.ErrorCode != CURLE_OK)
{
- ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' Attempt {}/{}",
+ ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' (Curl error: {}) Attempt {}/{}",
SessionId,
static_cast<int>(MapCurlError(Result.ErrorCode)),
Result.ErrorMessage,
+ static_cast<int>(Result.ErrorCode),
Attempt,
m_ConnectionSettings.RetryCount + 1);
}
@@ -856,6 +608,12 @@ CurlHttpClient::AllocSession(std::string_view ResourcePath, const KeyValueMap& P
// Disable signal handling for thread safety
curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
+ if (m_ConnectionSettings.FollowRedirects)
+ {
+ curl_easy_setopt(Handle, CURLOPT_FOLLOWLOCATION, 1L);
+ curl_easy_setopt(Handle, CURLOPT_MAXREDIRS, static_cast<long>(m_ConnectionSettings.MaxRedirects));
+ }
+
if (m_ConnectionSettings.ForbidReuseConnection)
{
curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L);
@@ -998,9 +756,9 @@ CurlHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValu
curl_easy_setopt(H, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Payload.GetSize()));
- ReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
- .DataSize = Payload.GetSize(),
- .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
+ CurlReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
+ .DataSize = Payload.GetSize(),
+ .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback);
curl_easy_setopt(H, CURLOPT_READDATA, &ReadData);
@@ -1213,7 +971,7 @@ CurlHttpClient::Post(std::string_view Url,
std::error_code Ec = (*Data->PayloadFile)->Open(*Data->TempFolderPath, ContentLength.value());
if (Ec)
{
- auto Log = [&]() -> LoggerRef { return Data->Log; };
+ ZEN_SCOPED_LOG(Data->Log);
ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Post. Reason: {}",
Data->TempFolderPath->string(),
Ec.message());
@@ -1266,7 +1024,7 @@ CurlHttpClient::Post(std::string_view Url,
std::error_code Ec = (*Data->PayloadFile)->Write(std::string_view(Ptr, TotalBytes));
if (Ec)
{
- auto Log = [&]() -> LoggerRef { return Data->Log; };
+ ZEN_SCOPED_LOG(Data->Log);
ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Post. Reason: {}",
Data->TempFolderPath->string(),
Ec.message());
@@ -1367,9 +1125,9 @@ CurlHttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyV
return Sess.PerformWithResponseCallbacks();
}
- ReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
- .DataSize = Payload.GetSize(),
- .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
+ CurlReadCallbackData ReadData{.DataPtr = static_cast<const uint8_t*>(Payload.GetData()),
+ .DataSize = Payload.GetSize(),
+ .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr};
curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback);
curl_easy_setopt(H, CURLOPT_READDATA, &ReadData);
@@ -1532,7 +1290,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp
std::error_code Ec = (*Data->PayloadFile)->Open(*Data->TempFolderPath, ContentLength.value());
if (Ec)
{
- auto Log = [&]() -> LoggerRef { return Data->Log; };
+ ZEN_SCOPED_LOG(Data->Log);
ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
Data->TempFolderPath->string(),
Ec.message());
@@ -1618,7 +1376,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp
std::error_code Ec = (*Data->PayloadFile)->Write(std::string_view(Ptr, TotalBytes));
if (Ec)
{
- auto Log = [&]() -> LoggerRef { return Data->Log; };
+ ZEN_SCOPED_LOG(Data->Log);
ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
Data->TempFolderPath->string(),
Ec.message());
diff --git a/src/zenhttp/clients/httpclientcurl.h b/src/zenhttp/clients/httpclientcurl.h
index bdeb46633..ea9193e65 100644
--- a/src/zenhttp/clients/httpclientcurl.h
+++ b/src/zenhttp/clients/httpclientcurl.h
@@ -73,6 +73,7 @@ private:
int64_t DownloadedBytes = 0;
CURLcode ErrorCode = CURLE_OK;
std::string ErrorMessage;
+ std::string Url;
};
struct Session
diff --git a/src/zenhttp/clients/httpclientcurlhelpers.h b/src/zenhttp/clients/httpclientcurlhelpers.h
new file mode 100644
index 000000000..cb5f5d9a9
--- /dev/null
+++ b/src/zenhttp/clients/httpclientcurlhelpers.h
@@ -0,0 +1,298 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+// Shared helpers for curl-based HTTP client implementations (sync and async).
+// This is an internal header, not part of the public API.
+
+#include <zencore/string.h>
+
+#include <zenhttp/httpclient.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <curl/curl.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Error mapping
+
+inline HttpClientErrorCode
+MapCurlError(CURLcode Code)
+{
+ switch (Code)
+ {
+ case CURLE_OK:
+ return HttpClientErrorCode::kOK;
+ case CURLE_COULDNT_CONNECT:
+ return HttpClientErrorCode::kConnectionFailure;
+ case CURLE_COULDNT_RESOLVE_HOST:
+ return HttpClientErrorCode::kHostResolutionFailure;
+ case CURLE_COULDNT_RESOLVE_PROXY:
+ return HttpClientErrorCode::kProxyResolutionFailure;
+ case CURLE_RECV_ERROR:
+ return HttpClientErrorCode::kNetworkReceiveError;
+ case CURLE_SEND_ERROR:
+ return HttpClientErrorCode::kNetworkSendFailure;
+ case CURLE_OPERATION_TIMEDOUT:
+ return HttpClientErrorCode::kOperationTimedOut;
+ case CURLE_SSL_CONNECT_ERROR:
+ return HttpClientErrorCode::kSSLConnectError;
+ case CURLE_SSL_CERTPROBLEM:
+ return HttpClientErrorCode::kSSLCertificateError;
+ case CURLE_PEER_FAILED_VERIFICATION:
+ return HttpClientErrorCode::kSSLCACertError;
+ case CURLE_SSL_CIPHER:
+ case CURLE_SSL_ENGINE_NOTFOUND:
+ case CURLE_SSL_ENGINE_SETFAILED:
+ return HttpClientErrorCode::kGenericSSLError;
+ case CURLE_ABORTED_BY_CALLBACK:
+ return HttpClientErrorCode::kRequestCancelled;
+ default:
+ return HttpClientErrorCode::kOtherError;
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Curl callback data structures and callbacks
+
+struct CurlWriteCallbackData
+{
+ std::string* Body = nullptr;
+ std::function<bool()>* CheckIfAbortFunction = nullptr;
+};
+
+inline size_t
+CurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlWriteCallbackData*>(UserData);
+ size_t TotalBytes = Size * Nmemb;
+
+ if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
+ {
+ return 0; // Signal abort to curl
+ }
+
+ Data->Body->append(Ptr, TotalBytes);
+ return TotalBytes;
+}
+
+struct CurlHeaderCallbackData
+{
+ std::vector<std::pair<std::string, std::string>>* Headers = nullptr;
+};
+
+// Trims trailing CRLF, splits on the first colon, and trims whitespace from key and value.
+// Returns nullopt for blank lines or lines without a colon (e.g. HTTP status lines).
+inline std::optional<std::pair<std::string_view, std::string_view>>
+ParseHeaderLine(std::string_view Line)
+{
+ while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
+ {
+ Line.remove_suffix(1);
+ }
+
+ if (Line.empty())
+ {
+ return std::nullopt;
+ }
+
+ size_t ColonPos = Line.find(':');
+ if (ColonPos == std::string_view::npos)
+ {
+ return std::nullopt;
+ }
+
+ std::string_view Key = Line.substr(0, ColonPos);
+ std::string_view Value = Line.substr(ColonPos + 1);
+
+ while (!Key.empty() && Key.back() == ' ')
+ {
+ Key.remove_suffix(1);
+ }
+ while (!Value.empty() && Value.front() == ' ')
+ {
+ Value.remove_prefix(1);
+ }
+
+ return std::pair{Key, Value};
+}
+
+inline size_t
+CurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlHeaderCallbackData*>(UserData);
+ size_t TotalBytes = Size * Nmemb;
+
+ if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes)))
+ {
+ auto& [Key, Value] = *Header;
+ Data->Headers->emplace_back(std::string(Key), std::string(Value));
+ }
+
+ return TotalBytes;
+}
+
+struct CurlReadCallbackData
+{
+ const uint8_t* DataPtr = nullptr;
+ size_t DataSize = 0;
+ size_t Offset = 0;
+ std::function<bool()>* CheckIfAbortFunction = nullptr;
+};
+
+inline size_t
+CurlReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+{
+ auto* Data = static_cast<CurlReadCallbackData*>(UserData);
+ size_t MaxRead = Size * Nmemb;
+
+ if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)())
+ {
+ return CURL_READFUNC_ABORT;
+ }
+
+ size_t Remaining = Data->DataSize - Data->Offset;
+ size_t ToRead = std::min(MaxRead, Remaining);
+
+ if (ToRead > 0)
+ {
+ memcpy(Buffer, Data->DataPtr + Data->Offset, ToRead);
+ Data->Offset += ToRead;
+ }
+
+ return ToRead;
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// URL and header construction
+
+inline void
+AppendUrlEncoded(StringBuilderBase& Out, std::string_view Input)
+{
+ static constexpr char HexDigits[] = "0123456789ABCDEF";
+ static constexpr AsciiSet Unreserved("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~");
+
+ for (char C : Input)
+ {
+ if (Unreserved.Contains(C))
+ {
+ Out.Append(C);
+ }
+ else
+ {
+ uint8_t Byte = static_cast<uint8_t>(C);
+ char Encoded[3] = {'%', HexDigits[Byte >> 4], HexDigits[Byte & 0x0F]};
+ Out.Append(std::string_view(Encoded, 3));
+ }
+ }
+}
+
+inline void
+BuildUrlWithParameters(StringBuilderBase& Url,
+ std::string_view BaseUrl,
+ std::string_view ResourcePath,
+ const HttpClient::KeyValueMap& Parameters)
+{
+ Url.Append(BaseUrl);
+ Url.Append(ResourcePath);
+
+ if (!Parameters->empty())
+ {
+ char Separator = '?';
+ for (const auto& [Key, Value] : *Parameters)
+ {
+ Url.Append(Separator);
+ AppendUrlEncoded(Url, Key);
+ Url.Append('=');
+ AppendUrlEncoded(Url, Value);
+ Separator = '&';
+ }
+ }
+}
+
+inline std::pair<std::string, std::string>
+HeaderContentType(ZenContentType ContentType)
+{
+ return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
+}
+
+inline curl_slist*
+BuildHeaderList(const HttpClient::KeyValueMap& AdditionalHeader,
+ std::string_view SessionId,
+ const std::optional<std::string>& AccessToken,
+ const std::vector<std::pair<std::string, std::string>>& ExtraHeaders = {})
+{
+ curl_slist* Headers = nullptr;
+
+ for (const auto& [Key, Value] : *AdditionalHeader)
+ {
+ ExtendableStringBuilder<64> HeaderLine;
+ HeaderLine << Key << ": " << Value;
+ Headers = curl_slist_append(Headers, HeaderLine.c_str());
+ }
+
+ if (!SessionId.empty())
+ {
+ ExtendableStringBuilder<64> SessionHeader;
+ SessionHeader << "UE-Session: " << SessionId;
+ Headers = curl_slist_append(Headers, SessionHeader.c_str());
+ }
+
+ if (AccessToken.has_value())
+ {
+ ExtendableStringBuilder<128> AuthHeader;
+ AuthHeader << "Authorization: " << AccessToken.value();
+ Headers = curl_slist_append(Headers, AuthHeader.c_str());
+ }
+
+ bool HasContentTypeOverride = AdditionalHeader->contains("Content-Type");
+ for (const auto& [Key, Value] : ExtraHeaders)
+ {
+ if (HasContentTypeOverride && Key == "Content-Type")
+ {
+ continue;
+ }
+ ExtendableStringBuilder<128> HeaderLine;
+ HeaderLine << Key << ": " << Value;
+ Headers = curl_slist_append(Headers, HeaderLine.c_str());
+ }
+
+ return Headers;
+}
+
+inline HttpClient::KeyValueMap
+BuildHeaderMap(const std::vector<std::pair<std::string, std::string>>& Headers)
+{
+ HttpClient::KeyValueMap HeaderMap;
+ for (const auto& [Key, Value] : Headers)
+ {
+ HeaderMap->insert_or_assign(Key, Value);
+ }
+ return HeaderMap;
+}
+
+// Scans response headers for Content-Type and applies it to the buffer.
+inline void
+ApplyContentTypeFromHeaders(IoBuffer& Buffer, const std::vector<std::pair<std::string, std::string>>& Headers)
+{
+ for (const auto& [Key, Value] : Headers)
+ {
+ if (StrCaseCompare(Key, "Content-Type") == 0)
+ {
+ Buffer.SetContentType(ParseContentType(Value));
+ break;
+ }
+ }
+}
+
+} // namespace zen