// Copyright Epic Games, Inc. All Rights Reserved.

// NOTE(review): the header names of the bare #include directives below, and
// several template argument lists further down (e.g. on std::function,
// std::atomic, std::shared_ptr, std::vector, std::make_unique), are absent from
// this view of the file. They are reproduced as found; restore them from
// version control before building.
#include

#include "httpclientcurlhelpers.h"

#include
#include
#include
#include
#include
#include
#include
#include

ZEN_THIRD_PARTY_INCLUDES_START
#include
#include
ZEN_THIRD_PARTY_INCLUDES_END

#include
#include
#include
#include
#include

namespace zen {

//////////////////////////////////////////////////////////////////////////
//
// AsyncRequestToken state (forward-declared in the public header so tokens
// can be returned by value without leaking impl details).

struct AsyncRequestToken::State
{
    // Target invoked by AsyncRequestToken::Cancel() (see TokenStateRef below).
    std::function CancelFn;
    // Read with acquire ordering by SubmitFromSpec to honour a cancel that
    // arrived before the transfer was handed to curl.
    std::atomic Cancelled = false;
};

//////////////////////////////////////////////////////////////////////////
//
// TransferContext: per-transfer state associated with each CURL easy handle

// Request blueprint kept alongside each transfer so retries can re-issue with
// the original verb/url/headers/payload after the previous attempt's transient
// failure.
enum class AsyncRequestMethod
{
    Get,
    Head,
    Delete,
    Post,
    PostWithPayload,
    Put,
    PutWithPayload,
    PutWithSource,  // PUT, body pulled via OnReadSource (no materialized payload)
    Stream,         // GET, response body delivered via OnData callback (no copy)
};

// Returns a short display label for M (used in log messages). Falls through
// to "?" for a value that matches no enumerator.
inline std::string_view
AsyncRequestMethodName(AsyncRequestMethod M)
{
    switch (M)
    {
        case AsyncRequestMethod::Get:
            return "GET";
        case AsyncRequestMethod::Head:
            return "HEAD";
        case AsyncRequestMethod::Delete:
            return "DELETE";
        case AsyncRequestMethod::Post:
            return "POST";
        case AsyncRequestMethod::PostWithPayload:
            return "POST(payload)";
        case AsyncRequestMethod::Put:
            return "PUT";
        case AsyncRequestMethod::PutWithPayload:
            return "PUT(payload)";
        case AsyncRequestMethod::PutWithSource:
            return "PUT(stream)";
        case AsyncRequestMethod::Stream:
            return "GET(stream)";
    }
    return "?";
}

// Everything needed to (re)build a curl easy handle for one request.
struct AsyncRequestSpec
{
    AsyncRequestMethod      Method = AsyncRequestMethod::Get;
    std::string             Url;
    HttpClient::KeyValueMap AdditionalHeader;
    HttpClient::KeyValueMap Parameters;
    IoBuffer                Payload;  // POST/PUT with payload
    ZenContentType          ContentType    = ZenContentType::kUnknownContentType;
    bool                    HasContentType = false;
    AsyncHttpDataCallback   OnData;                // Stream method
    AsyncHttpReadSource     OnReadSource;          // PutWithSource method
    uint64_t                StreamingPutSize = 0;  // Content-Length for PutWithSource

    // Opt-in header capture. By default Response::Header is left empty; inline
    // extracts always run because they steer the body path. WantHeaderMap pays
    // O(headers) string allocs on the io thread (rare - most callers use
    // Response::FindHeader). WantEtag triggers an inline parse only.
    bool WantEtag      = false;
    bool WantHeaderMap = false;
};

// Per-transfer state. The curl WRITE/HEADER/READ callbacks receive a pointer
// to this object as their user data.
struct TransferContext
{
    AsyncHttpCallback Callback;
    AsyncRequestSpec  Spec;
    uint8_t           AttemptCount = 0;
    uint64_t          TokenId      = 0;
    bool              Cancelled    = false;

    // Curl handle owning this transfer once submitted to curl_multi. Null between
    // Submit() and SubmitFromSpec(), and again after CompleteTransfer releases the
    // handle back to the pool.
    CURL* CurlHandle = nullptr;

    // Strong reference to the user-facing AsyncRequestToken::State. Kept alive
    // so AsyncRequestToken::Cancel() retains a valid CancelFn target until the
    // transfer completes. The typed pointer also lets SubmitFromSpec read
    // State.Cancelled to honour cancels that arrived between Submit() returning
    // and the io thread running SubmitFromSpec.
    std::shared_ptr TokenStateRef;

    // Two paths gated by BodyPreallocated: when Content-Length is known we
    // fill Body in place (zero-copy move into Response); otherwise BodyChunks
    // accumulates per-WRITE IoBuffers, flattened at completion.
    IoBuffer    Body;
    uint64_t    BodyWriteOffset  = 0;
    bool        BodyPreallocated = false;
    std::vector BodyChunks;

    // Raw response headers, "Key: Value\r\n" lines as delivered by curl.
    // One growing buffer; reserve covers common case (~1 KiB) with no realloc.
    std::string HeaderArena;

    // Captured at the curl callback boundary so an exception out of the user
    // OnData / OnReadSource never propagates through curl's C frames (UB).
    // Surfaced as a kInternalError response by CompleteTransfer.
    bool        CallbackFailed = false;
    std::string CallbackErrorMessage;

    // Inline-parsed in CurlHeaderCallback. Etag is populated only when
    // Spec.WantEtag is set; the others are always parsed since they steer
    // the body path / content-type tagging.
    uint64_t       ContentLength    = 0;
    bool           ContentLengthSet = false;
    ZenContentType BodyContentType  = ZenContentType::kUnknownContentType;
    std::string    Etag;

    curl_slist*          HeaderList = nullptr;  // freed by FreeHeaderList()
    IoBuffer             PayloadBuffer;
    CurlReadCallbackData ReadData;
    uint64_t             SourceOffset = 0;  // PutWithSource: bytes pulled from Spec.OnReadSource so far.

    // Last attempt's failure, kept across the backoff so a retry-abandoned
    // path can surface the underlying cause instead of a generic
    // "Request canceled (retry abandoned)". Non-empty LastErrorMessage = stash valid.
    CURLcode    LastCurlResult = CURLE_OK;
    long        LastStatusCode = 0;
    std::string LastErrorMessage;

    // Takes ownership of the completion callback and pre-reserves 1 KiB for
    // the raw header buffer.
    TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) { HeaderArena.reserve(1024); }

    ~TransferContext() { FreeHeaderList(); }

    // Non-copyable: the context holds a raw curl_slist* that it frees itself.
    TransferContext(const TransferContext&) = delete;
    TransferContext& operator=(const TransferContext&) = delete;

    // Reset accumulated response state so the same context can be re-submitted
    // for a retry attempt.
    void ResetForRetry()
    {
        Body             = IoBuffer{};
        BodyWriteOffset  = 0;
        BodyPreallocated = false;
        BodyChunks.clear();
        HeaderArena.clear();  // keep capacity
        ContentLength    = 0;
        ContentLengthSet = false;
        BodyContentType  = ZenContentType::kUnknownContentType;
        Etag.clear();
        FreeHeaderList();
        ReadData       = {};
        SourceOffset   = 0;
        CallbackFailed = false;
        CallbackErrorMessage.clear();
        // LastCurlResult / LastStatusCode / LastErrorMessage are intentionally NOT
        // cleared - they describe the just-finished attempt that triggered this
        // retry, and surface in the abandon path if the next attempt is cancelled
        // or shutdown-aborted.
    }

    // Frees the curl header list, if any, and nulls the pointer so a second
    // call is a no-op.
    void FreeHeaderList()
    {
        if (HeaderList)
        {
            curl_slist_free_all(HeaderList);
            HeaderList = nullptr;
        }
    }
};

//////////////////////////////////////////////////////////////////////////
//
// SocketInfo: per-socket state.

struct AsyncSocketInfo
{
    asio::ip::tcp::socket Socket;
    int                   WatchFlags   = 0;  // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
    int                   PendingFlags = 0;  // directions with outstanding async_wait

    // Bound to the strand executor so async_wait completions are serialized on
    // the same strand that drives curl_multi - safe even when the underlying
    // io_context is multithreaded (external-context mode).
    explicit AsyncSocketInfo(const asio::strand& Strand) : Socket(Strand) {}
};

