diff options
Diffstat (limited to 'src/zenhttp/clients/asynchttpclient.cpp')
| -rw-r--r-- | src/zenhttp/clients/asynchttpclient.cpp | 1947 |
1 files changed, 1539 insertions, 408 deletions
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp index ea88fc783..e7e904f89 100644 --- a/src/zenhttp/clients/asynchttpclient.cpp +++ b/src/zenhttp/clients/asynchttpclient.cpp @@ -4,8 +4,11 @@ #include "httpclientcurlhelpers.h" +#include <zencore/basicfile.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/session.h> #include <zencore/thread.h> #include <zencore/trace.h> @@ -15,6 +18,9 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio/steady_timer.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#include <algorithm> +#include <charconv> +#include <deque> #include <thread> #include <unordered_map> @@ -22,45 +28,204 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// // +// AsyncRequestToken state (forward-declared in the public header so tokens +// can be returned by value without leaking impl details). + +struct AsyncRequestToken::State +{ + std::function<void()> CancelFn; + std::atomic<bool> Cancelled = false; +}; + +////////////////////////////////////////////////////////////////////////// +// // TransferContext: per-transfer state associated with each CURL easy handle +// Request blueprint kept alongside each transfer so retries can re-issue with +// the original verb/url/headers/payload after the previous attempt's transient +// failure. +enum class AsyncRequestMethod +{ + Get, + Head, + Delete, + Post, + PostWithPayload, + Put, + PutWithPayload, + PutWithSource, // PUT, body pulled via OnReadSource (no materialized payload) + Stream, // GET, response body delivered via OnData callback (no copy) +}; + +inline std::string_view +AsyncRequestMethodName(AsyncRequestMethod M) +{ + switch (M) + { + case AsyncRequestMethod::Get: + return "GET"; + case AsyncRequestMethod::Head: + return "HEAD"; + case AsyncRequestMethod::Delete: + return "DELETE"; + case AsyncRequestMethod::Post: + return "POST"; + case AsyncRequestMethod::PostWithPayload: + return "POST(payload)"; + case AsyncRequestMethod::Put: + return "PUT"; + case AsyncRequestMethod::PutWithPayload: + return "PUT(payload)"; + case AsyncRequestMethod::PutWithSource: + return "PUT(stream)"; + case AsyncRequestMethod::Stream: + return "GET(stream)"; + } + return "?"; +} + +struct AsyncRequestSpec +{ + AsyncRequestMethod Method = AsyncRequestMethod::Get; + std::string Url; + HttpClient::KeyValueMap AdditionalHeader; + HttpClient::KeyValueMap Parameters; + IoBuffer Payload; // POST/PUT with payload + ZenContentType ContentType = ZenContentType::kUnknownContentType; + bool HasContentType = false; + AsyncHttpDataCallback OnData; // Stream method + AsyncHttpReadSource OnReadSource; // PutWithSource method + uint64_t StreamingPutSize = 0; // Content-Length for PutWithSource + + // Opt-in header capture. By default Response::Header is left empty; inline + // extracts always run because they steer the body path. WantHeaderMap pays + // O(headers) string allocs on the io thread (rare - most callers use + // Response::FindHeader). WantEtag triggers an inline parse only. + bool WantEtag = false; + bool WantHeaderMap = false; +}; + struct TransferContext { - AsyncHttpCallback Callback; - std::string Body; - std::vector<std::pair<std::string, std::string>> ResponseHeaders; - CurlWriteCallbackData WriteData; - CurlHeaderCallbackData HeaderData; - curl_slist* HeaderList = nullptr; - - // For PUT/POST with payload: keep the data alive until transfer completes + AsyncHttpCallback Callback; + AsyncRequestSpec Spec; + uint8_t AttemptCount = 0; + uint64_t TokenId = 0; + bool Cancelled = false; + // Curl handle owning this transfer once submitted to curl_multi. Null between + // Submit() and SubmitFromSpec(), and again after CompleteTransfer releases the + // handle back to the pool. + CURL* CurlHandle = nullptr; + // Strong reference to the user-facing AsyncRequestToken::State. Kept alive + // so AsyncRequestToken::Cancel() retains a valid CancelFn target until the + // transfer completes. The typed pointer also lets SubmitFromSpec read + // State.Cancelled to honour cancels that arrived between Submit() returning + // and the io thread running SubmitFromSpec. + std::shared_ptr<AsyncRequestToken::State> TokenStateRef; + // Two paths gated by BodyPreallocated: when Content-Length is known we + // fill Body in place (zero-copy move into Response); otherwise BodyChunks + // accumulates per-WRITE IoBuffers, flattened at completion. + IoBuffer Body; + uint64_t BodyWriteOffset = 0; + bool BodyPreallocated = false; + std::vector<IoBuffer> BodyChunks; + + // Raw response headers, "Key: Value\r\n" lines as delivered by curl. + // One growing buffer; reserve covers common case (~1 KiB) with no realloc. + std::string HeaderArena; + + // Captured at the curl callback boundary so an exception out of the user + // OnData / OnReadSource never propagates through curl's C frames (UB). + // Surfaced as a kInternalError response by CompleteTransfer. + bool CallbackFailed = false; + std::string CallbackErrorMessage; + + // Inline-parsed in CurlHeaderCallback. Etag is populated only when + // Spec.WantEtag is set; the others are always parsed since they steer + // the body path / content-type tagging. + uint64_t ContentLength = 0; + bool ContentLengthSet = false; + ZenContentType BodyContentType = ZenContentType::kUnknownContentType; + std::string Etag; + + curl_slist* HeaderList = nullptr; + IoBuffer PayloadBuffer; CurlReadCallbackData ReadData; + uint64_t SourceOffset = 0; // PutWithSource: bytes pulled from Spec.OnReadSource so far. + + // Last attempt's failure, kept across the backoff so a retry-abandoned + // path can surface the underlying cause instead of a generic + // "Request canceled (retry abandoned)". Non-empty LastErrorMessage = stash valid. + CURLcode LastCurlResult = CURLE_OK; + long LastStatusCode = 0; + std::string LastErrorMessage; + + TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) { HeaderArena.reserve(1024); } + + ~TransferContext() { FreeHeaderList(); } + + TransferContext(const TransferContext&) = delete; + TransferContext& operator=(const TransferContext&) = delete; - TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) + // Reset accumulated response state so the same context can be re-submitted + // for a retry attempt. + void ResetForRetry() { - WriteData.Body = &Body; - HeaderData.Headers = &ResponseHeaders; + Body = IoBuffer{}; + BodyWriteOffset = 0; + BodyPreallocated = false; + BodyChunks.clear(); + HeaderArena.clear(); // keep capacity + ContentLength = 0; + ContentLengthSet = false; + BodyContentType = ZenContentType::kUnknownContentType; + Etag.clear(); + FreeHeaderList(); + ReadData = {}; + SourceOffset = 0; + CallbackFailed = false; + CallbackErrorMessage.clear(); + // LastCurlResult / LastStatusCode / LastErrorMessage are intentionally NOT + // cleared - they describe the just-finished attempt that triggered this + // retry, and surface in the abandon path if the next attempt is cancelled + // or shutdown-aborted. } - ~TransferContext() + void FreeHeaderList() { if (HeaderList) { curl_slist_free_all(HeaderList); + HeaderList = nullptr; } } - - TransferContext(const TransferContext&) = delete; - TransferContext& operator=(const TransferContext&) = delete; }; ////////////////////////////////////////////////////////////////////////// // -// AsyncHttpClient::Impl +// SocketInfo: per-socket state. -struct AsyncHttpClient::Impl +struct AsyncSocketInfo { + asio::ip::tcp::socket Socket; + int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT + int PendingFlags = 0; // directions with outstanding async_wait + + // Bound to the strand executor so async_wait completions are serialized on + // the same strand that drives curl_multi - safe even when the underlying + // io_context is multithreaded (external-context mode). + explicit AsyncSocketInfo(const asio::strand<asio::io_context::executor_type>& Strand) : Socket(Strand) {} +}; + +// Holds the curl_multi instance and a strand that serializes every curl_multi op. Owned io_context +// (default ctor) spins a private thread driving run(); external io_context mode lets the caller +// drive the loop. The strand is the single serialization point for both modes. +struct AsyncHttpClient::Impl : std::enable_shared_from_this<AsyncHttpClient::Impl> +{ + // Owned-io_context ctor: allocate a private io_context and run it on a + // dedicated thread. Cleanest path for callers that just want an + // AsyncHttpClient and don't care about the loop. Impl(std::string_view BaseUri, const HttpClientSettings& Settings) : m_BaseUri(BaseUri) , m_Settings(Settings) @@ -72,19 +237,33 @@ struct AsyncHttpClient::Impl { Init(); m_WorkGuard.emplace(m_IoContext.get_executor()); - m_IoThread = std::thread([this]() { - SetCurrentThreadName("async_http"); - try - { - m_IoContext.run(); - } - catch (const std::exception& Ex) + + auto ThreadGuard = MakeGuard([this]() { + m_WorkGuard.reset(); + if (m_IoThread.joinable()) { - ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what()); + m_IoThread.join(); } }); + m_IoThread = std::thread([this]() { + SetCurrentThreadName("async_http"); + try + { + m_IoContext.run(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("AsyncHttpClient: io thread unhandled exception: {}", Ex.what()); + } + }); + ThreadGuard.Dismiss(); } + // External-io_context ctor: caller drives the run loop. We do NOT spawn a + // thread, do NOT hold a work guard, and do NOT call stop()/restart() on + // teardown - the caller's lifecycle owns those. Shutdown blocks on a + // promise until our cleanup handler runs through the strand, so the + // caller MUST keep the loop running until the AsyncHttpClient destructs. Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) : m_BaseUri(BaseUri) , m_Settings(Settings) @@ -96,70 +275,167 @@ struct AsyncHttpClient::Impl Init(); } - ~Impl() + ~Impl() { Shutdown(); } + + // Synchronous teardown from ~AsyncHttpClient. Idempotent. Branches by ownership: + // - Owned io_context: post Cleanup, drop the work guard, force run() to return, join the io + // thread, then poll() to drain lambdas that captured shared_ptr<Impl> (the "lambda in + // io_context owned by Impl" cycle would otherwise pin Impl alive forever). + // - External io_context: caller drives the loop, post Cleanup to the strand and block on a + // promise. Destroying from the io thread itself would deadlock. + void Shutdown() { - // Clean up curl state on the strand where all curl_multi operations - // are serialized. Use a promise to block until the cleanup handler - // has actually executed - essential for the external io_context case - // where we don't own the run loop. - std::promise<void> Done; - std::future<void> DoneFuture = Done.get_future(); - - asio::post(m_Strand, [this, &Done]() { - m_ShuttingDown = true; - m_Timer.cancel(); - - // Release all tracked sockets (don't close - curl owns the fds). - for (auto& [Fd, Info] : m_Sockets) - { - if (Info->Socket.is_open()) - { - Info->Socket.cancel(); - Info->Socket.release(); - } - } - m_Sockets.clear(); + if (m_ShutdownDone.exchange(true, std::memory_order_acq_rel)) + { + return; + } - for (auto& [Handle, Ctx] : m_Transfers) + if (m_OwnedIoContext) + { + // Post Cleanup before releasing the work guard so the io thread + // runs Cleanup before run() returns. Run() normally exits when + // the queue is empty AND no work guard is held. + asio::post(m_Strand, [this]() { Cleanup(); }); + m_WorkGuard.reset(); + // Belt-and-suspenders: force run() to return even if asio's + // outstanding_work_ counter is left non-zero by a close-during- + // cancel race in win_iocp_socket_service. The race is observable + // as a hung join() at shutdown after a burst of socket teardowns + // inside curl_multi_cleanup; stop() here is purely a safety net + // for the teardown path. The trailing restart() + poll() drains + // any handlers stop() leaves undispatched. + m_IoContext.stop(); + if (m_IoThread.joinable()) { - curl_multi_remove_handle(m_Multi, Handle); - curl_easy_cleanup(Handle); + m_IoThread.join(); } - m_Transfers.clear(); - for (CURL* Handle : m_HandlePool) + // Drain any leftover work posted to the io_context but not run + // before the thread exited (e.g. a Cancel-after-completion lambda + // that captured shared_ptr<Impl> by value). Loop until the queue + // is empty: a single poll() is one quanta and may not drain + // handlers that themselves post follow-ups. + m_IoContext.restart(); + while (m_IoContext.poll() != 0) { - curl_easy_cleanup(Handle); } - m_HandlePool.clear(); - - Done.set_value(); - }); + } + else + { + // External: block on the cleanup handler so we can guarantee + // curl_multi state is gone before m_Impl drops. + std::promise<void> Done; + std::future<void> DoneFuture = Done.get_future(); + asio::post(m_Strand, [this, &Done]() { + Cleanup(); + Done.set_value(); + }); + DoneFuture.wait(); + } + } - // For owned io_context: release work guard so run() can return after - // processing the cleanup handler above. - m_WorkGuard.reset(); + // Cleanup body, run on the io thread. + void Cleanup() + { + m_ShuttingDown = true; + m_Timer.cancel(); - if (m_IoThread.joinable()) + // Tear down curl handles first; curl drives CURL_POLL_REMOVE + + // CLOSESOCKETFUNCTION for each owned socket, which our handlers + // use to retire SocketInfo entries from m_Sockets. + for (auto& [TokenId, Ctx] : m_Transfers) { - m_IoThread.join(); + if (Ctx->CurlHandle) + { + curl_multi_remove_handle(m_Multi, Ctx->CurlHandle); + curl_easy_cleanup(Ctx->CurlHandle); + Ctx->CurlHandle = nullptr; + } + + HttpClient::Response Resp; + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "AsyncHttpClient shutting down", + }; + try + { + Ctx->Callback(std::move(Resp)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown callback (token={}, {} {}): {}", + TokenId, + AsyncRequestMethodName(Ctx->Spec.Method), + Ctx->Spec.Url, + Ex.what()); + } } - else + m_Transfers.clear(); + m_InFlight = 0; + + // Drain transfers parked in retry backoff. The timer's pending + // async_wait fires after Cleanup with the io_context already stopped; + // it will find no entry and be a no-op. Fire cancel callbacks here + // while we still hold the storage so callers' futures resolve. + for (auto& [Id, Entry] : m_RetryingTransfers) { - // External io_context: wait for the cleanup handler to complete. - DoneFuture.wait(); + if (Entry.Timer) + { + Entry.Timer->cancel(); + } + HttpClient::Response Resp; + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "AsyncHttpClient shutting down", + }; + try + { + Entry.Ctx->Callback(std::move(Resp)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown retry callback (token={}, {} {}): {}", + Id, + AsyncRequestMethodName(Entry.Ctx->Spec.Method), + Entry.Ctx->Spec.Url, + Ex.what()); + } } + m_RetryingTransfers.clear(); + // curl_multi_cleanup walks the connection cache and fires + // CLOSESOCKETFUNCTION for each cached fd. Run it on the io thread + // while m_Sockets is still populated so our callback routes each + // close through the map (Socket dtor closes fd exactly once). if (m_Multi) { curl_multi_cleanup(m_Multi); + m_Multi = nullptr; + } + + for (CURL* Handle : m_HandlePool) + { + curl_easy_cleanup(Handle); + } + m_HandlePool.clear(); + + asio::error_code CancelEc; + for (auto& [Fd, Info] : m_Sockets) + { + Info->Socket.cancel(CancelEc); } + m_Sockets.clear(); } LoggerRef Log() { return m_Log; } void Init() { + if (!m_Settings.UnixSocketPath.empty()) + { + m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath); + } + m_Multi = curl_multi_init(); if (!m_Multi) { @@ -168,6 +444,24 @@ struct AsyncHttpClient::Impl SetupMultiCallbacks(); + if (m_Settings.MaxConcurrentConnectionsPerHost != 0) + { + curl_multi_setopt(m_Multi, CURLMOPT_MAX_HOST_CONNECTIONS, static_cast<long>(m_Settings.MaxConcurrentConnectionsPerHost)); + } + if (m_Settings.MaxConcurrentConnectionsTotal != 0) + { + curl_multi_setopt(m_Multi, CURLMOPT_MAX_TOTAL_CONNECTIONS, static_cast<long>(m_Settings.MaxConcurrentConnectionsTotal)); + } + + // Size the idle-conn cache to the in-flight cap so reused conns are + // never evicted while requests are queued. Each eviction costs a + // fresh TCP+TLS handshake (~280ms WAN to S3) on the next reuse. + const long MaxConnectsHint = static_cast<long>(std::max({m_Settings.MaxConcurrentRequests, + m_Settings.MaxConcurrentConnectionsTotal, + m_Settings.MaxConcurrentConnectionsPerHost, + 128u})); + curl_multi_setopt(m_Multi, CURLMOPT_MAXCONNECTS, MaxConnectsHint); + if (m_Settings.SessionId == Oid::Zero) { m_SessionId = std::string(GetSessionIdString()); @@ -178,6 +472,30 @@ struct AsyncHttpClient::Impl } } + // Run a completion callback inline on the io thread. By the time this is + // called, curl_multi_remove_handle + curl_easy_cleanup have already finalized + // the easy handle, so deferring to next io tick buys nothing. Direct call + // saves one alloc + queue insert per request. + // + // CONTRACT: user callbacks run on the AsyncHttpClient io thread. Heavy work + // (disk syscalls, lock contention, large allocations) must be hopped to a + // worker pool; otherwise it stalls curl_multi for ALL in-flight transfers. + void DispatchCallback(AsyncHttpCallback Cb, HttpClient::Response Resp, const TransferContext& Ctx) + { + try + { + Cb(std::move(Resp)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback (token={}, {} {}): {}", + Ctx.TokenId, + AsyncRequestMethodName(Ctx.Spec.Method), + Ctx.Spec.Url, + Ex.what()); + } + } + // -- Handle pool ----------------------------------------------------- CURL* AllocHandle() @@ -199,20 +517,15 @@ struct AsyncHttpClient::Impl void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); } - // -- Configure a handle with common settings ------------------------- - // Called only from DoAsync* lambdas running on the strand. - + // Called only from DoAsync* lambdas running on the io thread. void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters) { - // Build URL ExtendableStringBuilder<256> Url; BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters); curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str()); - // Unix domain socket if (!m_Settings.UnixSocketPath.empty()) { - m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath); curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str()); } @@ -226,12 +539,6 @@ struct AsyncHttpClient::Impl curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count())); } - // HTTP/2 - if (m_Settings.AssumeHttp2) - { - curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); - } - // SSL if (m_Settings.InsecureSsl) { @@ -243,7 +550,6 @@ struct AsyncHttpClient::Impl curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str()); } - // Verbose/debug if (m_Settings.Verbose) { curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); @@ -252,6 +558,22 @@ struct AsyncHttpClient::Impl // Thread safety curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); + // 256 KiB recv buffer aligns with optimal read syscall size and matches + // upload buffer; pairs with downstream 512 KiB write slots (2 recv calls + // per write slot under bulk transfer). + curl_easy_setopt(Handle, CURLOPT_BUFFERSIZE, 262144L); + curl_easy_setopt(Handle, CURLOPT_UPLOAD_BUFFERSIZE, 262144L); + + // Skip per-transfer progress bookkeeping; we don't consume it. + curl_easy_setopt(Handle, CURLOPT_NOPROGRESS, 1L); + + // Disable Nagle (default since curl 7.50; explicit for safety). + curl_easy_setopt(Handle, CURLOPT_TCP_NODELAY, 1L); + + // Take ownership of socket close (see CurlCloseSocketCallback). + curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETFUNCTION, &CurlCloseSocketCallback); + curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETDATA, this); + if (m_Settings.ForbidReuseConnection) { curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L); @@ -260,23 +582,24 @@ struct AsyncHttpClient::Impl // -- Access token ---------------------------------------------------- - std::optional<std::string> GetAccessToken() + struct AccessTokenResult { + std::optional<std::string> Token; + bool ProviderFailed = false; // provider configured but returned invalid twice + }; + + // Called only on the io thread. + AccessTokenResult GetAccessToken() + { + AccessTokenResult Result; if (!m_Settings.AccessTokenProvider.has_value()) { - return {}; + return Result; // No provider: anonymous is the intended mode. } - { - RwLock::SharedLockScope _(m_AccessTokenLock); - if (!m_CachedAccessToken.NeedsRefresh()) - { - return m_CachedAccessToken.GetValue(); - } - } - RwLock::ExclusiveLockScope _(m_AccessTokenLock); if (!m_CachedAccessToken.NeedsRefresh()) { - return m_CachedAccessToken.GetValue(); + Result.Token = m_CachedAccessToken.GetValue(); + return Result; } HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()(); if (!NewToken.IsValid()) @@ -287,10 +610,176 @@ struct AsyncHttpClient::Impl if (NewToken.IsValid()) { m_CachedAccessToken = NewToken; - return m_CachedAccessToken.GetValue(); + Result.Token = m_CachedAccessToken.GetValue(); + return Result; } ZEN_WARN("AsyncHttpClient: access token provider returned invalid token"); - return {}; + Result.ProviderFailed = true; + return Result; + } + + // -- Submit / resubmit ----------------------------------------------- + // + // SubmitFromSpec runs on the io thread. Used for both the initial submission + // and for retries: the AsyncRequestSpec inside Ctx encodes everything + // needed to (re)build the curl handle from scratch. + + void SubmitFromSpec(std::unique_ptr<TransferContext> Ctx) + { + if (m_ShuttingDown) + { + // Synthesize a cancel response so the user callback fires exactly once. + // Without this any Ctx that lands here post-shutdown would be dropped + // silently, leaving waiting futures unresolved. + HttpClient::Response CancelResp; + CancelResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled (client shutting down)", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); + return; + } + + // Cancel-before-submit race: the user can call AsyncRequestToken::Cancel() + // between Submit() returning and the io thread running SubmitFromSpec. + // State.Cancelled is set under acq_rel by Cancel(); read here under + // acquire ensures visibility. If set, fire the cancel callback exactly + // once and bail before the transfer enters m_Transfers. + if (Ctx->TokenStateRef && Ctx->TokenStateRef->Cancelled.load(std::memory_order_acquire)) + { + HttpClient::Response CancelResp; + CancelResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled before submit", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); + return; + } + + // Allocate the curl handle BEFORE bumping m_InFlight: AllocHandle can throw + // (curl_easy_init returning null). The InFlightGuard covers the rest of + // the body (BuildHeaderList, ExtraHeaders push_back, GetAccessToken can + // all throw bad_alloc) so a throw post-increment doesn't leak the slot. + // HandleGuard returns the handle to the pool on throw; CallbackGuard + // synthesizes a kInternalError response so the user future resolves. + CURL* Handle = AllocHandle(); + ++m_InFlight; + auto InFlightGuard = MakeGuard([this] { --m_InFlight; }); + auto HandleGuard = MakeGuard([this, Handle] { ReleaseHandle(Handle); }); + auto CallbackGuard = MakeGuard([this, &Ctx] { + HttpClient::Response ErrResp; + ErrResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kInternalError, + .ErrorMessage = "AsyncHttpClient::SubmitFromSpec: setup threw before dispatch", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx); + }); + ConfigureHandle(Handle, Ctx->Spec.Url, Ctx->Spec.Parameters); + + switch (Ctx->Spec.Method) + { + case AsyncRequestMethod::Get: + curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); + break; + + case AsyncRequestMethod::Stream: + curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); + break; + + case AsyncRequestMethod::Head: + curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L); + break; + + case AsyncRequestMethod::Delete: + curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE"); + break; + + case AsyncRequestMethod::Post: + curl_easy_setopt(Handle, CURLOPT_POST, 1L); + curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L); + break; + + case AsyncRequestMethod::PostWithPayload: + { + curl_easy_setopt(Handle, CURLOPT_POST, 1L); + Ctx->PayloadBuffer = Ctx->Spec.Payload; + Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); + Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); + Ctx->ReadData.Offset = 0; + curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); + curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); + curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); + break; + } + + case AsyncRequestMethod::Put: + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL); + break; + + case AsyncRequestMethod::PutWithPayload: + { + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + Ctx->PayloadBuffer = Ctx->Spec.Payload; + Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); + Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); + Ctx->ReadData.Offset = 0; + curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); + curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); + curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); + break; + } + + case AsyncRequestMethod::PutWithSource: + { + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + Ctx->SourceOffset = 0; + curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->Spec.StreamingPutSize)); + curl_easy_setopt(Handle, CURLOPT_READFUNCTION, AsyncCurlSourceReadCallback); + curl_easy_setopt(Handle, CURLOPT_READDATA, Ctx.get()); + break; + } + } + + // Headers - include Content-Type for payload-bearing methods, Content-Length: 0 for empty PUT. + std::vector<std::pair<std::string, std::string>> ExtraHeaders; + if (Ctx->Spec.Method == AsyncRequestMethod::PostWithPayload) + { + const ZenContentType Effective = Ctx->Spec.HasContentType ? Ctx->Spec.ContentType : Ctx->Spec.Payload.GetContentType(); + ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Effective))); + } + else if (Ctx->Spec.Method == AsyncRequestMethod::PutWithPayload) + { + ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Ctx->Spec.Payload.GetContentType()))); + } + else if (Ctx->Spec.Method == AsyncRequestMethod::Put) + { + ExtraHeaders.emplace_back("Content-Length", "0"); + } + + AccessTokenResult Token = GetAccessToken(); + if (Token.ProviderFailed) + { + // Provider configured but failed twice: do NOT silently downgrade + // to an anonymous request - the server will respond 403 and the + // caller has no way to tell auth failed. + HttpClient::Response ErrResp; + ErrResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kInternalError, + .ErrorMessage = "AsyncHttpClient: access token provider failed; refusing to issue anonymous request", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx); + CallbackGuard.Dismiss(); + return; + } + + Ctx->HeaderList = BuildHeaderList(Ctx->Spec.AdditionalHeader, m_SessionId, std::move(Token.Token), ExtraHeaders); + curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + + InFlightGuard.Dismiss(); + HandleGuard.Dismiss(); + CallbackGuard.Dismiss(); + SubmitTransfer(Handle, std::move(Ctx)); } // -- Submit a transfer ----------------------------------------------- @@ -298,56 +787,66 @@ struct AsyncHttpClient::Impl void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx) { ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer"); - // Setup write/header callbacks - curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback); - curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData); - curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback); - curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData); - - m_Transfers[Handle] = std::move(Ctx); - + // Pick the WRITE callback by method: + // - Stream: forwards each chunk to the caller's OnData (no copy) + // - other: buffers bytes in TransferContext::Body + if (Ctx->Spec.Method == AsyncRequestMethod::Stream) + { + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlStreamWriteCallback); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get()); + } + else + { + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlWriteCallback); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get()); + } + curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, AsyncCurlHeaderCallback); + curl_easy_setopt(Handle, CURLOPT_HEADERDATA, Ctx.get()); + // Stash TokenId on the curl handle so CheckCompleted can look up the + // TransferContext directly from curl_multi_info_read's CURLMsg.easy_handle. + curl_easy_setopt(Handle, CURLOPT_PRIVATE, reinterpret_cast<void*>(static_cast<uintptr_t>(Ctx->TokenId))); + + Ctx->CurlHandle = Handle; + const uint64_t TokenIdLocal = Ctx->TokenId; + + // Try the curl_multi add first. On failure, Ctx is still owned locally so the + // rollback is a single Release path with no map churn. On success, ownership + // moves into the single TokenId-keyed lookup table. CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle); if (Mc != CURLM_OK) { - auto Stolen = std::move(m_Transfers[Handle]); - m_Transfers.erase(Handle); + Ctx->CurlHandle = nullptr; ReleaseHandle(Handle); HttpClient::Response ErrorResponse; ErrorResponse.Error = HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError, .ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))}; - asio::post(m_IoContext, - [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); }); + DispatchCallback(std::move(Ctx->Callback), std::move(ErrorResponse), *Ctx); + OnSlotFreed(); return; } - } - // -- Socket-action integration --------------------------------------- - // - // curl_multi drives I/O via two callbacks: - // - SocketCallback: curl tells us which sockets to watch for read/write - // - TimerCallback: curl tells us when to fire a timeout - // - // On each socket event or timeout we call curl_multi_socket_action(), - // then drain completed transfers via curl_multi_info_read(). + m_Transfers.emplace(TokenIdLocal, std::move(Ctx)); + } - // Per-socket state: wraps the native fd in an ASIO socket for async_wait. - struct SocketInfo + // Telemetry only; this client does not gate fan-out. Callers (e.g. + // S3AsyncStorage) layer their own admission semaphore on top. + // Assert catches SubmitFromSpec/OnSlotFreed imbalance. + void OnSlotFreed() { - asio::ip::tcp::socket Socket; - int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT - - explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {} - }; + ZEN_ASSERT(m_InFlight > 0); + --m_InFlight; + } - // Static thunks registered with curl_multi ---------------------------- + // curl_multi drives I/O via SocketCallback (which fds to watch) and TimerCallback (when to fire). + // On each event we call curl_multi_socket_action() and drain via curl_multi_info_read(). + // Static thunks: UserData = Impl* (set via CURLMOPT_SOCKETDATA / CURLMOPT_TIMERDATA). Bodies run on io thread. static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr) { - ZEN_UNUSED(Easy); auto* Self = static_cast<Impl*>(UserPtr); - Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr)); + Self->OnCurlSocket(Easy, Fd, Action, static_cast<AsyncSocketInfo*>(SocketPtr)); return 0; } @@ -359,6 +858,219 @@ struct AsyncHttpClient::Impl return 0; } + // Async-specific HEADER callback. Appends raw "Key: Value\r\n" lines to + // Ctx->HeaderArena (one growing buffer; ~zero allocs in the common case + // where the initial reserve covers the response). Inline-parses + // Content-Length and Content-Type unconditionally; parses ETag only when + // Spec.WantEtag is set. No std::string/pair allocations per line. + static size_t AsyncCurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) + { + auto* Ctx = static_cast<TransferContext*>(UserData); + const size_t TotalBytes = Size * Nmemb; + std::string_view Line(Buffer, TotalBytes); + + Ctx->HeaderArena.append(Buffer, TotalBytes); + + while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) + { + Line.remove_suffix(1); + } + if (Line.empty()) + { + return TotalBytes; + } + const size_t Colon = Line.find(':'); + if (Colon == std::string_view::npos) + { + return TotalBytes; // HTTP status line or malformed + } + std::string_view Key = Line.substr(0, Colon); + std::string_view Value = Line.substr(Colon + 1); + while (!Key.empty() && Key.back() == ' ') + { + Key.remove_suffix(1); + } + while (!Value.empty() && Value.front() == ' ') + { + Value.remove_prefix(1); + } + + if (StrCaseEquals(Key, "Content-Length")) + { + uint64_t Length = 0; + std::from_chars_result Res = std::from_chars(Value.data(), Value.data() + Value.size(), Length); + if (Res.ec == std::errc{}) + { + Ctx->ContentLength = Length; + Ctx->ContentLengthSet = true; + } + } + else if (StrCaseEquals(Key, "Content-Type")) + { + Ctx->BodyContentType = ParseContentType(Value); + } + else if (Ctx->Spec.WantEtag && StrCaseEquals(Key, "ETag")) + { + Ctx->Etag.assign(Value); + } + + return TotalBytes; + } + + // Async-specific write callback. Targets a TransferContext directly. + // Preallocates Body from Ctx->ContentLength (parsed in HEADER cb). If + // Content-Length is absent (e.g. chunked encoding), falls back to + // BodyChunks accumulation; CompleteTransfer flattens at the end. + static size_t AsyncCurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData) + { + auto* Ctx = static_cast<TransferContext*>(UserData); + const size_t TotalBytes = Size * Nmemb; + if (TotalBytes == 0) + { + return 0; + } + + if (!Ctx->BodyPreallocated && Ctx->BodyWriteOffset == 0 && Ctx->BodyChunks.empty() && Ctx->ContentLengthSet && + Ctx->ContentLength > 0) + { + Ctx->Body = IoBuffer(static_cast<size_t>(Ctx->ContentLength)); + Ctx->BodyPreallocated = true; + } + + if (Ctx->BodyPreallocated) + { + if (Ctx->BodyWriteOffset + TotalBytes > Ctx->Body.GetSize()) + { + // Server sent more than Content-Length advertised; abort. + return 0; + } + memcpy(static_cast<uint8_t*>(Ctx->Body.MutableData()) + Ctx->BodyWriteOffset, Ptr, TotalBytes); + Ctx->BodyWriteOffset += TotalBytes; + } + else + { + IoBuffer Chunk(TotalBytes); + memcpy(Chunk.MutableData(), Ptr, TotalBytes); + Ctx->BodyChunks.push_back(std::move(Chunk)); + } + + return TotalBytes; + } + + // PutWithSource read callback. Pulls up to MaxBytes from Spec.OnReadSource + // into curl's send buffer. Source closure runs on the io thread - same + // strand discipline as Stream's OnData. Returning 0 with SourceOffset < + // StreamingPutSize signals an upload abort to curl. + static size_t AsyncCurlSourceReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) + { + auto* Ctx = static_cast<TransferContext*>(UserData); + const size_t MaxBytes = Size * Nmemb; + if (MaxBytes == 0 || !Ctx->Spec.OnReadSource) + { + return 0; + } + // Catch at the curl boundary: user OnReadSource may call into IoBuffer + // allocation, file reads, or async dispatch which can throw. Letting an + // exception unwind through curl's C frames is UB. + size_t Pulled = 0; + try + { + Pulled = Ctx->Spec.OnReadSource(reinterpret_cast<uint8_t*>(Buffer), MaxBytes, Ctx->SourceOffset); + } + catch (const std::exception& Ex) + { + Ctx->CallbackFailed = true; + Ctx->CallbackErrorMessage = fmt::format("upload source callback threw: {}", Ex.what()); + return CURL_READFUNC_ABORT; + } + catch (...) + { + Ctx->CallbackFailed = true; + Ctx->CallbackErrorMessage = "upload source callback threw unknown exception"; + return CURL_READFUNC_ABORT; + } + if (Pulled == 0 && Ctx->SourceOffset < Ctx->Spec.StreamingPutSize) + { + return CURL_READFUNC_ABORT; + } + Ctx->SourceOffset += Pulled; + return Pulled; + } + + // Stream-method write callback. Hands each chunk to the caller's OnData + // without allocating or copying. The pointer is curl's internal receive + // buffer; valid only for the duration of this call. Caller's OnData runs + // on the io thread, so blocking work (disk write etc) blocks the poll + // loop. TotalSize comes from inline-parsed Content-Length (0 if absent / + // chunked). + static size_t AsyncCurlStreamWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData) + { + auto* Ctx = static_cast<TransferContext*>(UserData); + const size_t TotalBytes = Size * Nmemb; + if (TotalBytes == 0) + { + return 0; + } + + if (!Ctx->Spec.OnData) + { + // Stream method requires OnData by contract; reaching this branch + // is API misuse. Returning TotalBytes (success) would silently + // drop the body and report "ok"; fail loudly instead. + Ctx->CallbackFailed = true; + Ctx->CallbackErrorMessage = "stream request submitted without OnData callback"; + return 0; + } + + // Catch at the curl boundary so an exception inside the user OnData + // (e.g. IoBuffer alloc failure, ScheduleWork rejection, ZEN_ASSERT in a + // downstream pool) cannot propagate through curl's C frames. Stash on + // Ctx so CompleteTransfer surfaces it as kInternalError. + try + { + const bool ContinueTransfer = Ctx->Spec.OnData(reinterpret_cast<const uint8_t*>(Ptr), TotalBytes, Ctx->ContentLength); + return ContinueTransfer ? TotalBytes : 0; // returning 0 aborts + } + catch (const std::exception& Ex) + { + Ctx->CallbackFailed = true; + Ctx->CallbackErrorMessage = fmt::format("stream data callback threw: {}", Ex.what()); + return 0; + } + catch (...) + { + Ctx->CallbackFailed = true; + Ctx->CallbackErrorMessage = "stream data callback threw unknown exception"; + return 0; + } + } + + // Take ownership of socket close. CLOSESOCKETFUNCTION is invoked from + // within curl_multi operations which run on the io thread, so direct map + // access is safe. The asio tcp::socket destructor closes the fd; we + // return 0 to tell curl the close succeeded. Letting curl close as well + // would race (double-close, fd-reuse hazard) and on Windows IOCP + // `release()` throws `operation_not_supported`, killing the io thread. + static int CurlCloseSocketCallback(void* ClientPtr, curl_socket_t Fd) + { + auto* Self = static_cast<Impl*>(ClientPtr); + auto It = Self->m_Sockets.find(Fd); + if (It != Self->m_Sockets.end()) + { + asio::error_code Ec; + It->second->Socket.cancel(Ec); + Self->m_Sockets.erase(It); + return 0; + } + // Fd not tracked (e.g. pre-poll-add or post-shutdown); close directly. +#if ZEN_PLATFORM_WINDOWS + ::closesocket(Fd); +#else + ::close(Fd); +#endif + return 0; + } + void SetupMultiCallbacks() { curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback); @@ -369,45 +1081,97 @@ struct AsyncHttpClient::Impl // Called by curl when socket watch state changes --------------------- - void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info) + // Synthesize a transport-level failure for the easy handle currently bound + // to the curl_multi entry that owns Fd. Used when the asio side cannot bind + // the fd; without this the affected transfer would hang on curl's + // connect/transfer timeout instead of failing fast with the real error. + void FailEasyHandleForFd(CURL* Easy, std::string_view Reason) + { + if (!Easy) + { + return; + } + curl_multi_remove_handle(m_Multi, Easy); + + char* Private = nullptr; + curl_easy_getinfo(Easy, CURLINFO_PRIVATE, &Private); + const uint64_t TokenId = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(Private)); + + auto It = m_Transfers.find(TokenId); + if (It == m_Transfers.end()) + { + ReleaseHandle(Easy); + return; + } + + std::unique_ptr<TransferContext> Ctx = std::move(It->second); + m_Transfers.erase(It); + Ctx->CurlHandle = nullptr; + ReleaseHandle(Easy); + + HttpClient::Response Resp; + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kInternalError, + .ErrorMessage = std::string(Reason), + }; + DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx); + OnSlotFreed(); + } + + void OnCurlSocket(CURL* Easy, curl_socket_t Fd, int Action, AsyncSocketInfo* Info) { if (Action == CURL_POLL_REMOVE) { if (Info) { - // Cancel pending async_wait ops before releasing the fd. - // curl owns the fd, so we must release() rather than close(). - Info->Socket.cancel(); - if (Info->Socket.is_open()) - { - Info->Socket.release(); - } - m_Sockets.erase(Fd); + // Cancel any pending async_wait but KEEP the AsyncSocketInfo + // alive in m_Sockets. CURL_POLL_REMOVE only means "stop + // watching this socket"; curl may still use the fd + // (keep-alive reuse) and rewatch it later. The asio Socket + // stays bound to the same IOCP it was first assigned to; + // we never call release()+assign() on the same fd, which + // avoided a race where in-flight async_wait callbacks + // raced with curl_multi reading from the socket and + // corrupted HTTP framing. The fd's actual close happens + // in CurlCloseSocketCallback, which erases the entry and + // lets the asio Socket destructor close. + asio::error_code Ec; + Info->Socket.cancel(Ec); + Info->WatchFlags = 0; + Info->PendingFlags = 0; } return; } if (!Info) { - // New socket - wrap the native fd in an ASIO socket. - auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext)); - Info = It->second.get(); - - asio::error_code Ec; - // Determine protocol from the fd (v4 vs v6). Default to v4. - Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec); - if (Ec) + // CURL_POLL_IN/OUT with no Info attached. Two cases: + // 1) brand new fd - emplace, assign, IOCP-bind. + // 2) curl re-watching a kept-alive fd it earlier removed - + // reuse the existing AsyncSocketInfo, no re-assign. + auto It = m_Sockets.find(Fd); + if (It == m_Sockets.end()) { - // Try v6 as fallback - Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec); - } - if (Ec) - { - ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message()); - m_Sockets.erase(Fd); - return; - } + auto [NewIt, _] = m_Sockets.emplace(Fd, std::make_unique<AsyncSocketInfo>(m_Strand)); + It = NewIt; + asio::error_code Ec; + It->second->Socket.assign(asio::ip::tcp::v4(), Fd, Ec); + if (Ec) + { + It->second->Socket.assign(asio::ip::tcp::v6(), Fd, Ec); + } + if (Ec) + { + std::string Reason = + fmt::format("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message()); + ZEN_WARN("{}", Reason); + m_Sockets.erase(It); + FailEasyHandleForFd(Easy, Reason); + return; + } + } + Info = It->second.get(); curl_multi_assign(m_Multi, Fd, Info); } @@ -415,37 +1179,84 @@ struct AsyncHttpClient::Impl SetSocketWatch(Fd, Info); } - void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info) + void SetSocketWatch(curl_socket_t Fd, AsyncSocketInfo* Info) { - // Cancel any pending wait before issuing a new one. - Info->Socket.cancel(); + // Cancel only when a previously-watched direction is no longer wanted. + // In the common path (one-shot async_wait completes, curl re-watches + // the same flags) PendingFlags is a subset of WatchFlags and we just + // re-arm the missing direction without touching CancelIoEx. + const int Desired = Info->WatchFlags & (CURL_POLL_IN | CURL_POLL_OUT); + + if (Info->PendingFlags & ~Desired) + { + asio::error_code Ec; + Info->Socket.cancel(Ec); + Info->PendingFlags = 0; + } + + const int ToAdd = Desired & ~Info->PendingFlags; - if (Info->WatchFlags & CURL_POLL_IN) + if (ToAdd & CURL_POLL_IN) { - Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { - if (Ec || m_ShuttingDown) - { - return; - } - OnSocketReady(Fd, CURL_CSELECT_IN); - })); + Info->PendingFlags |= CURL_POLL_IN; + Info->Socket.async_wait(asio::socket_base::wait_read, [this, Fd](const asio::error_code& Ec) { + if (m_ShuttingDown) + { + return; + } + auto It = m_Sockets.find(Fd); + if (It == m_Sockets.end()) + { + return; + } + It->second->PendingFlags &= ~CURL_POLL_IN; + if (Ec) + { + if (Ec != asio::error::operation_aborted) + { + ZEN_DEBUG("AsyncHttpClient: read async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR", + static_cast<int>(Fd), + Ec.message()); + OnSocketReady(Fd, CURL_CSELECT_ERR); + } + return; + } + OnSocketReady(Fd, CURL_CSELECT_IN); + }); } - if (Info->WatchFlags & CURL_POLL_OUT) + if (ToAdd & CURL_POLL_OUT) { - Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) { - if (Ec || m_ShuttingDown) - { - return; - } - OnSocketReady(Fd, CURL_CSELECT_OUT); - })); + Info->PendingFlags |= CURL_POLL_OUT; + Info->Socket.async_wait(asio::socket_base::wait_write, [this, Fd](const asio::error_code& Ec) { + if (m_ShuttingDown) + { + return; + } + auto It = m_Sockets.find(Fd); + if (It == m_Sockets.end()) + { + return; + } + It->second->PendingFlags &= ~CURL_POLL_OUT; + if (Ec) + { + if (Ec != asio::error::operation_aborted) + { + ZEN_DEBUG("AsyncHttpClient: write async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR", + static_cast<int>(Fd), + Ec.message()); + OnSocketReady(Fd, CURL_CSELECT_ERR); + } + return; + } + OnSocketReady(Fd, CURL_CSELECT_OUT); + }); } } void OnSocketReady(curl_socket_t Fd, int CurlAction) { - ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady"); int StillRunning = 0; curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning); CheckCompleted(); @@ -472,7 +1283,7 @@ struct AsyncHttpClient::Impl if (TimeoutMs == 0) { - // curl wants immediate action - run it directly on the strand. + // curl wants immediate action - run it on the next strand tick. asio::post(m_Strand, [this]() { if (m_ShuttingDown) { @@ -486,16 +1297,24 @@ struct AsyncHttpClient::Impl } m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs)); - m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) { - if (Ec || m_ShuttingDown) + m_Timer.async_wait([this](const asio::error_code& Ec) { + if (m_ShuttingDown) + { + return; + } + if (Ec) { + if (Ec != asio::error::operation_aborted) + { + ZEN_DEBUG("AsyncHttpClient: curl multi timer error: {}", Ec.message()); + } return; } ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout"); int StillRunning = 0; curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning); CheckCompleted(); - })); + }); } // Drain completed transfers from curl_multi -------------------------- @@ -516,7 +1335,13 @@ struct AsyncHttpClient::Impl curl_multi_remove_handle(m_Multi, Handle); - auto It = m_Transfers.find(Handle); + // Recover TokenId from CURLOPT_PRIVATE; cheaper than a per-handle + // reverse map. Returns nullptr if option was never set. + char* Private = nullptr; + curl_easy_getinfo(Handle, CURLINFO_PRIVATE, &Private); + const uint64_t TokenId = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(Private)); + + auto It = m_Transfers.find(TokenId); if (It == m_Transfers.end()) { ReleaseHandle(Handle); @@ -530,9 +1355,45 @@ struct AsyncHttpClient::Impl } } + // Mirrors CurlHttpClient::ShouldRetry semantics; keep the two in sync. + static bool ShouldRetryAsync(CURLcode CurlResult, long StatusCode) + { + switch (CurlResult) + { + case CURLE_OK: + break; + case CURLE_COULDNT_CONNECT: + case CURLE_RECV_ERROR: + case CURLE_SEND_ERROR: + case CURLE_OPERATION_TIMEDOUT: + case CURLE_PARTIAL_FILE: + return true; + default: + return false; + } + switch (static_cast<HttpResponseCode>(StatusCode)) + { + case HttpResponseCode::RequestTimeout: + case HttpResponseCode::TooManyRequests: + case HttpResponseCode::InternalServerError: + case HttpResponseCode::BadGateway: + case HttpResponseCode::ServiceUnavailable: + case HttpResponseCode::GatewayTimeout: + return true; + default: + return false; + } + } + void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx) { ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer"); + + // Free the in-flight counter before any retry / cancel branch. Retry + // re-submits via SubmitFromSpec, which re-increments the counter for + // the next attempt - keeping the assert balanced across retries. + OnSlotFreed(); + // Extract result info long StatusCode = 0; curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode); @@ -548,13 +1409,205 @@ struct AsyncHttpClient::Impl ReleaseHandle(Handle); + // Cancellation came in after curl ran but before we processed completion. + // Synthesize a cancel response and skip retry. + if (Ctx->Cancelled) + { + HttpClient::Response CancelResp; + CancelResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); + return; + } + + // User OnData / OnReadSource threw inside curl. The transfer is already + // aborted; surface the stashed exception text and skip retry (a callback + // exception is not a transient transport failure). + if (Ctx->CallbackFailed) + { + HttpClient::Response Resp; + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kInternalError, + .ErrorMessage = std::move(Ctx->CallbackErrorMessage), + }; + DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx); + return; + } + + // Retry path: re-issue from spec after backoff. Keeps the user callback + // untouched so the eventual final result fires only once. + if (!m_ShuttingDown && Ctx->AttemptCount < m_Settings.RetryCount && ShouldRetryAsync(CurlResult, StatusCode)) + { + ++Ctx->AttemptCount; + const long BackoffMs = 100 * Ctx->AttemptCount; + + if (CurlResult != CURLE_OK) + { + ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' (Curl error: {}) Attempt {}/{}", + m_SessionId, + static_cast<int>(MapCurlError(CurlResult)), + curl_easy_strerror(CurlResult), + static_cast<int>(CurlResult), + Ctx->AttemptCount, + m_Settings.RetryCount + 1); + } + else + { + ZEN_INFO("Retry (session: {}): HTTP status ({}) '{}' Attempt {}/{}", + m_SessionId, + StatusCode, + zen::ToString(HttpResponseCode(StatusCode)), + Ctx->AttemptCount, + m_Settings.RetryCount + 1); + } + + // Stash the just-finished attempt's failure so the abandon path can + // surface it instead of a generic "Request canceled (retry abandoned)". + // Non-empty LastErrorMessage marks the stash valid. + Ctx->LastCurlResult = CurlResult; + Ctx->LastStatusCode = StatusCode; + Ctx->LastErrorMessage = (CurlResult != CURLE_OK) ? std::string(curl_easy_strerror(CurlResult)) + : std::string(zen::ToString(HttpResponseCode(StatusCode))); + Ctx->ResetForRetry(); + + const uint64_t RetryTokenId = Ctx->TokenId; + auto RetryTimer = std::make_shared<asio::steady_timer>(m_Strand); + RetryTimer->expires_after(std::chrono::milliseconds(BackoffMs)); + + // Park Ctx + Timer in m_RetryingTransfers so HandleCancel can find + // it and cancel the timer (early cancel without paying the full + // backoff). The timer lambda re-claims Ctx through the map; if + // HandleCancel got there first the entry is gone and the lambda + // returns silently. + auto [It, Inserted] = m_RetryingTransfers.emplace(RetryTokenId, RetryEntry{std::move(Ctx), RetryTimer}); + ZEN_ASSERT(Inserted); + + // Capture weak_from_this() rather than raw `this`. With an external + // io_context, Cleanup cancels the timer but the cancellation handler + // is queued on the caller's loop and may fire AFTER ~Impl runs. The + // owned-context path drains via restart()+poll() before destruction + // so this is moot there, but the weak ref is the cheapest way to + // keep both paths safe. + RetryTimer->async_wait([Self = weak_from_this(), RetryTokenId](const asio::error_code& Ec) { + auto Locked = Self.lock(); + if (!Locked) + { + return; + } + Impl& Me = *Locked; + auto It = Me.m_RetryingTransfers.find(RetryTokenId); + if (It == Me.m_RetryingTransfers.end()) + { + // HandleCancel already removed the entry and dispatched the + // cancel callback. + return; + } + std::unique_ptr<TransferContext> Ctx = std::move(It->second.Ctx); + Me.m_RetryingTransfers.erase(It); + + if (Ec || Me.m_ShuttingDown) + { + // Retry abandoned by timer cancellation or client shutdown. + // Surface the underlying failure (stashed pre-backoff) so + // the caller can distinguish a real timeout/throttle from a + // shutdown / cancel race. + HttpClient::Response Resp; + if (!Ctx->LastErrorMessage.empty()) + { + Resp.StatusCode = HttpResponseCode(Ctx->LastStatusCode); + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = + (Ctx->LastCurlResult != CURLE_OK) ? MapCurlError(Ctx->LastCurlResult) : HttpClientErrorCode::kOtherError, + .ErrorMessage = fmt::format("Request canceled (retry abandoned after: {})", Ctx->LastErrorMessage), + }; + } + else + { + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled (retry abandoned)", + }; + } + Me.DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx); + return; + } + Me.SubmitFromSpec(std::move(Ctx)); + }); + return; + } + // Build response HttpClient::Response Response; Response.StatusCode = HttpResponseCode(StatusCode); Response.UploadedBytes = static_cast<int64_t>(UpBytes); Response.DownloadedBytes = static_cast<int64_t>(DownBytes); Response.ElapsedSeconds = Elapsed; - Response.Header = BuildHeaderMap(Ctx->ResponseHeaders); + // Hand the raw arena over - FindHeader scans this lazily. Build the + // parsed KeyValueMap only when caller explicitly asks (rare). + Response.HeaderArena = std::move(Ctx->HeaderArena); + if (Ctx->Spec.WantHeaderMap) + { + std::string_view View(Response.HeaderArena); + while (!View.empty()) + { + const size_t LineEnd = View.find('\n'); + std::string_view Line = LineEnd == std::string_view::npos ? View : View.substr(0, LineEnd); + View = LineEnd == std::string_view::npos ? std::string_view{} : View.substr(LineEnd + 1); + while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) + { + Line.remove_suffix(1); + } + if (auto Header = ParseHeaderLine(Line)) + { + Response.Header->insert_or_assign(std::string(Header->first), std::string(Header->second)); + } + } + } + + // Helper: produce the response IoBuffer from whichever Body path was + // taken. Preallocated path moves with zero copy; chunked fallback + // moves the single chunk or flattens N chunks into one allocation. + auto BuildResponsePayload = [&]() -> IoBuffer { + if (Ctx->BodyPreallocated) + { + IoBuffer Out = std::move(Ctx->Body); + if (Ctx->BodyWriteOffset != Out.GetSize()) + { + // Server closed early - return a non-owning sub-buffer over + // the actually-received prefix. Sub-buffer holds a ref to + // Out's core so the underlying allocation stays alive; no + // memcpy. + return IoBuffer(Out, 0, Ctx->BodyWriteOffset); + } + return Out; + } + if (Ctx->BodyChunks.size() == 1) + { + return std::move(Ctx->BodyChunks[0]); + } + if (!Ctx->BodyChunks.empty()) + { + // Flatten N chunks into one IoBuffer; single alloc avoids a copy chain. + size_t Total = 0; + for (const IoBuffer& C : Ctx->BodyChunks) + { + Total += C.GetSize(); + } + IoBuffer Out(Total); + uint8_t* Dst = static_cast<uint8_t*>(Out.MutableData()); + for (const IoBuffer& C : Ctx->BodyChunks) + { + memcpy(Dst, C.GetData(), C.GetSize()); + Dst += C.GetSize(); + } + return Out; + } + return IoBuffer{}; + }; + + const bool HasBody = Ctx->BodyPreallocated ? Ctx->BodyWriteOffset > 0 : !Ctx->BodyChunks.empty(); if (CurlResult != CURLE_OK) { @@ -562,258 +1615,261 @@ struct AsyncHttpClient::Impl if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK) { - ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg); + ZEN_WARN("AsyncHttpClient failure: token={} {} '{}': ({}) '{}'", + Ctx->TokenId, + AsyncRequestMethodName(Ctx->Spec.Method), + Ctx->Spec.Url, + static_cast<int>(CurlResult), + ErrorMsg); } - if (!Ctx->Body.empty()) + if (HasBody) { - Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); + Response.ResponsePayload = BuildResponsePayload(); } Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)}; } - else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty()) + else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || !HasBody) { // No payload } else { - IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size()); - ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders); + IoBuffer PayloadBuffer = BuildResponsePayload(); + if (Ctx->BodyContentType != ZenContentType::kUnknownContentType) + { + PayloadBuffer.SetContentType(Ctx->BodyContentType); + } const HttpResponseCode Code = HttpResponseCode(StatusCode); if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound) { - ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri); + ZEN_WARN("AsyncHttpClient request failed: token={} {} '{}': status={}", + Ctx->TokenId, + AsyncRequestMethodName(Ctx->Spec.Method), + Ctx->Spec.Url, + static_cast<int>(Code)); } Response.ResponsePayload = std::move(PayloadBuffer); } - // Dispatch the user callback off the strand so a slow callback - // cannot starve the curl_multi poll loop. - asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable { - try + // Token reaches terminal state. Ctx (and its embedded TokenState) is + // destroyed when this scope ends; late Cancel() calls find no entry in + // m_Transfers and become no-ops. + DispatchCallback(std::move(Ctx->Callback), std::move(Response), *Ctx); + } + + // -- Async verb implementations -------------------------------------- + + AsyncRequestToken Submit(std::unique_ptr<TransferContext> Ctx) + { + // Allocate token ID + state up front so callers can cancel before the + // posted submit even runs. Token::State is shared between the user-held + // AsyncRequestToken and the TransferContext (no separate strand-side map). + const uint64_t Id = m_NextTokenId.fetch_add(1, std::memory_order_relaxed); + Ctx->TokenId = Id; + + auto State = std::make_shared<AsyncRequestToken::State>(); + State->CancelFn = [WeakSelf = weak_from_this(), Id]() { + auto Self = WeakSelf.lock(); + if (!Self) { - Cb(std::move(Response)); + return; } - catch (const std::exception& Ex) + asio::post(Self->m_Strand, [Self, Id]() { Self->HandleCancel(Id); }); + }; + Ctx->TokenStateRef = State; + + asio::post(m_Strand, [this, Ctx = std::move(Ctx)]() mutable { + if (m_ShuttingDown) { - ZEN_SCOPED_LOG(LogRef); - ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what()); + HttpClient::Response Resp; + Resp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled (client shutting down)", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx); + return; } + SubmitFromSpec(std::move(Ctx)); }); - } - // -- Async verb implementations -------------------------------------- + return AsyncRequestToken(std::move(State)); + } - void DoAsyncGet(std::string Url, - AsyncHttpCallback Callback, - HttpClient::KeyValueMap AdditionalHeader, - HttpClient::KeyValueMap Parameters) + void HandleCancel(uint64_t Id) { - asio::post(m_Strand, - [this, - Url = std::move(Url), - Callback = std::move(Callback), - AdditionalHeader = std::move(AdditionalHeader), - Parameters = std::move(Parameters)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Get"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, Parameters); - curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto It = m_Transfers.find(Id); + if (It != m_Transfers.end()) + { + std::unique_ptr<TransferContext> Ctx = std::move(It->second); + m_Transfers.erase(It); + + Ctx->Cancelled = true; + if (Ctx->CurlHandle) + { + curl_multi_remove_handle(m_Multi, Ctx->CurlHandle); + ReleaseHandle(Ctx->CurlHandle); + Ctx->CurlHandle = nullptr; + } + + HttpClient::Response CancelResp; + CancelResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); + OnSlotFreed(); + return; + } + + // Cancel landed during the retry backoff: take the parked Ctx + Timer, + // cancel the timer (the in-flight async_wait fires later with + // operation_aborted but finds no entry, so it's a no-op), and dispatch + // the cancel callback now so the user observes immediate cancellation + // rather than waiting out the backoff. + auto RetIt = m_RetryingTransfers.find(Id); + if (RetIt != m_RetryingTransfers.end()) + { + std::unique_ptr<TransferContext> Ctx = std::move(RetIt->second.Ctx); + std::shared_ptr<asio::steady_timer> Timer = std::move(RetIt->second.Timer); + m_RetryingTransfers.erase(RetIt); + Timer->cancel(); + + HttpClient::Response CancelResp; + CancelResp.Error = HttpClient::ErrorContext{ + .ErrorCode = HttpClientErrorCode::kRequestCancelled, + .ErrorMessage = "Request canceled", + }; + DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx); + return; + } + + // Cancel landed before SubmitFromSpec ran (Cancel posted between Submit + // returning and the io thread executing the posted SubmitFromSpec). + // State.Cancelled has already been set by Cancel(); SubmitFromSpec + // checks it and synthesizes the cancel callback when the transfer + // eventually arrives. Nothing to do here. } - void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) + AsyncRequestToken DoAsyncGet(std::string Url, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) { - asio::post(m_Strand, - [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Head"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, {}); - curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::Get; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Parameters = std::move(Parameters); + return Submit(std::move(Ctx)); } - void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) + AsyncRequestToken DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) { - asio::post(m_Strand, - [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Delete"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, {}); - curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE"); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::Head; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + return Submit(std::move(Ctx)); } - void DoAsyncPost(std::string Url, - AsyncHttpCallback Callback, - HttpClient::KeyValueMap AdditionalHeader, - HttpClient::KeyValueMap Parameters) + AsyncRequestToken DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader) { - asio::post(m_Strand, - [this, - Url = std::move(Url), - Callback = std::move(Callback), - AdditionalHeader = std::move(AdditionalHeader), - Parameters = std::move(Parameters)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Post"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, Parameters); - curl_easy_setopt(Handle, CURLOPT_POST, 1L); - curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken()); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::Delete; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + return Submit(std::move(Ctx)); } - void DoAsyncPostWithPayload(std::string Url, - IoBuffer Payload, - ZenContentType ContentType, - AsyncHttpCallback Callback, - HttpClient::KeyValueMap AdditionalHeader) + AsyncRequestToken DoAsyncPost(std::string Url, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) { - asio::post(m_Strand, - [this, - Url = std::move(Url), - Payload = std::move(Payload), - ContentType, - Callback = std::move(Callback), - AdditionalHeader = std::move(AdditionalHeader)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, {}); - curl_easy_setopt(Handle, CURLOPT_POST, 1L); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->PayloadBuffer = std::move(Payload); - Ctx->HeaderList = - BuildHeaderList(AdditionalHeader, - m_SessionId, - GetAccessToken(), - {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))}); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - // Set up read callback for payload data - Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); - Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); - Ctx->ReadData.Offset = 0; - - curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); - curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); - curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::Post; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Parameters = std::move(Parameters); + return Submit(std::move(Ctx)); } - void DoAsyncPutWithPayload(std::string Url, - IoBuffer Payload, - AsyncHttpCallback Callback, - HttpClient::KeyValueMap AdditionalHeader, - HttpClient::KeyValueMap Parameters) + AsyncRequestToken DoAsyncPostWithPayload(std::string Url, + IoBuffer Payload, + ZenContentType ContentType, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader) { - asio::post(m_Strand, - [this, - Url = std::move(Url), - Payload = std::move(Payload), - Callback = std::move(Callback), - AdditionalHeader = std::move(AdditionalHeader), - Parameters = std::move(Parameters)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Put"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, Parameters); - curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); - - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); - Ctx->PayloadBuffer = std::move(Payload); - Ctx->HeaderList = BuildHeaderList( - AdditionalHeader, - m_SessionId, - GetAccessToken(), - {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))}); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); - - Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData()); - Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize(); - Ctx->ReadData.Offset = 0; - - curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize())); - curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback); - curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData); - - SubmitTransfer(Handle, std::move(Ctx)); - }); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::PostWithPayload; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Payload = std::move(Payload); + Ctx->Spec.ContentType = ContentType; + Ctx->Spec.HasContentType = true; + return Submit(std::move(Ctx)); } - void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters) + AsyncRequestToken DoAsyncPutWithPayload(std::string Url, + IoBuffer Payload, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) { - asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable { - ZEN_TRACE_CPU("AsyncHttpClient::Put"); - if (m_ShuttingDown) - { - return; - } - CURL* Handle = AllocHandle(); - ConfigureHandle(Handle, Url, Parameters); - curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL); + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::PutWithPayload; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Parameters = std::move(Parameters); + Ctx->Spec.Payload = std::move(Payload); + return Submit(std::move(Ctx)); + } - auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + AsyncRequestToken DoAsyncPutNoPayload(std::string Url, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) + { + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::Put; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Parameters = std::move(Parameters); + return Submit(std::move(Ctx)); + } - HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}}; - Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken()); - curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList); + AsyncRequestToken DoAsyncPutWithSource(std::string Url, + uint64_t TotalSize, + AsyncHttpReadSource Source, + AsyncHttpCallback Callback, + HttpClient::KeyValueMap AdditionalHeader) + { + auto Ctx = std::make_unique<TransferContext>(std::move(Callback)); + Ctx->Spec.Method = AsyncRequestMethod::PutWithSource; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.OnReadSource = std::move(Source); + Ctx->Spec.StreamingPutSize = TotalSize; + return Submit(std::move(Ctx)); + } - SubmitTransfer(Handle, std::move(Ctx)); - }); + AsyncRequestToken DoAsyncStream(std::string Url, + AsyncHttpDataCallback OnData, + AsyncHttpCallback OnComplete, + HttpClient::KeyValueMap AdditionalHeader, + HttpClient::KeyValueMap Parameters) + { + auto Ctx = std::make_unique<TransferContext>(std::move(OnComplete)); + Ctx->Spec.Method = AsyncRequestMethod::Stream; + Ctx->Spec.Url = std::move(Url); + Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader); + Ctx->Spec.Parameters = std::move(Parameters); + Ctx->Spec.OnData = std::move(OnData); + return Submit(std::move(Ctx)); } // -- Members --------------------------------------------------------- @@ -824,25 +1880,44 @@ struct AsyncHttpClient::Impl std::string m_SessionId; std::string m_UnixSocketPathUtf8; - // io_context and strand - all curl_multi operations are serialized on the - // strand, making this safe even when the io_context has multiple threads. - std::unique_ptr<asio::io_context> m_OwnedIoContext; - asio::io_context& m_IoContext; + // io_context: either privately owned (m_OwnedIoContext non-null + we spin + // m_IoThread + hold m_WorkGuard) or supplied by the caller (m_OwnedIoContext + // null; caller drives the loop). Declared before m_Strand so the strand + // can bind its executor from m_IoContext during member init. + std::unique_ptr<asio::io_context> m_OwnedIoContext; + asio::io_context& m_IoContext; + // Single serialization point for every curl_multi operation, every async + // completion handler, and every Cleanup call. Declared after m_IoContext + // (so make_strand can read its executor) and before m_Timer (which binds + // to the strand). asio::strand<asio::io_context::executor_type> m_Strand; std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard; std::thread m_IoThread; - // curl_multi and socket-action state - CURLM* m_Multi = nullptr; - std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers; - std::vector<CURL*> m_HandlePool; - std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets; - asio::steady_timer m_Timer; - bool m_ShuttingDown = false; + // Strand-bound; async_wait completions land back on m_Strand. + asio::steady_timer m_Timer; + CURLM* m_Multi = nullptr; + // Single TokenId-keyed map. CurlHandle lives in TransferContext; reverse + // lookup from CURL* uses CURLOPT_PRIVATE (set in SubmitTransfer). + std::unordered_map<uint64_t, std::unique_ptr<TransferContext>> m_Transfers; + // Transfers parked between curl-side completion and the next attempt's + // SubmitFromSpec. Lookups by TokenId let HandleCancel cancel the backoff + // timer without paying the full delay. + struct RetryEntry + { + std::unique_ptr<TransferContext> Ctx; + std::shared_ptr<asio::steady_timer> Timer; + }; + std::unordered_map<uint64_t, RetryEntry> m_RetryingTransfers; + std::vector<CURL*> m_HandlePool; + std::unordered_map<curl_socket_t, std::unique_ptr<AsyncSocketInfo>> m_Sockets; + uint32_t m_InFlight = 0; // telemetry only; storage layer caps fan-out - // Access token cache - RwLock m_AccessTokenLock; + std::atomic<bool> m_ShuttingDown{false}; HttpClientAccessToken m_CachedAccessToken; + + std::atomic<uint64_t> m_NextTokenId{1}; + std::atomic<bool> m_ShutdownDone{false}; }; ////////////////////////////////////////////////////////////////////////// @@ -850,79 +1925,134 @@ struct AsyncHttpClient::Impl // AsyncHttpClient public API AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings) -: m_Impl(std::make_unique<Impl>(BaseUri, Settings)) +: m_Impl(std::make_shared<Impl>(BaseUri, Settings)) { } AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings) -: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings)) +: m_Impl(std::make_shared<Impl>(BaseUri, IoContext, Settings)) { } -AsyncHttpClient::~AsyncHttpClient() = default; +AsyncHttpClient::~AsyncHttpClient() +{ + // Drive teardown synchronously while we're guaranteed to be on a user + // thread (not the io thread). Joining and draining here ensures any + // posted lambdas that captured a shared_ptr<Impl> by value (e.g. + // Cancel after the transfer completed) are destroyed and release + // their refs before m_Impl drops; otherwise the cycle "lambda holds + // Impl ref / lambda lives in io_context owned by Impl" would pin Impl + // alive forever and leak curl_multi + socket handles. + if (m_Impl) + { + m_Impl->Shutdown(); + } +} // -- Callback-based API -------------------------------------------------- -void +AsyncRequestToken AsyncHttpClient::AsyncGet(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { - m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); + return m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); } -void +AsyncRequestToken AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { - m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader); + return m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader); } -void +AsyncRequestToken AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { - m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader); + return m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader); } -void +AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { - m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); + return m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); } -void +AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { - m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader); + return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader); } -void +AsyncRequestToken AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader) { - m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader); + return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader); } -void +AsyncRequestToken AsyncHttpClient::AsyncPut(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { - m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters); + return m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters); +} + +AsyncRequestToken +AsyncHttpClient::AsyncPut(std::string_view Url, + AsyncHttpCallback Callback, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) +{ + return m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), AdditionalHeader, Parameters); +} + +AsyncRequestToken +AsyncHttpClient::AsyncPut(std::string_view Url, + uint64_t TotalSize, + AsyncHttpReadSource Source, + AsyncHttpCallback OnComplete, + const KeyValueMap& AdditionalHeader) +{ + return m_Impl->DoAsyncPutWithSource(std::string(Url), TotalSize, std::move(Source), std::move(OnComplete), AdditionalHeader); +} + +AsyncRequestToken +AsyncHttpClient::AsyncStream(std::string_view Url, + AsyncHttpDataCallback OnData, + AsyncHttpCallback OnComplete, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) +{ + return m_Impl->DoAsyncStream(std::string(Url), std::move(OnData), std::move(OnComplete), AdditionalHeader, Parameters); } +// -- Token cancellation -------------------------------------------------- + void -AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters) +AsyncRequestToken::Cancel() { - m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters); + if (!m_State) + { + return; + } + if (m_State->Cancelled.exchange(true, std::memory_order_acq_rel)) + { + return; // already cancelled + } + if (m_State->CancelFn) + { + m_State->CancelFn(); + } } // -- Future-based API ---------------------------------------------------- @@ -1026,6 +2156,7 @@ AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) AsyncPut( Url, [Promise](Response R) { Promise->set_value(std::move(R)); }, + KeyValueMap{}, Parameters); return Future; } |