// Copyright Epic Games, Inc. All Rights Reserved. #include #include "httpclientcurlhelpers.h" #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { ////////////////////////////////////////////////////////////////////////// // // TransferContext: per-transfer state associated with each CURL easy handle struct TransferContext { AsyncHttpCallback Callback; std::string Body; std::vector> ResponseHeaders; CurlWriteCallbackData WriteData; CurlHeaderCallbackData HeaderData; curl_slist* HeaderList = nullptr; // For PUT/POST with payload: keep the data alive until transfer completes IoBuffer PayloadBuffer; CurlReadCallbackData ReadData; TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) { WriteData.Body = &Body; HeaderData.Headers = &ResponseHeaders; } ~TransferContext() { if (HeaderList) { curl_slist_free_all(HeaderList); } } TransferContext(const TransferContext&) = delete; TransferContext& operator=(const TransferContext&) = delete; }; ////////////////////////////////////////////////////////////////////////// // // AsyncHttpClient::Impl struct AsyncHttpClient::Impl { Impl(std::string_view BaseUri, const HttpClientSettings& Settings) : m_BaseUri(BaseUri) , m_Settings(Settings) , m_Log(logging::Get(Settings.LogCategory)) , m_OwnedIoContext(std::make_unique()) , m_IoContext(*m_OwnedIoContext) , m_Strand(asio::make_strand(m_IoContext)) , m_Timer(m_Strand) { Init(); m_WorkGuard.emplace(m_IoContext.get_executor()); m_IoThread = std::thread([this]() { SetCurrentThreadName("async_http"); try { m_IoContext.run(); } catch (const std::exception& Ex) { ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what()); } }); } Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) : m_BaseUri(BaseUri) , m_Settings(Settings) , m_Log(logging::Get(Settings.LogCategory)) , m_IoContext(IoContext) , m_Strand(asio::make_strand(m_IoContext)) , m_Timer(m_Strand) { Init(); } ~Impl() { // Clean up curl state on the strand where all curl_multi operations // are serialized. Use a promise to block until the cleanup handler // has actually executed — essential for the external io_context case // where we don't own the run loop. std::promise Done; std::future DoneFuture = Done.get_future(); asio::post(m_Strand, [this, &Done]() { m_ShuttingDown = true; m_Timer.cancel(); // Release all tracked sockets (don't close — curl owns the fds). for (auto& [Fd, Info] : m_Sockets) { if (Info->Socket.is_open()) { Info->Socket.cancel(); Info->Socket.release(); } } m_Sockets.clear(); for (auto& [Handle, Ctx] : m_Transfers) { curl_multi_remove_handle(m_Multi, Handle); curl_easy_cleanup(Handle); } m_Transfers.clear(); for (CURL* Handle : m_HandlePool) { curl_easy_cleanup(Handle); } m_HandlePool.clear(); Done.set_value(); }); // For owned io_context: release work guard so run() can return after // processing the cleanup handler above. m_WorkGuard.reset(); if (m_IoThread.joinable()) { m_IoThread.join(); } else { // External io_context: wait for the cleanup handler to complete. DoneFuture.wait(); } if (m_Multi) { curl_multi_cleanup(m_Multi); } } LoggerRef Log() { return m_Log; } void Init() { m_Multi = curl_multi_init(); if (!m_Multi) { throw std::runtime_error("curl_multi_init failed"); } SetupMultiCallbacks(); if (m_Settings.SessionId == Oid::Zero) { m_SessionId = std::string(GetSessionIdString()); } else { m_SessionId = m_Settings.SessionId.ToString(); } } // ── Handle pool ───────────────────────────────────────────────────── CURL* AllocHandle() { if (!m_HandlePool.empty()) { CURL* Handle = m_HandlePool.back(); m_HandlePool.pop_back(); curl_easy_reset(Handle); return Handle; } CURL* Handle = curl_easy_init(); if (!Handle) { throw std::runtime_error("curl_easy_init failed"); } return Handle; } void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); } // ── Configure a handle with common settings ───────────────────────── // Called only from DoAsync* lambdas running on the strand. void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters) { // Build URL ExtendableStringBuilder<256> Url; BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters); curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str()); // Unix domain socket if (!m_Settings.UnixSocketPath.empty()) { m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath); curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str()); } // Timeouts if (m_Settings.ConnectTimeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast(m_Settings.ConnectTimeout.count())); } if (m_Settings.Timeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast(m_Settings.Timeout.count())); } // HTTP/2 if (m_Settings.AssumeHttp2) { curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); } // SSL if (m_Settings.InsecureSsl) { curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L); } if (!m_Settings.CaBundlePath.empty()) { curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str()); } // Verbose/debug if (m_Settings.Verbose) { curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); } // Thread safety curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); if (m_Settings.ForbidReuseConnection) { curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L); } } // ── Access token ──────────────────────────────────────────────────── std::optional GetAccessToken() { if (!m_Settings.AccessTokenProvider.has_value()) { return {}; } { RwLock::SharedLockScope _(m_AccessTokenLock); if (!m_CachedAccessToken.NeedsRefresh()) { return m_CachedAccessToken.GetValue(); } } RwLock::ExclusiveLockScope _(m_AccessTokenLock); if (!m_CachedAccessToken.NeedsRefresh()) { return m_CachedAccessToken.GetValue(); } HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()(); if (!NewToken.IsValid()) { ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once"); NewToken = m_Settings.AccessTokenProvider.value()(); } if (NewToken.IsValid()) { m_CachedAccessToken = NewToken; return m_CachedAccessToken.GetValue(); } ZEN_WARN("AsyncHttpClient: access token provider returned invalid token"); return {}; } // ── Submit a transfer ─────────────────────────────────────────────── void SubmitTransfer(CURL* Handle, std::unique_ptr Ctx) { ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer"); // Setup write/header callbacks curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback); curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData); curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback); curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData); m_Transfers[Handle] = std::move(Ctx); CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle); if (Mc != CURLM_OK) { auto Stolen = std::move(m_Transfers[Handle]); m_Transfers.erase(Handle); ReleaseHandle(Handle); HttpClient::Response ErrorResponse; ErrorResponse.Error = HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError, .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))}; asio::post(m_IoContext, [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); }); return; } } // ── Socket-action integration ─────────────────────────────────────── // // curl_multi drives I/O via two callbacks: // - SocketCallback: curl tells us which sockets to watch for read/write // - TimerCallback: curl tells us when to fire a timeout // // On each socket event or timeout we call curl_multi_socket_action(), // then drain completed transfers via curl_multi_info_read(). // Per-socket state: wraps the native fd in an ASIO socket for async_wait. struct SocketInfo { asio::ip::tcp::socket Socket; int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {} }; // Static thunks registered with curl_multi ──────────────────────────── static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr) { ZEN_UNUSED(Easy); auto* Self = static_cast(UserPtr); Self->OnCurlSocket(Fd, Action, static_cast(SocketPtr)); return 0; } static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr) { ZEN_UNUSED(Multi); auto* Self = static_cast(UserPtr); Self->OnCurlTimer(TimeoutMs); return 0; } void SetupMultiCallbacks() { curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback); curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this); curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback); curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this); } // Called by curl when socket watch state changes ────────────────────── void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info) { if (Action == CURL_POLL_REMOVE) { if (Info) { // Cancel pending async_wait ops before releasing the fd. // curl owns the fd, so we must release() rather than close(). Info->Socket.cancel(); if (Info->Socket.is_open()) { Info->Socket.release(); } m_Sockets.erase(Fd); } return; } if (!Info) { // New socket — wrap the native fd in an ASIO socket. auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique(m_IoContext)); Info = It->second.get(); asio::error_code Ec; // Determine protocol from the fd (v4 vs v6). Default to v4. Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec); if (Ec) { // Try v6 as fallback Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec); } if (Ec) { ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast(Fd), Ec.message()); m_Sockets.erase(Fd); return; } curl_multi_assign(m_Multi, Fd, Info); } Info->WatchFlags = Action; SetSocketWatch(Fd, Info); } void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info) { // Cancel any pending wait before issuing a new one. Info->Socket.cancel(); if (Info->WatchFlags & CURL_POLL_IN) { Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { if (Ec || m_ShuttingDown) { return; } OnSocketReady(Fd, CURL_CSELECT_IN); })); } if (Info->WatchFlags & CURL_POLL_OUT) { Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { if (Ec || m_ShuttingDown) { return; } OnSocketReady(Fd, CURL_CSELECT_OUT); })); } } void OnSocketReady(curl_socket_t Fd, int CurlAction) { ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady"); int StillRunning = 0; curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning); CheckCompleted(); // Re-arm the watch if the socket is still tracked. auto It = m_Sockets.find(Fd); if (It != m_Sockets.end()) { SetSocketWatch(Fd, It->second.get()); } } // Called by curl when it wants a timeout ────────────────────────────── void OnCurlTimer(long TimeoutMs) { m_Timer.cancel(); if (TimeoutMs < 0) { // curl says "no timeout needed" return; } if (TimeoutMs == 0) { // curl wants immediate action — run it directly on the strand. asio::post(m_Strand, [this]() { if (m_ShuttingDown) { return; } int StillRunning = 0; curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning); CheckCompleted(); }); return; } m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs)); m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) { if (Ec || m_ShuttingDown) { return; } ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout"); int StillRunning = 0; curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning); CheckCompleted(); })); } // Drain completed transfers from curl_multi ────────────────────────── void CheckCompleted() { int MsgsLeft = 0; CURLMsg* Msg = nullptr; while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr) { if (Msg->msg != CURLMSG_DONE) { continue; } CURL* Handle = Msg->easy_handle; CURLcode Result = Msg->data.result; curl_multi_remove_handle(m_Multi, Handle); auto It = m_Transfers.find(Handle); if (It == m_Transfers.end()) { ReleaseHandle(Handle); continue; } std::unique_ptr Ctx = std::move(It->second); m_Transfers.erase(It); CompleteTransfer(Handle, Result, std::move(Ctx)); } } void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr Ctx) { ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer"); // Extract result info long StatusCode = 0; curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode); double Elapsed = 0; curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed); curl_off_t UpBytes = 0; curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes); curl_off_t DownBytes = 0; curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes); ReleaseHandle(Handle); // Build response HttpClient::Response Response; Response.StatusCode = HttpResponseCode(StatusCode); Response.UploadedBytes = static_cast(UpBytes); Response.DownloadedBytes = static_cast(DownBytes); Response.ElapsedSeconds = Elapsed; Response.Header = BuildHeaderMap(Ctx->ResponseHeaders); if (CurlResult != CURLE_OK) { const char* ErrorMsg = curl_easy_strerror(CurlResult); if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK) { ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast(CurlResult), ErrorMsg); } if (!Ctx->Body.empty()) { Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); } Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)}; } else if (StatusCode == static_cast(HttpResponseCode::NoContent) || Ctx->Body.empty()) { // No payload } else { IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders); const HttpResponseCode Code = HttpResponseCode(StatusCode); if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound) { ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast(Code), m_BaseUri); } Response.ResponsePayload = std::move(PayloadBuffer); } // Dispatch the user callback off the strand so a slow callback // cannot starve the curl_multi poll loop. asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable { try { Cb(std::move(Response)); } catch (const std::exception& Ex) { auto Log = [&]() -> LoggerRef { return LogRef; }; ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what()); } }); } // ── Async verb implementations ────────────────────────────────────── void DoAsyncGet(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) { asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader), Parameters = std::move(Parameters)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Get"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, Parameters); curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); auto Ctx = std::make_unique(std::move(Callback)); Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) { asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Head"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, {}); curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L); auto Ctx = std::make_unique(std::move(Callback)); Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) { asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Delete"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, {}); curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE"); auto Ctx = std::make_unique(std::move(Callback)); Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncPost(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) { asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader), Parameters = std::move(Parameters)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Post"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, Parameters); curl_easy_setopt(Handle, CURLOPT_POST, 1L); curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L); auto Ctx = std::make_unique(std::move(Callback)); Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncPostWithPayload(std::string Url, IoBuffer Payload, ZenContentType ContentType, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) { asio::post(m_Strand, [this, Url = std::move(Url), Payload = std::move(Payload), ContentType, Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, {}); curl_easy_setopt(Handle, CURLOPT_POST, 1L); auto Ctx = std::make_unique(std::move(Callback)); Ctx->PayloadBuffer = std::move(Payload); Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))}); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); // Set up read callback for payload data Ctx->ReadData.DataPtr = static_cast(Ctx->PayloadBuffer.GetData()); Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); Ctx->ReadData.Offset = 0; curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Ctx->PayloadBuffer.GetSize())); curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncPutWithPayload(std::string Url, IoBuffer Payload, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) { asio::post(m_Strand, [this, Url = std::move(Url), Payload = std::move(Payload), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader), Parameters = std::move(Parameters)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Put"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, Parameters); curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); auto Ctx = std::make_unique(std::move(Callback)); Ctx->PayloadBuffer = std::move(Payload); Ctx->HeaderList = BuildHeaderList( AdditionalHeader, m_SessionId, GetAccessToken(), {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))}); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); Ctx->ReadData.DataPtr = static_cast(Ctx->PayloadBuffer.GetData()); Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); Ctx->ReadData.Offset = 0; curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast(Ctx->PayloadBuffer.GetSize())); curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); SubmitTransfer(Handle, std::move(Ctx)); }); } void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters) { asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable { ZEN_TRACE_CPU("AsyncHttpClient::Put"); if (m_ShuttingDown) { return; } CURL* Handle = AllocHandle(); ConfigureHandle(Handle, Url, Parameters); curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL); auto Ctx = std::make_unique(std::move(Callback)); HttpClient::KeyValueMap ContentLengthHeader{std::pair{"Content-Length", "0"}}; Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken()); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); SubmitTransfer(Handle, std::move(Ctx)); }); } // ── Members ───────────────────────────────────────────────────────── std::string m_BaseUri; HttpClientSettings m_Settings; LoggerRef m_Log; std::string m_SessionId; std::string m_UnixSocketPathUtf8; // io_context and strand — all curl_multi operations are serialized on the // strand, making this safe even when the io_context has multiple threads. std::unique_ptr m_OwnedIoContext; asio::io_context& m_IoContext; asio::strand m_Strand; std::optional> m_WorkGuard; std::thread m_IoThread; // curl_multi and socket-action state CURLM* m_Multi = nullptr; std::unordered_map> m_Transfers; std::vector m_HandlePool; std::unordered_map> m_Sockets; asio::steady_timer m_Timer; bool m_ShuttingDown = false; // Access token cache RwLock m_AccessTokenLock; HttpClientAccessToken m_CachedAccessToken; }; ////////////////////////////////////////////////////////////////////////// // // AsyncHttpClient public API AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings) : m_Impl(std::make_unique(BaseUri, Settings)) { } AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) : m_Impl(std::make_unique(BaseUri, IoContext, Settings)) { } AsyncHttpClient::~AsyncHttpClient() = default; // ── Callback-based API ────────────────────────────────────────────────── void AsyncHttpClient::AsyncGet(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); } void AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader); } void AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader); } void AsyncHttpClient::AsyncPost(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); } void AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader); } void AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader); } void AsyncHttpClient::AsyncPut(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters); } void AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters) { m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters); } // ── Future-based API ──────────────────────────────────────────────────── std::future AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncGet( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters); return Future; } std::future AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncHead( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader); return Future; } std::future AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncDelete( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader); return Future; } std::future AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncPost( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters); return Future; } std::future AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncPost( Url, Payload, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader); return Future; } std::future AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncPost( Url, Payload, ContentType, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader); return Future; } std::future AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncPut( Url, Payload, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters); return Future; } std::future AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { auto Promise = std::make_shared>(); auto Future = Promise->get_future(); AsyncPut( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, Parameters); return Future; } } // namespace zen