// Holds the curl_multi instance and a strand that serializes every curl_multi op. Owned io_context
// (default ctor) spins a private thread driving run(); external io_context mode lets the caller
// drive the loop. The strand is the single serialization point for both modes.
struct AsyncHttpClient::Impl : std::enable_shared_from_this
{
    // Owned-io_context ctor: allocate a private io_context and run it on a
    // dedicated thread. Cleanest path for callers that just want an
    // AsyncHttpClient and don't care about the loop.
    Impl(std::string_view BaseUri, const HttpClientSettings& Settings)
    : m_BaseUri(BaseUri)
    , m_Settings(Settings)
    , m_Log(logging::Get(Settings.LogCategory))
    , m_OwnedIoContext(std::make_unique())
    , m_IoContext(*m_OwnedIoContext)
    , m_Strand(asio::make_strand(m_IoContext))
    , m_Timer(m_Strand)
    {
        Init();
        m_WorkGuard.emplace(m_IoContext.get_executor());

        // Armed while the thread is being created; dismissed once the
        // std::thread assignment below has succeeded.
        auto ThreadGuard = MakeGuard([this]() {
            m_WorkGuard.reset();
            if (m_IoThread.joinable())
            {
                m_IoThread.join();
            }
        });

        m_IoThread = std::thread([this]() {
            SetCurrentThreadName("async_http");
            try
            {
                m_IoContext.run();
            }
            catch (const std::exception& Ex)
            {
                ZEN_ERROR("AsyncHttpClient: io thread unhandled exception: {}", Ex.what());
            }
        });
        ThreadGuard.Dismiss();
    }

