diff options
Diffstat (limited to 'src/zenhttp/clients/asynchttpclient.cpp')
| -rw-r--r-- | src/zenhttp/clients/asynchttpclient.cpp | 1033 |
1 files changed, 1033 insertions, 0 deletions
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp new file mode 100644 index 000000000..4b189af36 --- /dev/null +++ b/src/zenhttp/clients/asynchttpclient.cpp @@ -0,0 +1,1033 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenhttp/asynchttpclient.h> + +#include "httpclientcurlhelpers.h" + +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/session.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +#include <asio/steady_timer.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <thread> +#include <unordered_map> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// +// TransferContext: per-transfer state associated with each CURL easy handle + +struct TransferContext +{ + AsyncHttpCallback Callback; + std::string Body; + std::vector<std::pair<std::string, std::string>> ResponseHeaders; + CurlWriteCallbackData WriteData; + CurlHeaderCallbackData HeaderData; + curl_slist* HeaderList = nullptr; + + // For PUT/POST with payload: keep the data alive until transfer completes + IoBuffer PayloadBuffer; + CurlReadCallbackData ReadData; + + TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) + { + WriteData.Body = &Body; + HeaderData.Headers = &ResponseHeaders; + } + + ~TransferContext() + { + if (HeaderList) + { + curl_slist_free_all(HeaderList); + } + } + + TransferContext(const TransferContext&) = delete; + TransferContext& operator=(const TransferContext&) = delete; +}; + +////////////////////////////////////////////////////////////////////////// +// +// AsyncHttpClient::Impl + +struct AsyncHttpClient::Impl +{ + Impl(std::string_view BaseUri, const HttpClientSettings& Settings) + : m_BaseUri(BaseUri) + , m_Settings(Settings) + , m_Log(logging::Get(Settings.LogCategory)) + , m_OwnedIoContext(std::make_unique<asio::io_context>()) + , m_IoContext(*m_OwnedIoContext) + , m_Strand(asio::make_strand(m_IoContext)) + , m_Timer(m_Strand) + { + Init(); + m_WorkGuard.emplace(m_IoContext.get_executor()); + m_IoThread = std::thread([this]() { + SetCurrentThreadName("async_http"); + try + { + m_IoContext.run(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what()); + } + }); + } + + Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) + : m_BaseUri(BaseUri) + , m_Settings(Settings) + , m_Log(logging::Get(Settings.LogCategory)) + , m_IoContext(IoContext) + , m_Strand(asio::make_strand(m_IoContext)) + , m_Timer(m_Strand) + { + Init(); + } + + ~Impl() + { + // Clean up curl state on the strand where all curl_multi operations + // are serialized. Use a promise to block until the cleanup handler + // has actually executed - essential for the external io_context case + // where we don't own the run loop. + std::promise<void> Done; + std::future<void> DoneFuture = Done.get_future(); + + asio::post(m_Strand, [this, &Done]() { + m_ShuttingDown = true; + m_Timer.cancel(); + + // Release all tracked sockets (don't close - curl owns the fds). + for (auto& [Fd, Info] : m_Sockets) + { + if (Info->Socket.is_open()) + { + Info->Socket.cancel(); + Info->Socket.release(); + } + } + m_Sockets.clear(); + + for (auto& [Handle, Ctx] : m_Transfers) + { + curl_multi_remove_handle(m_Multi, Handle); + curl_easy_cleanup(Handle); + } + m_Transfers.clear(); + + for (CURL* Handle : m_HandlePool) + { + curl_easy_cleanup(Handle); + } + m_HandlePool.clear(); + + Done.set_value(); + }); + + // For owned io_context: release work guard so run() can return after + // processing the cleanup handler above. + m_WorkGuard.reset(); + + if (m_IoThread.joinable()) + { + m_IoThread.join(); + } + else + { + // External io_context: wait for the cleanup handler to complete. + DoneFuture.wait(); + } + + if (m_Multi) + { + curl_multi_cleanup(m_Multi); + } + } + + LoggerRef Log() { return m_Log; } + + void Init() + { + m_Multi = curl_multi_init(); + if (!m_Multi) + { + throw std::runtime_error("curl_multi_init failed"); + } + + SetupMultiCallbacks(); + + if (m_Settings.SessionId == Oid::Zero) + { + m_SessionId = std::string(GetSessionIdString()); + } + else + { + m_SessionId = m_Settings.SessionId.ToString(); + } + } + + // -- Handle pool ----------------------------------------------------- + + CURL* AllocHandle() + { + if (!m_HandlePool.empty()) + { + CURL* Handle = m_HandlePool.back(); + m_HandlePool.pop_back(); + curl_easy_reset(Handle); + return Handle; + } + CURL* Handle = curl_easy_init(); + if (!Handle) + { + throw std::runtime_error("curl_easy_init failed"); + } + return Handle; + } + + void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); } + + // -- Configure a handle with common settings ------------------------- + // Called only from DoAsync* lambdas running on the strand. + + void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters) + { + // Build URL + ExtendableStringBuilder<256> Url; + BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters); + curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str()); + + // Unix domain socket + if (!m_Settings.UnixSocketPath.empty()) + { + m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath); + curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str()); + } + + // Timeouts + if (m_Settings.ConnectTimeout.count() > 0) + { + curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast<long>(m_Settings.ConnectTimeout.count())); + } + if (m_Settings.Timeout.count() > 0) + { + curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count())); + } + + // HTTP/2 + if (m_Settings.AssumeHttp2) + { + curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); + } + + // SSL + if (m_Settings.InsecureSsl) + { + curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L); + } + if (!m_Settings.CaBundlePath.empty()) + { + curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str()); + } + + // Verbose/debug + if (m_Settings.Verbose) + { + curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); + } + + // Thread safety + curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); + + if (m_Settings.ForbidReuseConnection) + { + curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L); + } + } + + // -- Access token ---------------------------------------------------- + + std::optional<std::string> GetAccessToken() + { + if (!m_Settings.AccessTokenProvider.has_value()) + { + return {}; + } + { + RwLock::SharedLockScope _(m_AccessTokenLock); + if (!m_CachedAccessToken.NeedsRefresh()) + { + return m_CachedAccessToken.GetValue(); + } + } + RwLock::ExclusiveLockScope _(m_AccessTokenLock); + if (!m_CachedAccessToken.NeedsRefresh()) + { + return m_CachedAccessToken.GetValue(); + } + HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()(); + if (!NewToken.IsValid()) + { + ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once"); + NewToken = m_Settings.AccessTokenProvider.value()(); + } + if (NewToken.IsValid()) + { + m_CachedAccessToken = NewToken; + return m_CachedAccessToken.GetValue(); + } + ZEN_WARN("AsyncHttpClient: access token provider returned invalid token"); + return {}; + } + + // -- Submit a transfer ----------------------------------------------- + + void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx) + { + ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer"); + // Setup write/header callbacks + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData); + curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback); + curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData); + + m_Transfers[Handle] = std::move(Ctx); + + CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle); + if (Mc != CURLM_OK) + { + auto Stolen = std::move(m_Transfers[Handle]); + m_Transfers.erase(Handle); + ReleaseHandle(Handle); + + HttpClient::Response ErrorResponse; + ErrorResponse.Error = + HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError, + .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))}; + asio::post(m_IoContext, + [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); }); + return; + } + } + + // -- Socket-action integration --------------------------------------- + // + // curl_multi drives I/O via two callbacks: + // - SocketCallback: curl tells us which sockets to watch for read/write + // - TimerCallback: curl tells us when to fire a timeout + // + // On each socket event or timeout we call curl_multi_socket_action(), + // then drain completed transfers via curl_multi_info_read(). + + // Per-socket state: wraps the native fd in an ASIO socket for async_wait. + struct SocketInfo + { + asio::ip::tcp::socket Socket; + int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT + + explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {} + }; + + // Static thunks registered with curl_multi ---------------------------- + + static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr) + { + ZEN_UNUSED(Easy); + auto* Self = static_cast<Impl*>(UserPtr); + Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr)); + return 0; + } + + static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr) + { + ZEN_UNUSED(Multi); + auto* Self = static_cast<Impl*>(UserPtr); + Self->OnCurlTimer(TimeoutMs); + return 0; + } + + void SetupMultiCallbacks() + { + curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback); + curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this); + curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback); + curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this); + } + + // Called by curl when socket watch state changes --------------------- + + void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info) + { + if (Action == CURL_POLL_REMOVE) + { + if (Info) + { + // Cancel pending async_wait ops before releasing the fd. + // curl owns the fd, so we must release() rather than close(). + Info->Socket.cancel(); + if (Info->Socket.is_open()) + { + Info->Socket.release(); + } + m_Sockets.erase(Fd); + } + return; + } + + if (!Info) + { + // New socket - wrap the native fd in an ASIO socket. + auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext)); + Info = It->second.get(); + + asio::error_code Ec; + // Determine protocol from the fd (v4 vs v6). Default to v4. + Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec); + if (Ec) + { + // Try v6 as fallback + Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec); + } + if (Ec) + { + ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message()); + m_Sockets.erase(Fd); + return; + } + + curl_multi_assign(m_Multi, Fd, Info); + } + + Info->WatchFlags = Action; + SetSocketWatch(Fd, Info); + } + + void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info) + { + // Cancel any pending wait before issuing a new one. + Info->Socket.cancel(); + + if (Info->WatchFlags & CURL_POLL_IN) + { + Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { + if (Ec || m_ShuttingDown) + { + return; + } + OnSocketReady(Fd, CURL_CSELECT_IN); + })); + } + + if (Info->WatchFlags & CURL_POLL_OUT) + { + Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { + if (Ec || m_ShuttingDown) + { + return; + } + OnSocketReady(Fd, CURL_CSELECT_OUT); + })); + } + } + + void OnSocketReady(curl_socket_t Fd, int CurlAction) + { + ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady"); + int StillRunning = 0; + curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning); + CheckCompleted(); + + // Re-arm the watch if the socket is still tracked. + auto It = m_Sockets.find(Fd); + if (It != m_Sockets.end()) + { + SetSocketWatch(Fd, It->second.get()); + } + } + + // Called by curl when it wants a timeout ------------------------------ + + void OnCurlTimer(long TimeoutMs) + { + m_Timer.cancel(); + + if (TimeoutMs < 0) + { + // curl says "no timeout needed" + return; + } + + if (TimeoutMs == 0) + { + // curl wants immediate action - run it directly on the strand. + asio::post(m_Strand, [this]() { + if (m_ShuttingDown) + { + return; + } + int StillRunning = 0; + curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning); + CheckCompleted(); + }); + return; + } + + m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs)); + m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) { + if (Ec || m_ShuttingDown) + { + return; + } + ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout"); + int StillRunning = 0; + curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning); + CheckCompleted(); + })); + } + + // Drain completed transfers from curl_multi -------------------------- + + void CheckCompleted() + { + int MsgsLeft = 0; + CURLMsg* Msg = nullptr; + while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr) + { + if (Msg->msg != CURLMSG_DONE) + { + continue; + } + + CURL* Handle = Msg->easy_handle; + CURLcode Result = Msg->data.result; + + curl_multi_remove_handle(m_Multi, Handle); + + auto It = m_Transfers.find(Handle); + if (It == m_Transfers.end()) + { + ReleaseHandle(Handle); + continue; + } + + std::unique_ptr<TransferContext> Ctx = std::move(It->second); + m_Transfers.erase(It); + + CompleteTransfer(Handle, Result, std::move(Ctx)); + } + } + + void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx) + { + ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer"); + // Extract result info + long StatusCode = 0; + curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode); + + double Elapsed = 0; + curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed); + + curl_off_t UpBytes = 0; + curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes); + + curl_off_t DownBytes = 0; + curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes); + + ReleaseHandle(Handle); + + // Build response + HttpClient::Response Response; + Response.StatusCode = HttpResponseCode(StatusCode); + Response.UploadedBytes = static_cast<int64_t>(UpBytes); + Response.DownloadedBytes = static_cast<int64_t>(DownBytes); + Response.ElapsedSeconds = Elapsed; + Response.Header = BuildHeaderMap(Ctx->ResponseHeaders); + + if (CurlResult != CURLE_OK) + { + const char* ErrorMsg = curl_easy_strerror(CurlResult); + + if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK) + { + ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg); + } + + if (!Ctx->Body.empty()) + { + Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); + } + + Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)}; + } + else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty()) + { + // No payload + } + else + { + IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); + ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders); + + const HttpResponseCode Code = HttpResponseCode(StatusCode); + if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound) + { + ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri); + } + + Response.ResponsePayload = std::move(PayloadBuffer); + } + + // Dispatch the user callback off the strand so a slow callback + // cannot starve the curl_multi poll loop. + asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable { + try + { + Cb(std::move(Response)); + } + catch (const std::exception& Ex) + { + auto Log = [&]() -> LoggerRef { return LogRef; }; + ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what()); + } + }); + } + + // -- Async verb implementations -------------------------------------- + + void DoAsyncGet(std::string Url, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) + { + asio::post(m_Strand, + [this, + Url = std::move(Url), + Callback = std::move(Callback), + AdditionalHeader = std::move(AdditionalHeader), + Parameters = std::move(Parameters)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Get"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, Parameters); + curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) + { + asio::post(m_Strand, + [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Head"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, {}); + curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) + { + asio::post(m_Strand, + [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Delete"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, {}); + curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE"); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncPost(std::string Url, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) + { + asio::post(m_Strand, + [this, + Url = std::move(Url), + Callback = std::move(Callback), + AdditionalHeader = std::move(AdditionalHeader), + Parameters = std::move(Parameters)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Post"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, Parameters); + curl_easy_setopt(Handle, CURLOPT_POST, 1L); + curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncPostWithPayload(std::string Url, + IoBuffer Payload, + ZenContentType ContentType, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader) + { + asio::post(m_Strand, + [this, + Url = std::move(Url), + Payload = std::move(Payload), + ContentType, + Callback = std::move(Callback), + AdditionalHeader = std::move(AdditionalHeader)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, {}); + curl_easy_setopt(Handle, CURLOPT_POST, 1L); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->PayloadBuffer = std::move(Payload); + Ctx->HeaderList = + BuildHeaderList(AdditionalHeader, + m_SessionId, + GetAccessToken(), + {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))}); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + // Set up read callback for payload data + Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); + Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); + Ctx->ReadData.Offset = 0; + + curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); + curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); + curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncPutWithPayload(std::string Url, + IoBuffer Payload, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) + { + asio::post(m_Strand, + [this, + Url = std::move(Url), + Payload = std::move(Payload), + Callback = std::move(Callback), + AdditionalHeader = std::move(AdditionalHeader), + Parameters = std::move(Parameters)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Put"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, Parameters); + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->PayloadBuffer = std::move(Payload); + Ctx->HeaderList = BuildHeaderList( + AdditionalHeader, + m_SessionId, + GetAccessToken(), + {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))}); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); + Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); + Ctx->ReadData.Offset = 0; + + curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); + curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); + curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters) + { + asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable { + ZEN_TRACE_CPU("AsyncHttpClient::Put"); + if (m_ShuttingDown) + { + return; + } + CURL* Handle = AllocHandle(); + ConfigureHandle(Handle, Url, Parameters); + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL); + + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + + HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}}; + Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken()); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + SubmitTransfer(Handle, std::move(Ctx)); + }); + } + + // -- Members --------------------------------------------------------- + + std::string m_BaseUri; + HttpClientSettings m_Settings; + LoggerRef m_Log; + std::string m_SessionId; + std::string m_UnixSocketPathUtf8; + + // io_context and strand - all curl_multi operations are serialized on the + // strand, making this safe even when the io_context has multiple threads. + std::unique_ptr<asio::io_context> m_OwnedIoContext; + asio::io_context& m_IoContext; + asio::strand<asio::io_context::executor_type> m_Strand; + std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard; + std::thread m_IoThread; + + // curl_multi and socket-action state + CURLM* m_Multi = nullptr; + std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers; + std::vector<CURL*> m_HandlePool; + std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets; + asio::steady_timer m_Timer; + bool m_ShuttingDown = false; + + // Access token cache + RwLock m_AccessTokenLock; + HttpClientAccessToken m_CachedAccessToken; +}; + +////////////////////////////////////////////////////////////////////////// +// +// AsyncHttpClient public API + +AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings) +: m_Impl(std::make_unique<Impl>(BaseUri, Settings)) +{ +} + +AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) +: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings)) +{ +} + +AsyncHttpClient::~AsyncHttpClient() = default; + +// -- Callback-based API -------------------------------------------------- + +void +AsyncHttpClient::AsyncGet(std::string_view Url, + AsyncHttpCallback Callback, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) +{ + m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); +} + +void +AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) +{ + m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader); +} + +void +AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) +{ + m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader); +} + +void +AsyncHttpClient::AsyncPost(std::string_view Url, + AsyncHttpCallback Callback, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) +{ + m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); +} + +void +AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) +{ + m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader); +} + +void +AsyncHttpClient::AsyncPost(std::string_view Url, + const IoBuffer& Payload, + ZenContentType ContentType, + AsyncHttpCallback Callback, + const KeyValueMap& AdditionalHeader) +{ + m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader); +} + +void +AsyncHttpClient::AsyncPut(std::string_view Url, + const IoBuffer& Payload, + AsyncHttpCallback Callback, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) +{ + m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters); +} + +void +AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters) +{ + m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters); +} + +// -- Future-based API ---------------------------------------------------- + +std::future<HttpClient::Response> +AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncGet( + Url, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader, + Parameters); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncHead( + Url, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncDelete( + Url, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncPost( + Url, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader, + Parameters); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncPost( + Url, + Payload, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncPost( + Url, + Payload, + ContentType, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncPut( + Url, + Payload, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + AdditionalHeader, + Parameters); + return Future; +} + +std::future<HttpClient::Response> +AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) +{ + auto Promise = std::make_shared<std::promise<Response>>(); + auto Future = Promise->get_future(); + AsyncPut( + Url, + [Promise](Response R) { Promise->set_value(std::move(R)); }, + Parameters); + return Future; +} + +} // namespace zen |