    // External-io_context ctor: caller drives the run loop.
We do NOT spawn a // thread, do NOT hold a work guard, and do NOT call stop()/restart() on // teardown - the caller's lifecycle owns those. Shutdown blocks on a // promise until our cleanup handler runs through the strand, so the // caller MUST keep the loop running until the AsyncHttpClient destructs. Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) : m_BaseUri(BaseUri) , m_Settings(Settings) , m_Log(logging::Get(Settings.LogCategory)) , m_IoContext(IoContext) , m_Strand(asio::make_strand(m_IoContext)) , m_Timer(m_Strand) { Init(); } ~Impl() { Shutdown(); } // Synchronous teardown from ~AsyncHttpClient. Idempotent. Branches by ownership: // - Owned io_context: post Cleanup, drop the work guard, force run() to return, join the io // thread, then poll() to drain lambdas that captured shared_ptr (the "lambda in // io_context owned by Impl" cycle would otherwise pin Impl alive forever). // - External io_context: caller drives the loop, post Cleanup to the strand and block on a // promise. Destroying from the io thread itself would deadlock. void Shutdown() { if (m_ShutdownDone.exchange(true, std::memory_order_acq_rel)) { return; } if (m_OwnedIoContext) { // Post Cleanup before releasing the work guard so the io thread // runs Cleanup before run() returns. Run() normally exits when // the queue is empty AND no work guard is held. asio::post(m_Strand, [this]() { Cleanup(); }); m_WorkGuard.reset(); // Belt-and-suspenders: force run() to return even if asio's // outstanding_work_ counter is left non-zero by a close-during- // cancel race in win_iocp_socket_service. The race is observable // as a hung join() at shutdown after a burst of socket teardowns // inside curl_multi_cleanup; stop() here is purely a safety net // for the teardown path. The trailing restart() + poll() drains // any handlers stop() leaves undispatched. 
m_IoContext.stop(); if (m_IoThread.joinable()) { m_IoThread.join(); } // Drain any leftover work posted to the io_context but not run // before the thread exited (e.g. a Cancel-after-completion lambda // that captured shared_ptr by value). Loop until the queue // is empty: a single poll() is one quanta and may not drain // handlers that themselves post follow-ups. m_IoContext.restart(); while (m_IoContext.poll() != 0) { } } else { // External: block on the cleanup handler so we can guarantee // curl_multi state is gone before m_Impl drops. std::promise Done; std::future DoneFuture = Done.get_future(); asio::post(m_Strand, [this, &Done]() { Cleanup(); Done.set_value(); }); DoneFuture.wait(); } } // Cleanup body, run on the io thread. void Cleanup() { m_ShuttingDown = true; m_Timer.cancel(); // Tear down curl handles first; curl drives CURL_POLL_REMOVE + // CLOSESOCKETFUNCTION for each owned socket, which our handlers // use to retire SocketInfo entries from m_Sockets. for (auto& [TokenId, Ctx] : m_Transfers) { if (Ctx->CurlHandle) { curl_multi_remove_handle(m_Multi, Ctx->CurlHandle); curl_easy_cleanup(Ctx->CurlHandle); Ctx->CurlHandle = nullptr; } HttpClient::Response Resp; Resp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kRequestCancelled, .ErrorMessage = "AsyncHttpClient shutting down", }; try { Ctx->Callback(std::move(Resp)); } catch (const std::exception& Ex) { ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown callback (token={}, {} {}): {}", TokenId, AsyncRequestMethodName(Ctx->Spec.Method), Ctx->Spec.Url, Ex.what()); } } m_Transfers.clear(); m_InFlight = 0; // Drain transfers parked in retry backoff. The timer's pending // async_wait fires after Cleanup with the io_context already stopped; // it will find no entry and be a no-op. Fire cancel callbacks here // while we still hold the storage so callers' futures resolve. 
for (auto& [Id, Entry] : m_RetryingTransfers) { if (Entry.Timer) { Entry.Timer->cancel(); } HttpClient::Response Resp; Resp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kRequestCancelled, .ErrorMessage = "AsyncHttpClient shutting down", }; try { Entry.Ctx->Callback(std::move(Resp)); } catch (const std::exception& Ex) { ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown retry callback (token={}, {} {}): {}", Id, AsyncRequestMethodName(Entry.Ctx->Spec.Method), Entry.Ctx->Spec.Url, Ex.what()); } } m_RetryingTransfers.clear(); // curl_multi_cleanup walks the connection cache and fires // CLOSESOCKETFUNCTION for each cached fd. Run it on the io thread // while m_Sockets is still populated so our callback routes each // close through the map (Socket dtor closes fd exactly once). if (m_Multi) { curl_multi_cleanup(m_Multi); m_Multi = nullptr; } for (CURL* Handle : m_HandlePool) { curl_easy_cleanup(Handle); } m_HandlePool.clear(); asio::error_code CancelEc; for (auto& [Fd, Info] : m_Sockets) { Info->Socket.cancel(CancelEc); } m_Sockets.clear(); } LoggerRef Log() { return m_Log; } void Init() { if (!m_Settings.UnixSocketPath.empty()) { m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath); } m_Multi = curl_multi_init(); if (!m_Multi) { throw std::runtime_error("curl_multi_init failed"); } SetupMultiCallbacks(); if (m_Settings.MaxConcurrentConnectionsPerHost != 0) { curl_multi_setopt(m_Multi, CURLMOPT_MAX_HOST_CONNECTIONS, static_cast(m_Settings.MaxConcurrentConnectionsPerHost)); } if (m_Settings.MaxConcurrentConnectionsTotal != 0) { curl_multi_setopt(m_Multi, CURLMOPT_MAX_TOTAL_CONNECTIONS, static_cast(m_Settings.MaxConcurrentConnectionsTotal)); } // Size the idle-conn cache to the in-flight cap so reused conns are // never evicted while requests are queued. Each eviction costs a // fresh TCP+TLS handshake (~280ms WAN to S3) on the next reuse. 
const long MaxConnectsHint = static_cast(std::max({m_Settings.MaxConcurrentRequests, m_Settings.MaxConcurrentConnectionsTotal, m_Settings.MaxConcurrentConnectionsPerHost, 128u})); curl_multi_setopt(m_Multi, CURLMOPT_MAXCONNECTS, MaxConnectsHint); if (m_Settings.SessionId == Oid::Zero) { m_SessionId = std::string(GetSessionIdString()); } else { m_SessionId = m_Settings.SessionId.ToString(); } } // Run a completion callback inline on the io thread. By the time this is // called, curl_multi_remove_handle + curl_easy_cleanup have already finalized // the easy handle, so deferring to next io tick buys nothing. Direct call // saves one alloc + queue insert per request. // // CONTRACT: user callbacks run on the AsyncHttpClient io thread. Heavy work // (disk syscalls, lock contention, large allocations) must be hopped to a // worker pool; otherwise it stalls curl_multi for ALL in-flight transfers. void DispatchCallback(AsyncHttpCallback Cb, HttpClient::Response Resp, const TransferContext& Ctx) { try { Cb(std::move(Resp)); } catch (const std::exception& Ex) { ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback (token={}, {} {}): {}", Ctx.TokenId, AsyncRequestMethodName(Ctx.Spec.Method), Ctx.Spec.Url, Ex.what()); } } // -- Handle pool ----------------------------------------------------- CURL* AllocHandle() { if (!m_HandlePool.empty()) { CURL* Handle = m_HandlePool.back(); m_HandlePool.pop_back(); curl_easy_reset(Handle); return Handle; } CURL* Handle = curl_easy_init(); if (!Handle) { throw std::runtime_error("curl_easy_init failed"); } return Handle; } void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); } // Called only from DoAsync* lambdas running on the io thread. 
void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters) { ExtendableStringBuilder<256> Url; BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters); curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str()); if (!m_Settings.UnixSocketPath.empty()) { curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str()); } // Timeouts if (m_Settings.ConnectTimeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast(m_Settings.ConnectTimeout.count())); } if (m_Settings.Timeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast(m_Settings.Timeout.count())); } // SSL if (m_Settings.InsecureSsl) { curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L); } if (!m_Settings.CaBundlePath.empty()) { curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str()); } if (m_Settings.Verbose) { curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); } // Thread safety curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); // 256 KiB recv buffer aligns with optimal read syscall size and matches // upload buffer; pairs with downstream 512 KiB write slots (2 recv calls // per write slot under bulk transfer). curl_easy_setopt(Handle, CURLOPT_BUFFERSIZE, 262144L); curl_easy_setopt(Handle, CURLOPT_UPLOAD_BUFFERSIZE, 262144L); // Skip per-transfer progress bookkeeping; we don't consume it. curl_easy_setopt(Handle, CURLOPT_NOPROGRESS, 1L); // Disable Nagle (default since curl 7.50; explicit for safety). curl_easy_setopt(Handle, CURLOPT_TCP_NODELAY, 1L); // Take ownership of socket close (see CurlCloseSocketCallback). 
curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETFUNCTION, &CurlCloseSocketCallback); curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETDATA, this); if (m_Settings.ForbidReuseConnection) { curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L); } } // -- Access token ---------------------------------------------------- struct AccessTokenResult { std::optional Token; bool ProviderFailed = false; // provider configured but returned invalid twice }; // Called only on the io thread. AccessTokenResult GetAccessToken() { AccessTokenResult Result; if (!m_Settings.AccessTokenProvider.has_value()) { return Result; // No provider: anonymous is the intended mode. } if (!m_CachedAccessToken.NeedsRefresh()) { Result.Token = m_CachedAccessToken.GetValue(); return Result; } HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()(); if (!NewToken.IsValid()) { ZEN_WARN("AsyncHttpClient: failed to refresh access token, retrying once"); NewToken = m_Settings.AccessTokenProvider.value()(); } if (NewToken.IsValid()) { m_CachedAccessToken = NewToken; Result.Token = m_CachedAccessToken.GetValue(); return Result; } ZEN_WARN("AsyncHttpClient: access token provider returned invalid token"); Result.ProviderFailed = true; return Result; } // -- Submit / resubmit ----------------------------------------------- // // SubmitFromSpec runs on the io thread. Used for both the initial submission // and for retries: the AsyncRequestSpec inside Ctx encodes everything // needed to (re)build the curl handle from scratch. void SubmitFromSpec(std::unique_ptr Ctx) { if (m_ShuttingDown) { // Synthesize a cancel response so the user callback fires exactly once. // Without this any Ctx that lands here post-shutdown would be dropped // silently, leaving waiting futures unresolved. 
HttpClient::Response CancelResp; CancelResp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kRequestCancelled, .ErrorMessage = "Request canceled (client shutting down)", }; DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); return; } // Cancel-before-submit race: the user can call AsyncRequestToken::Cancel() // between Submit() returning and the io thread running SubmitFromSpec. // State.Cancelled is set under acq_rel by Cancel(); read here under // acquire ensures visibility. If set, fire the cancel callback exactly // once and bail before the transfer enters m_Transfers. if (Ctx->TokenStateRef && Ctx->TokenStateRef->Cancelled.load(std::memory_order_acquire)) { HttpClient::Response CancelResp; CancelResp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kRequestCancelled, .ErrorMessage = "Request canceled before submit", }; DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); return; } // Allocate the curl handle BEFORE bumping m_InFlight: AllocHandle can throw // (curl_easy_init returning null). The InFlightGuard covers the rest of // the body (BuildHeaderList, ExtraHeaders push_back, GetAccessToken can // all throw bad_alloc) so a throw post-increment doesn't leak the slot. // HandleGuard returns the handle to the pool on throw; CallbackGuard // synthesizes a kInternalError response so the user future resolves. 
CURL* Handle = AllocHandle(); ++m_InFlight; auto InFlightGuard = MakeGuard([this] { --m_InFlight; }); auto HandleGuard = MakeGuard([this, Handle] { ReleaseHandle(Handle); }); auto CallbackGuard = MakeGuard([this, &Ctx] { HttpClient::Response ErrResp; ErrResp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kInternalError, .ErrorMessage = "AsyncHttpClient::SubmitFromSpec: setup threw before dispatch", }; DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx); }); ConfigureHandle(Handle, Ctx->Spec.Url, Ctx->Spec.Parameters); switch (Ctx->Spec.Method) { case AsyncRequestMethod::Get: curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); break; case AsyncRequestMethod::Stream: curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); break; case AsyncRequestMethod::Head: curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L); break; case AsyncRequestMethod::Delete: curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE"); break; case AsyncRequestMethod::Post: curl_easy_setopt(Handle, CURLOPT_POST, 1L); curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L); break; case AsyncRequestMethod::PostWithPayload: { curl_easy_setopt(Handle, CURLOPT_POST, 1L); Ctx->PayloadBuffer = Ctx->Spec.Payload; Ctx->ReadData.DataPtr = static_cast(Ctx->PayloadBuffer.GetData()); Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); Ctx->ReadData.Offset = 0; curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Ctx->PayloadBuffer.GetSize())); curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); break; } case AsyncRequestMethod::Put: curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL); break; case AsyncRequestMethod::PutWithPayload: { curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); Ctx->PayloadBuffer = Ctx->Spec.Payload; Ctx->ReadData.DataPtr = static_cast(Ctx->PayloadBuffer.GetData()); Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); Ctx->ReadData.Offset 
= 0; curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast(Ctx->PayloadBuffer.GetSize())); curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); break; } case AsyncRequestMethod::PutWithSource: { curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); Ctx->SourceOffset = 0; curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast(Ctx->Spec.StreamingPutSize)); curl_easy_setopt(Handle, CURLOPT_READFUNCTION, AsyncCurlSourceReadCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, Ctx.get()); break; } } // Headers - include Content-Type for payload-bearing methods, Content-Length: 0 for empty PUT. std::vector> ExtraHeaders; if (Ctx->Spec.Method == AsyncRequestMethod::PostWithPayload) { const ZenContentType Effective = Ctx->Spec.HasContentType ? Ctx->Spec.ContentType : Ctx->Spec.Payload.GetContentType(); ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Effective))); } else if (Ctx->Spec.Method == AsyncRequestMethod::PutWithPayload) { ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Ctx->Spec.Payload.GetContentType()))); } else if (Ctx->Spec.Method == AsyncRequestMethod::Put) { ExtraHeaders.emplace_back("Content-Length", "0"); } AccessTokenResult Token = GetAccessToken(); if (Token.ProviderFailed) { // Provider configured but failed twice: do NOT silently downgrade // to an anonymous request - the server will respond 403 and the // caller has no way to tell auth failed. 
HttpClient::Response ErrResp; ErrResp.Error = HttpClient::ErrorContext{ .ErrorCode = HttpClientErrorCode::kInternalError, .ErrorMessage = "AsyncHttpClient: access token provider failed; refusing to issue anonymous request", }; DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx); CallbackGuard.Dismiss(); return; } Ctx->HeaderList = BuildHeaderList(Ctx->Spec.AdditionalHeader, m_SessionId, std::move(Token.Token), ExtraHeaders); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); InFlightGuard.Dismiss(); HandleGuard.Dismiss(); CallbackGuard.Dismiss(); SubmitTransfer(Handle, std::move(Ctx)); } // -- Submit a transfer ----------------------------------------------- void SubmitTransfer(CURL* Handle, std::unique_ptr Ctx) { ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer"); // Pick the WRITE callback by method: // - Stream: forwards each chunk to the caller's OnData (no copy) // - other: buffers bytes in TransferContext::Body if (Ctx->Spec.Method == AsyncRequestMethod::Stream) { curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlStreamWriteCallback); curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get()); } else { curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlWriteCallback); curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get()); } curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, AsyncCurlHeaderCallback); curl_easy_setopt(Handle, CURLOPT_HEADERDATA, Ctx.get()); // Stash TokenId on the curl handle so CheckCompleted can look up the // TransferContext directly from curl_multi_info_read's CURLMsg.easy_handle. curl_easy_setopt(Handle, CURLOPT_PRIVATE, reinterpret_cast(static_cast(Ctx->TokenId))); Ctx->CurlHandle = Handle; const uint64_t TokenIdLocal = Ctx->TokenId; // Try the curl_multi add first. On failure, Ctx is still owned locally so the // rollback is a single Release path with no map churn. On success, ownership // moves into the single TokenId-keyed lookup table. 
CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle); if (Mc != CURLM_OK) { Ctx->CurlHandle = nullptr; ReleaseHandle(Handle); HttpClient::Response ErrorResponse; ErrorResponse.Error = HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError, .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))}; DispatchCallback(std::move(Ctx->Callback), std::move(ErrorResponse), *Ctx); OnSlotFreed(); return; } m_Transfers.emplace(TokenIdLocal, std::move(Ctx)); } // Telemetry only; this client does not gate fan-out. Callers (e.g. // S3AsyncStorage) layer their own admission semaphore on top. // Assert catches SubmitFromSpec/OnSlotFreed imbalance. void OnSlotFreed() { ZEN_ASSERT(m_InFlight > 0); --m_InFlight; } // curl_multi drives I/O via SocketCallback (which fds to watch) and TimerCallback (when to fire). // On each event we call curl_multi_socket_action() and drain via curl_multi_info_read(). // Static thunks: UserData = Impl* (set via CURLMOPT_SOCKETDATA / CURLMOPT_TIMERDATA). Bodies run on io thread. static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr) { auto* Self = static_cast(UserPtr); Self->OnCurlSocket(Easy, Fd, Action, static_cast(SocketPtr)); return 0; } static int CurlTimerCallback(CURLM* Multi, long TimeoutMs, void* UserPtr) { ZEN_UNUSED(Multi); auto* Self = static_cast(UserPtr); Self->OnCurlTimer(TimeoutMs); return 0; } // Async-specific HEADER callback. Appends raw "Key: Value\r\n" lines to // Ctx->HeaderArena (one growing buffer; ~zero allocs in the common case // where the initial reserve covers the response). Inline-parses // Content-Length and Content-Type unconditionally; parses ETag only when // Spec.WantEtag is set. No std::string/pair allocations per line. 
static size_t AsyncCurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Ctx = static_cast(UserData); const size_t TotalBytes = Size * Nmemb; std::string_view Line(Buffer, TotalBytes); Ctx->HeaderArena.append(Buffer, TotalBytes); while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) { Line.remove_suffix(1); } if (Line.empty()) { return TotalBytes; } const size_t Colon = Line.find(':'); if (Colon == std::string_view::npos) { return TotalBytes; // HTTP status line or malformed } std::string_view Key = Line.substr(0, Colon); std::string_view Value = Line.substr(Colon + 1); while (!Key.empty() && Key.back() == ' ') { Key.remove_suffix(1); } while (!Value.empty() && Value.front() == ' ') { Value.remove_prefix(1); } if (StrCaseEquals(Key, "Content-Length")) { uint64_t Length = 0; std::from_chars_result Res = std::from_chars(Value.data(), Value.data() + Value.size(), Length); if (Res.ec == std::errc{}) { Ctx->ContentLength = Length; Ctx->ContentLengthSet = true; } } else if (StrCaseEquals(Key, "Content-Type")) { Ctx->BodyContentType = ParseContentType(Value); } else if (Ctx->Spec.WantEtag && StrCaseEquals(Key, "ETag")) { Ctx->Etag.assign(Value); } return TotalBytes; } // Async-specific write callback. Targets a TransferContext directly. // Preallocates Body from Ctx->ContentLength (parsed in HEADER cb). If // Content-Length is absent (e.g. chunked encoding), falls back to // BodyChunks accumulation; CompleteTransfer flattens at the end. 
static size_t AsyncCurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData) { auto* Ctx = static_cast(UserData); const size_t TotalBytes = Size * Nmemb; if (TotalBytes == 0) { return 0; } if (!Ctx->BodyPreallocated && Ctx->BodyWriteOffset == 0 && Ctx->BodyChunks.empty() && Ctx->ContentLengthSet && Ctx->ContentLength > 0) { Ctx->Body = IoBuffer(static_cast(Ctx->ContentLength)); Ctx->BodyPreallocated = true; } if (Ctx->BodyPreallocated) { if (Ctx->BodyWriteOffset + TotalBytes > Ctx->Body.GetSize()) { // Server sent more than Content-Length advertised; abort. return 0; } memcpy(static_cast(Ctx->Body.MutableData()) + Ctx->BodyWriteOffset, Ptr, TotalBytes); Ctx->BodyWriteOffset += TotalBytes; } else { IoBuffer Chunk(TotalBytes); memcpy(Chunk.MutableData(), Ptr, TotalBytes); Ctx->BodyChunks.push_back(std::move(Chunk)); } return TotalBytes; } // PutWithSource read callback. Pulls up to MaxBytes from Spec.OnReadSource // into curl's send buffer. Source closure runs on the io thread - same // strand discipline as Stream's OnData. Returning 0 with SourceOffset < // StreamingPutSize signals an upload abort to curl. static size_t AsyncCurlSourceReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Ctx = static_cast(UserData); const size_t MaxBytes = Size * Nmemb; if (MaxBytes == 0 || !Ctx->Spec.OnReadSource) { return 0; } // Catch at the curl boundary: user OnReadSource may call into IoBuffer // allocation, file reads, or async dispatch which can throw. Letting an // exception unwind through curl's C frames is UB. size_t Pulled = 0; try { Pulled = Ctx->Spec.OnReadSource(reinterpret_cast(Buffer), MaxBytes, Ctx->SourceOffset); } catch (const std::exception& Ex) { Ctx->CallbackFailed = true; Ctx->CallbackErrorMessage = fmt::format("upload source callback threw: {}", Ex.what()); return CURL_READFUNC_ABORT; } catch (...) 
{ Ctx->CallbackFailed = true; Ctx->CallbackErrorMessage = "upload source callback threw unknown exception"; return CURL_READFUNC_ABORT; } if (Pulled == 0 && Ctx->SourceOffset < Ctx->Spec.StreamingPutSize) { return CURL_READFUNC_ABORT; } Ctx->SourceOffset += Pulled; return Pulled; } // Stream-method write callback. Hands each chunk to the caller's OnData // without allocating or copying. The pointer is curl's internal receive // buffer; valid only for the duration of this call. Caller's OnData runs // on the io thread, so blocking work (disk write etc) blocks the poll // loop. TotalSize comes from inline-parsed Content-Length (0 if absent / // chunked). static size_t AsyncCurlStreamWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData) { auto* Ctx = static_cast(UserData); const size_t TotalBytes = Size * Nmemb; if (TotalBytes == 0) { return 0; } if (!Ctx->Spec.OnData) { // Stream method requires OnData by contract; reaching this branch // is API misuse. Returning TotalBytes (success) would silently // drop the body and report "ok"; fail loudly instead. Ctx->CallbackFailed = true; Ctx->CallbackErrorMessage = "stream request submitted without OnData callback"; return 0; } // Catch at the curl boundary so an exception inside the user OnData // (e.g. IoBuffer alloc failure, ScheduleWork rejection, ZEN_ASSERT in a // downstream pool) cannot propagate through curl's C frames. Stash on // Ctx so CompleteTransfer surfaces it as kInternalError. try { const bool ContinueTransfer = Ctx->Spec.OnData(reinterpret_cast(Ptr), TotalBytes, Ctx->ContentLength); return ContinueTransfer ? TotalBytes : 0; // returning 0 aborts } catch (const std::exception& Ex) { Ctx->CallbackFailed = true; Ctx->CallbackErrorMessage = fmt::format("stream data callback threw: {}", Ex.what()); return 0; } catch (...) { Ctx->CallbackFailed = true; Ctx->CallbackErrorMessage = "stream data callback threw unknown exception"; return 0; } } // Take ownership of socket close. 
CLOSESOCKETFUNCTION is invoked from // within curl_multi operations which run on the io thread, so direct map // access is safe. The asio tcp::socket destructor closes the fd; we // return 0 to tell curl the close succeeded. Letting curl close as well // would race (double-close, fd-reuse hazard) and on Windows IOCP // `release()` throws `operation_not_supported`, killing the io thread. static int CurlCloseSocketCallback(void* ClientPtr, curl_socket_t Fd) { auto* Self = static_cast(ClientPtr); auto It = Self->m_Sockets.find(Fd); if (It != Self->m_Sockets.end()) { asio::error_code Ec; It->second->Socket.cancel(Ec); Self->m_Sockets.erase(It); return 0; } // Fd not tracked (e.g. pre-poll-add or post-shutdown); close directly. #if ZEN_PLATFORM_WINDOWS ::closesocket(Fd); #else ::close(Fd); #endif return 0; } void SetupMultiCallbacks() { curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback); curl_multi_setopt(m_Multi, CURLMOPT_SOCKETDATA, this); curl_multi_setopt(m_Multi, CURLMOPT_TIMERFUNCTION, CurlTimerCallback); curl_multi_setopt(m_Multi, CURLMOPT_TIMERDATA, this); } // Called by curl when socket watch state changes --------------------- // Synthesize a transport-level failure for the easy handle currently bound // to the curl_multi entry that owns Fd. Used when the asio side cannot bind // the fd; without this the affected transfer would hang on curl's // connect/transfer timeout instead of failing fast with the real error. 
// Does nothing when Easy is null. When no transfer is registered for the
// handle's token the handle is only released; otherwise the transfer is
// removed from m_Transfers, its callback is dispatched with kInternalError
// and Reason as the message, and the in-flight slot is freed.
void FailEasyHandleForFd(CURL* Easy, std::string_view Reason) {
    if (!Easy) {
        return;
    }
    // NOTE(review): this function is called from OnCurlSocket, which looks
    // like it runs inside curl's socket callback. libcurl documents that
    // curl_multi_remove_handle must not be called from within a callback
    // (it returns CURLM_RECURSIVE_API_CALL); the return value is ignored
    // here - confirm this path is safe or defer it onto the strand.
    curl_multi_remove_handle(m_Multi, Easy);
    char* Private = nullptr;
    curl_easy_getinfo(Easy, CURLINFO_PRIVATE, &Private);
    // NOTE(review): template arguments of the casts / smart pointers in this
    // block appear to have been stripped from this copy of the file; tokens
    // are left exactly as found - restore from upstream before compiling.
    const uint64_t TokenId = static_cast(reinterpret_cast(Private));
    auto It = m_Transfers.find(TokenId);
    if (It == m_Transfers.end()) {
        ReleaseHandle(Easy);
        return;
    }
    std::unique_ptr Ctx = std::move(It->second);
    m_Transfers.erase(It);
    Ctx->CurlHandle = nullptr;
    ReleaseHandle(Easy);
    HttpClient::Response Resp;
    Resp.Error = HttpClient::ErrorContext{
        .ErrorCode = HttpClientErrorCode::kInternalError,
        .ErrorMessage = std::string(Reason),
    };
    DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
    OnSlotFreed();
}

// Applies a socket watch request. CURL_POLL_REMOVE cancels pending waits and
// clears the watch flags; any other Action makes sure an AsyncSocketInfo
// exists for Fd (creating and assigning one if needed), records Action as
// the wanted flags and arms the waits through SetSocketWatch.
void OnCurlSocket(CURL* Easy, curl_socket_t Fd, int Action, AsyncSocketInfo* Info) {
    if (Action == CURL_POLL_REMOVE) {
        if (Info) {
            // Cancel any pending async_wait but KEEP the AsyncSocketInfo
            // alive in m_Sockets. CURL_POLL_REMOVE only means "stop
            // watching this socket"; curl may still use the fd
            // (keep-alive reuse) and rewatch it later. The asio Socket
            // stays bound to the same IOCP it was first assigned to;
            // we never call release()+assign() on the same fd, which
            // avoided a race where in-flight async_wait callbacks
            // raced with curl_multi reading from the socket and
            // corrupted HTTP framing. The fd's actual close happens
            // in CurlCloseSocketCallback, which erases the entry and
            // lets the asio Socket destructor close.
            asio::error_code Ec;
            Info->Socket.cancel(Ec);
            Info->WatchFlags = 0;
            Info->PendingFlags = 0;
        }
        return;
    }
    if (!Info) {
        // CURL_POLL_IN/OUT with no Info attached. Two cases:
        //   1) brand new fd - emplace, assign, IOCP-bind.
        //   2) curl re-watching a kept-alive fd it earlier removed -
        //      reuse the existing AsyncSocketInfo, no re-assign.
        auto It = m_Sockets.find(Fd);
        if (It == m_Sockets.end()) {
            auto [NewIt, _] = m_Sockets.emplace(Fd, std::make_unique(m_Strand));
            It = NewIt;
            // Try IPv4 first and fall back to IPv6 if that assignment fails.
            asio::error_code Ec;
            It->second->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
            if (Ec) {
                It->second->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
            }
            if (Ec) {
                // Neither protocol worked: drop the entry and fail the transfer now.
                std::string Reason = fmt::format("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast(Fd), Ec.message());
                ZEN_WARN("{}", Reason);
                m_Sockets.erase(It);
                FailEasyHandleForFd(Easy, Reason);
                return;
            }
        }
        Info = It->second.get();
        // Attach Info to the fd so curl hands it back on later socket callbacks.
        curl_multi_assign(m_Multi, Fd, Info);
    }
    Info->WatchFlags = Action;
    SetSocketWatch(Fd, Info);
}

// Arms one-shot async waits so the pending directions match Info->WatchFlags.
// Each completion handler clears its direction from PendingFlags and reports
// readiness (or CURL_CSELECT_ERR on a non-abort error) via OnSocketReady.
// NOTE(review): the handlers capture raw `this`; the retry timer in
// CompleteTransfer uses weak_from_this() for the external-io_context case -
// confirm these handlers cannot run after the owner is destroyed.
void SetSocketWatch(curl_socket_t Fd, AsyncSocketInfo* Info) {
    // Cancel only when a previously-watched direction is no longer wanted.
    // In the common path (one-shot async_wait completes, curl re-watches
    // the same flags) PendingFlags is a subset of WatchFlags and we just
    // re-arm the missing direction without touching CancelIoEx.
    const int Desired = Info->WatchFlags & (CURL_POLL_IN | CURL_POLL_OUT);
    if (Info->PendingFlags & ~Desired) {
        asio::error_code Ec;
        Info->Socket.cancel(Ec);
        Info->PendingFlags = 0;
    }
    // Directions that are wanted but have no wait outstanding.
    const int ToAdd = Desired & ~Info->PendingFlags;
    if (ToAdd & CURL_POLL_IN) {
        Info->PendingFlags |= CURL_POLL_IN;
        Info->Socket.async_wait(asio::socket_base::wait_read, [this, Fd](const asio::error_code& Ec) {
            if (m_ShuttingDown) {
                return;
            }
            // Look the entry up again by fd; it may have been erased meanwhile.
            auto It = m_Sockets.find(Fd);
            if (It == m_Sockets.end()) {
                return;
            }
            It->second->PendingFlags &= ~CURL_POLL_IN;
            if (Ec) {
                // operation_aborted is the result of our own cancel(); stay silent.
                if (Ec != asio::error::operation_aborted) {
                    ZEN_DEBUG("AsyncHttpClient: read async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR", static_cast(Fd), Ec.message());
                    OnSocketReady(Fd, CURL_CSELECT_ERR);
                }
                return;
            }
            OnSocketReady(Fd, CURL_CSELECT_IN);
        });
    }
    if (ToAdd & CURL_POLL_OUT) {
        Info->PendingFlags |= CURL_POLL_OUT;
        Info->Socket.async_wait(asio::socket_base::wait_write, [this, Fd](const asio::error_code& Ec) {
            if (m_ShuttingDown) {
                return;
            }
            // Look the entry up again by fd; it may have been erased meanwhile.
            auto It = m_Sockets.find(Fd);
            if (It == m_Sockets.end()) {
                return;
            }
            It->second->PendingFlags &= ~CURL_POLL_OUT;
            if (Ec) {
                // operation_aborted is the result of our own cancel(); stay silent.
                if (Ec != asio::error::operation_aborted) {
                    ZEN_DEBUG("AsyncHttpClient: write async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR", static_cast(Fd), Ec.message());
                    OnSocketReady(Fd, CURL_CSELECT_ERR);
                }
                return;
            }
            OnSocketReady(Fd, CURL_CSELECT_OUT);
        });
    }
}

// Tells curl that Fd is ready for CurlAction, processes any transfers that
// finished as a result, then re-arms the watch for Fd.
void OnSocketReady(curl_socket_t Fd, int CurlAction) {
    int StillRunning = 0;
    curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning);
    CheckCompleted();
    // Re-arm the watch if the socket is still tracked.
    auto It = m_Sockets.find(Fd);
    if (It != m_Sockets.end()) {
        SetSocketWatch(Fd, It->second.get());
    }
}

// Called by curl when it wants a timeout ------------------------------

// Cancels the current timer, then: negative TimeoutMs - nothing further;
// zero - posts an immediate timeout action to the strand; positive - arms
// m_Timer for that many milliseconds. Both deferred paths are skipped once
// m_ShuttingDown is set.
void OnCurlTimer(long TimeoutMs) {
    m_Timer.cancel();
    if (TimeoutMs < 0) {
        // curl says "no timeout needed"
        return;
    }
    if (TimeoutMs == 0) {
        // curl wants immediate action - run it on the next strand tick.
        asio::post(m_Strand, [this]() {
            if (m_ShuttingDown) {
                return;
            }
            int StillRunning = 0;
            curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
            CheckCompleted();
        });
        return;
    }
    m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs));
    m_Timer.async_wait([this](const asio::error_code& Ec) {
        if (m_ShuttingDown) {
            return;
        }
        if (Ec) {
            // operation_aborted comes from the cancel() above; only log other errors.
            if (Ec != asio::error::operation_aborted) {
                ZEN_DEBUG("AsyncHttpClient: curl multi timer error: {}", Ec.message());
            }
            return;
        }
        ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout");
        int StillRunning = 0;
        curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
        CheckCompleted();
    });
}

// Drain completed transfers from curl_multi --------------------------

// For every CURLMSG_DONE message: detach the easy handle from m_Multi, look
// up its transfer by token and pass it to CompleteTransfer. A handle with no
// registered transfer is just released.
void CheckCompleted() {
    int MsgsLeft = 0;
    CURLMsg* Msg = nullptr;
    while ((Msg = curl_multi_info_read(m_Multi, &MsgsLeft)) != nullptr) {
        if (Msg->msg != CURLMSG_DONE) {
            continue;
        }
        // Copy what we need out of Msg before touching the multi handle.
        CURL* Handle = Msg->easy_handle;
        CURLcode Result = Msg->data.result;
        curl_multi_remove_handle(m_Multi, Handle);
        // Recover TokenId from CURLOPT_PRIVATE; cheaper than a per-handle
        // reverse map. Returns nullptr if option was never set.
        char* Private = nullptr;
        curl_easy_getinfo(Handle, CURLINFO_PRIVATE, &Private);
        const uint64_t TokenId = static_cast(reinterpret_cast(Private));
        auto It = m_Transfers.find(TokenId);
        if (It == m_Transfers.end()) {
            ReleaseHandle(Handle);
            continue;
        }
        std::unique_ptr Ctx = std::move(It->second);
        m_Transfers.erase(It);
        CompleteTransfer(Handle, Result, std::move(Ctx));
    }
}

// Mirrors CurlHttpClient::ShouldRetry semantics; keep the two in sync.
//
// Returns true when the attempt should be retried: either CurlResult is one
// of the listed transport errors, or CurlResult is CURLE_OK and StatusCode
// is one of the listed HTTP statuses. Any other curl error or status
// returns false.
static bool ShouldRetryAsync(CURLcode CurlResult, long StatusCode) {
    switch (CurlResult) {
        case CURLE_OK:
            // Transport succeeded; the decision falls to the HTTP status below.
            break;
        case CURLE_COULDNT_CONNECT:
        case CURLE_RECV_ERROR:
        case CURLE_SEND_ERROR:
        case CURLE_OPERATION_TIMEDOUT:
        case CURLE_PARTIAL_FILE:
            return true;
        default:
            return false;
    }
    switch (static_cast(StatusCode)) {
        case HttpResponseCode::RequestTimeout:
        case HttpResponseCode::TooManyRequests:
        case HttpResponseCode::InternalServerError:
        case HttpResponseCode::BadGateway:
        case HttpResponseCode::ServiceUnavailable:
        case HttpResponseCode::GatewayTimeout:
            return true;
        default:
            return false;
    }
}

// Finishes one attempt of a transfer whose easy handle has already been
// detached from m_Multi. In order: frees the in-flight slot, reads the result
// info and releases the handle, then takes the first matching path -
// cancelled, user-callback failure, retry after backoff, or building the
// final Response and dispatching the user callback.
//
// Handle     - the easy handle of the finished attempt; released here.
// CurlResult - curl's result code for the attempt.
// Ctx        - the transfer's context; ownership is taken.
void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr Ctx) {
    ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer");
    // Free the in-flight counter before any retry / cancel branch. Retry
    // re-submits via SubmitFromSpec, which re-increments the counter for
    // the next attempt - keeping the assert balanced across retries.
    OnSlotFreed();

    // Extract result info
    long StatusCode = 0;
    curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode);
    double Elapsed = 0;
    curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed);
    curl_off_t UpBytes = 0;
    curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes);
    curl_off_t DownBytes = 0;
    curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes);
    ReleaseHandle(Handle);

    // Cancellation came in after curl ran but before we processed completion.
    // Synthesize a cancel response and skip retry.
    if (Ctx->Cancelled) {
        HttpClient::Response CancelResp;
        CancelResp.Error = HttpClient::ErrorContext{
            .ErrorCode = HttpClientErrorCode::kRequestCancelled,
            .ErrorMessage = "Request canceled",
        };
        DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
        return;
    }

    // User OnData / OnReadSource threw inside curl. The transfer is already
    // aborted; surface the stashed exception text and skip retry (a callback
    // exception is not a transient transport failure).
    if (Ctx->CallbackFailed) {
        HttpClient::Response Resp;
        Resp.Error = HttpClient::ErrorContext{
            .ErrorCode = HttpClientErrorCode::kInternalError,
            .ErrorMessage = std::move(Ctx->CallbackErrorMessage),
        };
        DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
        return;
    }

    // Retry path: re-issue from spec after backoff. Keeps the user callback
    // untouched so the eventual final result fires only once.
    if (!m_ShuttingDown && Ctx->AttemptCount < m_Settings.RetryCount && ShouldRetryAsync(CurlResult, StatusCode)) {
        ++Ctx->AttemptCount;
        // Linear backoff: 100 ms per attempt made so far.
        const long BackoffMs = 100 * Ctx->AttemptCount;
        if (CurlResult != CURLE_OK) {
            ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' (Curl error: {}) Attempt {}/{}",
                     m_SessionId,
                     static_cast(MapCurlError(CurlResult)),
                     curl_easy_strerror(CurlResult),
                     static_cast(CurlResult),
                     Ctx->AttemptCount,
                     m_Settings.RetryCount + 1);
        } else {
            ZEN_INFO("Retry (session: {}): HTTP status ({}) '{}' Attempt {}/{}",
                     m_SessionId,
                     StatusCode,
                     zen::ToString(HttpResponseCode(StatusCode)),
                     Ctx->AttemptCount,
                     m_Settings.RetryCount + 1);
        }

        // Stash the just-finished attempt's failure so the abandon path can
        // surface it instead of a generic "Request canceled (retry abandoned)".
        // Non-empty LastErrorMessage marks the stash valid.
        Ctx->LastCurlResult = CurlResult;
        Ctx->LastStatusCode = StatusCode;
        Ctx->LastErrorMessage = (CurlResult != CURLE_OK) ? std::string(curl_easy_strerror(CurlResult)) : std::string(zen::ToString(HttpResponseCode(StatusCode)));
        Ctx->ResetForRetry();

        // Read the token id now; Ctx is moved into the retry map below.
        const uint64_t RetryTokenId = Ctx->TokenId;
        auto RetryTimer = std::make_shared(m_Strand);
        RetryTimer->expires_after(std::chrono::milliseconds(BackoffMs));

        // Park Ctx + Timer in m_RetryingTransfers so HandleCancel can find
        // it and cancel the timer (early cancel without paying the full
        // backoff). The timer lambda re-claims Ctx through the map; if
        // HandleCancel got there first the entry is gone and the lambda
        // returns silently.
        auto [It, Inserted] = m_RetryingTransfers.emplace(RetryTokenId, RetryEntry{std::move(Ctx), RetryTimer});
        ZEN_ASSERT(Inserted);

        // Capture weak_from_this() rather than raw `this`. With an external
        // io_context, Cleanup cancels the timer but the cancellation handler
        // is queued on the caller's loop and may fire AFTER ~Impl runs. The
        // owned-context path drains via restart()+poll() before destruction
        // so this is moot there, but the weak ref is the cheapest way to
        // keep both paths safe.
        RetryTimer->async_wait([Self = weak_from_this(), RetryTokenId](const asio::error_code& Ec) {
            auto Locked = Self.lock();
            if (!Locked) {
                return;
            }
            Impl& Me = *Locked;
            auto It = Me.m_RetryingTransfers.find(RetryTokenId);
            if (It == Me.m_RetryingTransfers.end()) {
                // HandleCancel already removed the entry and dispatched the
                // cancel callback.
                return;
            }
            std::unique_ptr Ctx = std::move(It->second.Ctx);
            Me.m_RetryingTransfers.erase(It);
            if (Ec || Me.m_ShuttingDown) {
                // Retry abandoned by timer cancellation or client shutdown.
                // Surface the underlying failure (stashed pre-backoff) so
                // the caller can distinguish a real timeout/throttle from a
                // shutdown / cancel race.
                HttpClient::Response Resp;
                if (!Ctx->LastErrorMessage.empty()) {
                    Resp.StatusCode = HttpResponseCode(Ctx->LastStatusCode);
                    Resp.Error = HttpClient::ErrorContext{
                        .ErrorCode = (Ctx->LastCurlResult != CURLE_OK) ? MapCurlError(Ctx->LastCurlResult) : HttpClientErrorCode::kOtherError,
                        .ErrorMessage = fmt::format("Request canceled (retry abandoned after: {})", Ctx->LastErrorMessage),
                    };
                } else {
                    Resp.Error = HttpClient::ErrorContext{
                        .ErrorCode = HttpClientErrorCode::kRequestCancelled,
                        .ErrorMessage = "Request canceled (retry abandoned)",
                    };
                }
                Me.DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
                return;
            }
            // Backoff elapsed normally: start the next attempt.
            Me.SubmitFromSpec(std::move(Ctx));
        });
        return;
    }

    // Build response
    HttpClient::Response Response;
    Response.StatusCode = HttpResponseCode(StatusCode);
    Response.UploadedBytes = static_cast(UpBytes);
    Response.DownloadedBytes = static_cast(DownBytes);
    Response.ElapsedSeconds = Elapsed;

    // Hand the raw arena over - FindHeader scans this lazily. Build the
    // parsed KeyValueMap only when caller explicitly asks (rare).
    Response.HeaderArena = std::move(Ctx->HeaderArena);
    if (Ctx->Spec.WantHeaderMap) {
        std::string_view View(Response.HeaderArena);
        while (!View.empty()) {
            // Peel one '\n'-terminated line off the front of View.
            const size_t LineEnd = View.find('\n');
            std::string_view Line = LineEnd == std::string_view::npos ? View : View.substr(0, LineEnd);
            View = LineEnd == std::string_view::npos ? std::string_view{} : View.substr(LineEnd + 1);
            // Trim trailing CR / LF before parsing.
            while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) {
                Line.remove_suffix(1);
            }
            if (auto Header = ParseHeaderLine(Line)) {
                // insert_or_assign: a repeated key keeps the last value seen.
                Response.Header->insert_or_assign(std::string(Header->first), std::string(Header->second));
            }
        }
    }

    // Helper: produce the response IoBuffer from whichever Body path was
    // taken. Preallocated path moves with zero copy; chunked fallback
    // moves the single chunk or flattens N chunks into one allocation.
    auto BuildResponsePayload = [&]() -> IoBuffer {
        if (Ctx->BodyPreallocated) {
            IoBuffer Out = std::move(Ctx->Body);
            if (Ctx->BodyWriteOffset != Out.GetSize()) {
                // Server closed early - return a non-owning sub-buffer over
                // the actually-received prefix. Sub-buffer holds a ref to
                // Out's core so the underlying allocation stays alive; no
                // memcpy.
                return IoBuffer(Out, 0, Ctx->BodyWriteOffset);
            }
            return Out;
        }
        if (Ctx->BodyChunks.size() == 1) {
            return std::move(Ctx->BodyChunks[0]);
        }
        if (!Ctx->BodyChunks.empty()) {
            // Flatten N chunks into one IoBuffer; single alloc avoids a copy chain.
            size_t Total = 0;
            for (const IoBuffer& C : Ctx->BodyChunks) {
                Total += C.GetSize();
            }
            IoBuffer Out(Total);
            uint8_t* Dst = static_cast(Out.MutableData());
            for (const IoBuffer& C : Ctx->BodyChunks) {
                memcpy(Dst, C.GetData(), C.GetSize());
                Dst += C.GetSize();
            }
            return Out;
        }
        return IoBuffer{};
    };

    // Whether any body bytes were received on the path that was taken.
    const bool HasBody = Ctx->BodyPreallocated ? Ctx->BodyWriteOffset > 0 : !Ctx->BodyChunks.empty();

    if (CurlResult != CURLE_OK) {
        const char* ErrorMsg = curl_easy_strerror(CurlResult);
        // Timeouts, connect failures and callback aborts are not logged as warnings.
        if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK) {
            ZEN_WARN("AsyncHttpClient failure: token={} {} '{}': ({}) '{}'",
                     Ctx->TokenId,
                     AsyncRequestMethodName(Ctx->Spec.Method),
                     Ctx->Spec.Url,
                     static_cast(CurlResult),
                     ErrorMsg);
        }
        // Any partial body is still handed to the caller alongside the error.
        if (HasBody) {
            Response.ResponsePayload = BuildResponsePayload();
        }
        Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)};
    } else if (StatusCode == static_cast(HttpResponseCode::NoContent) || !HasBody) {
        // No payload
    } else {
        IoBuffer PayloadBuffer = BuildResponsePayload();
        if (Ctx->BodyContentType != ZenContentType::kUnknownContentType) {
            PayloadBuffer.SetContentType(Ctx->BodyContentType);
        }
        const HttpResponseCode Code = HttpResponseCode(StatusCode);
        // Non-success statuses are logged, except NotFound.
        if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound) {
            ZEN_WARN("AsyncHttpClient request failed: token={} {} '{}': status={}",
                     Ctx->TokenId,
                     AsyncRequestMethodName(Ctx->Spec.Method),
                     Ctx->Spec.Url,
                     static_cast(Code));
        }
        Response.ResponsePayload = std::move(PayloadBuffer);
    }

    // Token reaches terminal state. Ctx (and its embedded TokenState) is
    // destroyed when this scope ends; late Cancel() calls find no entry in
    // m_Transfers and become no-ops.
    DispatchCallback(std::move(Ctx->Callback), std::move(Response), *Ctx);
}

// -- Async verb implementations --------------------------------------

// Assigns a token id to Ctx, creates the shared token state whose CancelFn
// posts HandleCancel(Id) to the strand, and posts the actual submission to
// the strand. If the client is shutting down by the time the posted work
// runs, the callback is dispatched with kRequestCancelled instead.
// Returns the token wrapping the shared state.
AsyncRequestToken Submit(std::unique_ptr Ctx) {
    // Allocate token ID + state up front so callers can cancel before the
    // posted submit even runs. Token::State is shared between the user-held
    // AsyncRequestToken and the TransferContext (no separate strand-side map).
    const uint64_t Id = m_NextTokenId.fetch_add(1, std::memory_order_relaxed);
    Ctx->TokenId = Id;
    auto State = std::make_shared();
    State->CancelFn = [WeakSelf = weak_from_this(), Id]() {
        auto Self = WeakSelf.lock();
        if (!Self) {
            return;
        }
        asio::post(Self->m_Strand, [Self, Id]() { Self->HandleCancel(Id); });
    };
    Ctx->TokenStateRef = State;
    // NOTE(review): this lambda captures raw `this`, unlike the retry timer
    // in CompleteTransfer which uses weak_from_this() - confirm the posted
    // work cannot outlive the owner when an external io_context is used.
    asio::post(m_Strand, [this, Ctx = std::move(Ctx)]() mutable {
        if (m_ShuttingDown) {
            HttpClient::Response Resp;
            Resp.Error = HttpClient::ErrorContext{
                .ErrorCode = HttpClientErrorCode::kRequestCancelled,
                .ErrorMessage = "Request canceled (client shutting down)",
            };
            DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
            return;
        }
        SubmitFromSpec(std::move(Ctx));
    });
    return AsyncRequestToken(std::move(State));
}

// Cancels the transfer identified by Id. An in-flight transfer is detached
// from curl and its callback dispatched with kRequestCancelled; a transfer
// parked for retry has its timer cancelled and is answered the same way.
// An unknown Id is ignored.
void HandleCancel(uint64_t Id) {
    auto It = m_Transfers.find(Id);
    if (It != m_Transfers.end()) {
        std::unique_ptr Ctx = std::move(It->second);
        m_Transfers.erase(It);
        Ctx->Cancelled = true;
        if (Ctx->CurlHandle) {
            curl_multi_remove_handle(m_Multi, Ctx->CurlHandle);
            ReleaseHandle(Ctx->CurlHandle);
            Ctx->CurlHandle = nullptr;
        }
        HttpClient::Response CancelResp;
        CancelResp.Error = HttpClient::ErrorContext{
            .ErrorCode = HttpClientErrorCode::kRequestCancelled,
            .ErrorMessage = "Request canceled",
        };
        DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
        OnSlotFreed();
        return;
    }

    // Cancel landed during the retry backoff: take the parked Ctx + Timer,
    // cancel the timer (the in-flight async_wait fires later with
    // operation_aborted but finds no entry, so it's a no-op), and dispatch
    // the cancel callback now so the user observes immediate cancellation
    // rather than waiting out the backoff.
    auto RetIt = m_RetryingTransfers.find(Id);
    if (RetIt != m_RetryingTransfers.end()) {
        std::unique_ptr Ctx = std::move(RetIt->second.Ctx);
        std::shared_ptr Timer = std::move(RetIt->second.Timer);
        m_RetryingTransfers.erase(RetIt);
        Timer->cancel();
        HttpClient::Response CancelResp;
        CancelResp.Error = HttpClient::ErrorContext{
            .ErrorCode = HttpClientErrorCode::kRequestCancelled,
            .ErrorMessage = "Request canceled",
        };
        DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
        return;
    }

    // Cancel landed before SubmitFromSpec ran (Cancel posted between Submit
    // returning and the io thread executing the posted SubmitFromSpec).
    // State.Cancelled has already been set by Cancel(); SubmitFromSpec
    // checks it and synthesizes the cancel callback when the transfer
    // eventually arrives. Nothing to do here.
}

// The DoAsync* helpers below each build a request context around Callback,
// fill in the request spec for one method, and hand it to Submit. All
// arguments are taken by value and moved into the spec.

// GET with optional extra headers and query parameters.
AsyncRequestToken DoAsyncGet(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::Get;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Parameters = std::move(Parameters);
    return Submit(std::move(Ctx));
}

// HEAD with optional extra headers.
AsyncRequestToken DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::Head;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    return Submit(std::move(Ctx));
}

// DELETE with optional extra headers.
AsyncRequestToken DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::Delete;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    return Submit(std::move(Ctx));
}

// POST without a payload.
AsyncRequestToken DoAsyncPost(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::Post;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Parameters = std::move(Parameters);
    return Submit(std::move(Ctx));
}

// POST with a payload; the content type is recorded explicitly and
// HasContentType is set.
AsyncRequestToken DoAsyncPostWithPayload(std::string Url, IoBuffer Payload, ZenContentType ContentType, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::PostWithPayload;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Payload = std::move(Payload);
    Ctx->Spec.ContentType = ContentType;
    Ctx->Spec.HasContentType = true;
    return Submit(std::move(Ctx));
}

// PUT with a payload. No content type is recorded in the spec here.
AsyncRequestToken DoAsyncPutWithPayload(std::string Url, IoBuffer Payload, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::PutWithPayload;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Parameters = std::move(Parameters);
    Ctx->Spec.Payload = std::move(Payload);
    return Submit(std::move(Ctx));
}

// PUT without a payload.
AsyncRequestToken DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::Put;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Parameters = std::move(Parameters);
    return Submit(std::move(Ctx));
}

// PUT whose body is pulled from Source; TotalSize is stored as the
// streaming upload size.
AsyncRequestToken DoAsyncPutWithSource(std::string Url, uint64_t TotalSize, AsyncHttpReadSource Source, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) {
    auto Ctx = std::make_unique(std::move(Callback));
    Ctx->Spec.Method = AsyncRequestMethod::PutWithSource;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.OnReadSource = std::move(Source);
    Ctx->Spec.StreamingPutSize = TotalSize;
    return Submit(std::move(Ctx));
}

// Streaming GET: body chunks go to OnData, the final result to OnComplete.
AsyncRequestToken DoAsyncStream(std::string Url, AsyncHttpDataCallback OnData, AsyncHttpCallback OnComplete, HttpClient::KeyValueMap AdditionalHeader, HttpClient::KeyValueMap Parameters) {
    auto Ctx = std::make_unique(std::move(OnComplete));
    Ctx->Spec.Method = AsyncRequestMethod::Stream;
    Ctx->Spec.Url = std::move(Url);
    Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
    Ctx->Spec.Parameters = std::move(Parameters);
    Ctx->Spec.OnData = std::move(OnData);
    return Submit(std::move(Ctx));
}

// -- Members
// ---------------------------------------------------------

// NOTE(review): template argument lists on several member types below
// (std::unique_ptr, asio::strand, std::optional, std::unordered_map) appear
// to have been stripped from this copy of the file; tokens are left exactly
// as found - restore from the upstream source before compiling.

std::string m_BaseUri;
HttpClientSettings m_Settings;
LoggerRef m_Log;
std::string m_SessionId;  // included in the retry log messages
std::string m_UnixSocketPathUtf8;

// io_context: either privately owned (m_OwnedIoContext non-null + we spin
// m_IoThread + hold m_WorkGuard) or supplied by the caller (m_OwnedIoContext
// null; caller drives the loop). Declared before m_Strand so the strand
// can bind its executor from m_IoContext during member init.
std::unique_ptr m_OwnedIoContext;
asio::io_context& m_IoContext;

// Single serialization point for every curl_multi operation, every async
// completion handler, and every Cleanup call. Declared after m_IoContext
// (so make_strand can read its executor) and before m_Timer (which binds
// to the strand).
asio::strand m_Strand;
std::optional> m_WorkGuard;
std::thread m_IoThread;

// Strand-bound; async_wait completions land back on m_Strand.
asio::steady_timer m_Timer;

// The curl multi handle that all transfers are attached to.
CURLM* m_Multi = nullptr;

// Single TokenId-keyed map. CurlHandle lives in TransferContext; reverse
// lookup from CURL* uses CURLOPT_PRIVATE (set in SubmitTransfer).
std::unordered_map> m_Transfers;

// Transfers parked between curl-side completion and the next attempt's
// SubmitFromSpec. Lookups by TokenId let HandleCancel cancel the backoff
// timer without paying the full delay.
// Ctx is the parked transfer context; Timer is the backoff timer whose
// completion re-submits it.
// NOTE(review): template argument lists in this block (smart pointers,
// containers, atomics, std::make_shared) appear to have been stripped from
// this copy of the file; tokens are left exactly as found.
struct RetryEntry {
    std::unique_ptr Ctx;
    std::shared_ptr Timer;
};
std::unordered_map m_RetryingTransfers;

std::vector m_HandlePool;
// Per-fd socket state, keyed by the curl socket descriptor.
std::unordered_map> m_Sockets;
uint32_t m_InFlight = 0;  // telemetry only; storage layer caps fan-out
// Checked by completion handlers and posted work to skip processing.
std::atomic m_ShuttingDown{false};
HttpClientAccessToken m_CachedAccessToken;
// Source of token ids; starts at 1 and is incremented once per Submit.
std::atomic m_NextTokenId{1};
std::atomic m_ShutdownDone{false};
};

//////////////////////////////////////////////////////////////////////////
//
// AsyncHttpClient public API

// Creates a client whose implementation is constructed from BaseUri and
// Settings only (no io_context supplied by the caller).
AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings) : m_Impl(std::make_shared(BaseUri, Settings)) {
}

// Creates a client whose implementation uses the caller-supplied IoContext.
AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
: m_Impl(std::make_shared(BaseUri, IoContext, Settings)) {
}

// Shuts the implementation down before the shared pointer is released.
AsyncHttpClient::~AsyncHttpClient() {
    // Drive teardown synchronously while we're guaranteed to be on a user
    // thread (not the io thread). Joining and draining here ensures any
    // posted lambdas that captured a shared_ptr by value (e.g.
    // Cancel after the transfer completed) are destroyed and release
    // their refs before m_Impl drops; otherwise the cycle "lambda holds
    // Impl ref / lambda lives in io_context owned by Impl" would pin Impl
    // alive forever and leak curl_multi + socket handles.
    if (m_Impl) {
        m_Impl->Shutdown();
    }
}

// -- Callback-based API --------------------------------------------------
//
// Each wrapper copies Url into an owned string and forwards to the matching
// DoAsync* method on m_Impl, returning the token it produces.

// Asynchronous GET.
AsyncRequestToken AsyncHttpClient::AsyncGet(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    return m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
}

// Asynchronous HEAD.
AsyncRequestToken AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) {
    return m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
}

// Asynchronous DELETE.
AsyncRequestToken AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) {
    return m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
}

// Asynchronous POST without a payload.
AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    return m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
}

// Asynchronous POST with a payload; the content type is taken from the
// payload buffer itself.
AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) {
    return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
}

// Asynchronous POST with a payload and an explicitly supplied content type.
AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) {
    return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
}

// Asynchronous PUT with a payload.
AsyncRequestToken AsyncHttpClient::AsyncPut(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    return m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
}
// Asynchronous PUT without a payload; forwards to DoAsyncPutNoPayload on m_Impl.
AsyncRequestToken AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    return m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
}

// Asynchronous PUT whose body is pulled from Source; TotalSize is passed
// through as the upload size. Forwards to DoAsyncPutWithSource on m_Impl.
AsyncRequestToken AsyncHttpClient::AsyncPut(std::string_view Url, uint64_t TotalSize, AsyncHttpReadSource Source, AsyncHttpCallback OnComplete, const KeyValueMap& AdditionalHeader) {
    return m_Impl->DoAsyncPutWithSource(std::string(Url), TotalSize, std::move(Source), std::move(OnComplete), AdditionalHeader);
}

// Asynchronous streaming GET: OnData receives body chunks, OnComplete the
// final result. Forwards to DoAsyncStream on m_Impl.
AsyncRequestToken AsyncHttpClient::AsyncStream(std::string_view Url, AsyncHttpDataCallback OnData, AsyncHttpCallback OnComplete, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    return m_Impl->DoAsyncStream(std::string(Url), std::move(OnData), std::move(OnComplete), AdditionalHeader, Parameters);
}

// -- Token cancellation --------------------------------------------------

// Requests cancellation of the associated request. Does nothing for a token
// without state. Only the first call that flips the Cancelled flag invokes
// CancelFn; later calls return immediately.
void AsyncRequestToken::Cancel() {
    if (!m_State) {
        return;
    }
    if (m_State->Cancelled.exchange(true, std::memory_order_acq_rel)) {
        return;  // already cancelled
    }
    if (m_State->CancelFn) {
        m_State->CancelFn();
    }
}

// -- Future-based API ----------------------------------------------------
//
// Each function below wraps the matching callback-based call: it creates a
// shared promise, obtains its future, and passes a callback that fulfils the
// promise with the Response. The token returned by the Async* call is
// discarded, so requests started this way cannot be cancelled through it.
// NOTE(review): template argument lists (std::future, std::make_shared) in
// this block appear to have been stripped from this copy of the file; tokens
// are left exactly as found.

// Future-returning GET.
std::future AsyncHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncGet(
        Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters);
    return Future;
}

// Future-returning HEAD.
std::future AsyncHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncHead(
        Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader);
    return Future;
}

// Future-returning DELETE.
std::future AsyncHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncDelete(
        Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader);
    return Future;
}

// Future-returning POST without a payload.
std::future AsyncHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncPost(
        Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters);
    return Future;
}

// Future-returning POST with a payload (content type taken from the payload
// by the AsyncPost overload it calls).
std::future AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncPost(
        Url, Payload, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader);
    return Future;
}

// Future-returning POST with a payload and an explicit content type.
std::future AsyncHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncPost(
        Url, Payload, ContentType, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader);
    return Future;
}

// Future-returning PUT with a payload.
std::future AsyncHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncPut(
        Url, Payload, [Promise](Response R) { Promise->set_value(std::move(R)); }, AdditionalHeader, Parameters);
    return Future;
}

// Future-returning PUT without a payload; sends no additional headers
// (an empty KeyValueMap is passed).
std::future AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) {
    auto Promise = std::make_shared>();
    auto Future = Promise->get_future();
    AsyncPut(
        Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, KeyValueMap{}, Parameters);
    return Future;
}

}  // namespace zen