aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients/asynchttpclient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/clients/asynchttpclient.cpp')
-rw-r--r--src/zenhttp/clients/asynchttpclient.cpp1947
1 files changed, 1539 insertions, 408 deletions
diff --git a/src/zenhttp/clients/asynchttpclient.cpp b/src/zenhttp/clients/asynchttpclient.cpp
index ea88fc783..e7e904f89 100644
--- a/src/zenhttp/clients/asynchttpclient.cpp
+++ b/src/zenhttp/clients/asynchttpclient.cpp
@@ -4,8 +4,11 @@
#include "httpclientcurlhelpers.h"
+#include <zencore/basicfile.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/session.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
@@ -15,6 +18,9 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio/steady_timer.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <algorithm>
+#include <charconv>
+#include <deque>
#include <thread>
#include <unordered_map>
@@ -22,45 +28,204 @@ namespace zen {
//////////////////////////////////////////////////////////////////////////
//
+// AsyncRequestToken state (forward-declared in the public header so tokens
+// can be returned by value without leaking impl details).
+
+struct AsyncRequestToken::State
+{
+ std::function<void()> CancelFn;
+ std::atomic<bool> Cancelled = false;
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
// TransferContext: per-transfer state associated with each CURL easy handle
+// Request blueprint kept alongside each transfer so retries can re-issue with
+// the original verb/url/headers/payload after the previous attempt's transient
+// failure.
+enum class AsyncRequestMethod
+{
+ Get,
+ Head,
+ Delete,
+ Post,
+ PostWithPayload,
+ Put,
+ PutWithPayload,
+ PutWithSource, // PUT, body pulled via OnReadSource (no materialized payload)
+ Stream, // GET, response body delivered via OnData callback (no copy)
+};
+
+inline std::string_view
+AsyncRequestMethodName(AsyncRequestMethod M)
+{
+ switch (M)
+ {
+ case AsyncRequestMethod::Get:
+ return "GET";
+ case AsyncRequestMethod::Head:
+ return "HEAD";
+ case AsyncRequestMethod::Delete:
+ return "DELETE";
+ case AsyncRequestMethod::Post:
+ return "POST";
+ case AsyncRequestMethod::PostWithPayload:
+ return "POST(payload)";
+ case AsyncRequestMethod::Put:
+ return "PUT";
+ case AsyncRequestMethod::PutWithPayload:
+ return "PUT(payload)";
+ case AsyncRequestMethod::PutWithSource:
+ return "PUT(stream)";
+ case AsyncRequestMethod::Stream:
+ return "GET(stream)";
+ }
+ return "?";
+}
+
+struct AsyncRequestSpec
+{
+ AsyncRequestMethod Method = AsyncRequestMethod::Get;
+ std::string Url;
+ HttpClient::KeyValueMap AdditionalHeader;
+ HttpClient::KeyValueMap Parameters;
+ IoBuffer Payload; // POST/PUT with payload
+ ZenContentType ContentType = ZenContentType::kUnknownContentType;
+ bool HasContentType = false;
+ AsyncHttpDataCallback OnData; // Stream method
+ AsyncHttpReadSource OnReadSource; // PutWithSource method
+ uint64_t StreamingPutSize = 0; // Content-Length for PutWithSource
+
+ // Opt-in header capture. By default Response::Header is left empty; inline
+ // extracts always run because they steer the body path. WantHeaderMap pays
+ // O(headers) string allocs on the io thread (rare - most callers use
+ // Response::FindHeader). WantEtag triggers an inline parse only.
+ bool WantEtag = false;
+ bool WantHeaderMap = false;
+};
+
struct TransferContext
{
- AsyncHttpCallback Callback;
- std::string Body;
- std::vector<std::pair<std::string, std::string>> ResponseHeaders;
- CurlWriteCallbackData WriteData;
- CurlHeaderCallbackData HeaderData;
- curl_slist* HeaderList = nullptr;
-
- // For PUT/POST with payload: keep the data alive until transfer completes
+ AsyncHttpCallback Callback;
+ AsyncRequestSpec Spec;
+ uint8_t AttemptCount = 0;
+ uint64_t TokenId = 0;
+ bool Cancelled = false;
+ // Curl handle owning this transfer once submitted to curl_multi. Null between
+ // Submit() and SubmitFromSpec(), and again after CompleteTransfer releases the
+ // handle back to the pool.
+ CURL* CurlHandle = nullptr;
+ // Strong reference to the user-facing AsyncRequestToken::State. Kept alive
+ // so AsyncRequestToken::Cancel() retains a valid CancelFn target until the
+ // transfer completes. The typed pointer also lets SubmitFromSpec read
+ // State.Cancelled to honour cancels that arrived between Submit() returning
+ // and the io thread running SubmitFromSpec.
+ std::shared_ptr<AsyncRequestToken::State> TokenStateRef;
+ // Two paths gated by BodyPreallocated: when Content-Length is known we
+ // fill Body in place (zero-copy move into Response); otherwise BodyChunks
+ // accumulates per-WRITE IoBuffers, flattened at completion.
+ IoBuffer Body;
+ uint64_t BodyWriteOffset = 0;
+ bool BodyPreallocated = false;
+ std::vector<IoBuffer> BodyChunks;
+
+ // Raw response headers, "Key: Value\r\n" lines as delivered by curl.
+ // One growing buffer; reserve covers common case (~1 KiB) with no realloc.
+ std::string HeaderArena;
+
+ // Captured at the curl callback boundary so an exception out of the user
+ // OnData / OnReadSource never propagates through curl's C frames (UB).
+ // Surfaced as a kInternalError response by CompleteTransfer.
+ bool CallbackFailed = false;
+ std::string CallbackErrorMessage;
+
+ // Inline-parsed in CurlHeaderCallback. Etag is populated only when
+ // Spec.WantEtag is set; the others are always parsed since they steer
+ // the body path / content-type tagging.
+ uint64_t ContentLength = 0;
+ bool ContentLengthSet = false;
+ ZenContentType BodyContentType = ZenContentType::kUnknownContentType;
+ std::string Etag;
+
+ curl_slist* HeaderList = nullptr;
+
IoBuffer PayloadBuffer;
CurlReadCallbackData ReadData;
+ uint64_t SourceOffset = 0; // PutWithSource: bytes pulled from Spec.OnReadSource so far.
+
+ // Last attempt's failure, kept across the backoff so a retry-abandoned
+ // path can surface the underlying cause instead of a generic
+ // "Request canceled (retry abandoned)". Non-empty LastErrorMessage = stash valid.
+ CURLcode LastCurlResult = CURLE_OK;
+ long LastStatusCode = 0;
+ std::string LastErrorMessage;
+
+ TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback)) { HeaderArena.reserve(1024); }
+
+ ~TransferContext() { FreeHeaderList(); }
+
+ TransferContext(const TransferContext&) = delete;
+ TransferContext& operator=(const TransferContext&) = delete;
- TransferContext(AsyncHttpCallback&& InCallback) : Callback(std::move(InCallback))
+ // Reset accumulated response state so the same context can be re-submitted
+ // for a retry attempt.
+ void ResetForRetry()
{
- WriteData.Body = &Body;
- HeaderData.Headers = &ResponseHeaders;
+ Body = IoBuffer{};
+ BodyWriteOffset = 0;
+ BodyPreallocated = false;
+ BodyChunks.clear();
+ HeaderArena.clear(); // keep capacity
+ ContentLength = 0;
+ ContentLengthSet = false;
+ BodyContentType = ZenContentType::kUnknownContentType;
+ Etag.clear();
+ FreeHeaderList();
+ ReadData = {};
+ SourceOffset = 0;
+ CallbackFailed = false;
+ CallbackErrorMessage.clear();
+ // LastCurlResult / LastStatusCode / LastErrorMessage are intentionally NOT
+ // cleared - they describe the just-finished attempt that triggered this
+ // retry, and surface in the abandon path if the next attempt is cancelled
+ // or shutdown-aborted.
}
- ~TransferContext()
+ void FreeHeaderList()
{
if (HeaderList)
{
curl_slist_free_all(HeaderList);
+ HeaderList = nullptr;
}
}
-
- TransferContext(const TransferContext&) = delete;
- TransferContext& operator=(const TransferContext&) = delete;
};
//////////////////////////////////////////////////////////////////////////
//
-// AsyncHttpClient::Impl
+// SocketInfo: per-socket state.
-struct AsyncHttpClient::Impl
+struct AsyncSocketInfo
{
+ asio::ip::tcp::socket Socket;
+ int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
+ int PendingFlags = 0; // directions with outstanding async_wait
+
+ // Bound to the strand executor so async_wait completions are serialized on
+ // the same strand that drives curl_multi - safe even when the underlying
+ // io_context is multithreaded (external-context mode).
+ explicit AsyncSocketInfo(const asio::strand<asio::io_context::executor_type>& Strand) : Socket(Strand) {}
+};
+
+// Holds the curl_multi instance and a strand that serializes every curl_multi op. Owned io_context
+// (default ctor) spins a private thread driving run(); external io_context mode lets the caller
+// drive the loop. The strand is the single serialization point for both modes.
+struct AsyncHttpClient::Impl : std::enable_shared_from_this<AsyncHttpClient::Impl>
+{
+ // Owned-io_context ctor: allocate a private io_context and run it on a
+ // dedicated thread. Cleanest path for callers that just want an
+ // AsyncHttpClient and don't care about the loop.
Impl(std::string_view BaseUri, const HttpClientSettings& Settings)
: m_BaseUri(BaseUri)
, m_Settings(Settings)
@@ -72,19 +237,33 @@ struct AsyncHttpClient::Impl
{
Init();
m_WorkGuard.emplace(m_IoContext.get_executor());
- m_IoThread = std::thread([this]() {
- SetCurrentThreadName("async_http");
- try
- {
- m_IoContext.run();
- }
- catch (const std::exception& Ex)
+
+ auto ThreadGuard = MakeGuard([this]() {
+ m_WorkGuard.reset();
+ if (m_IoThread.joinable())
{
- ZEN_ERROR("AsyncHttpClient: unhandled exception in io thread: {}", Ex.what());
+ m_IoThread.join();
}
});
+ m_IoThread = std::thread([this]() {
+ SetCurrentThreadName("async_http");
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: io thread unhandled exception: {}", Ex.what());
+ }
+ });
+ ThreadGuard.Dismiss();
}
+ // External-io_context ctor: caller drives the run loop. We do NOT spawn a
+ // thread, do NOT hold a work guard, and do NOT call stop()/restart() on
+ // teardown - the caller's lifecycle owns those. Shutdown blocks on a
+ // promise until our cleanup handler runs through the strand, so the
+ // caller MUST keep the loop running until the AsyncHttpClient destructs.
Impl(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
: m_BaseUri(BaseUri)
, m_Settings(Settings)
@@ -96,70 +275,167 @@ struct AsyncHttpClient::Impl
Init();
}
- ~Impl()
+ ~Impl() { Shutdown(); }
+
+ // Synchronous teardown from ~AsyncHttpClient. Idempotent. Branches by ownership:
+ // - Owned io_context: post Cleanup, drop the work guard, force run() to return, join the io
+ // thread, then poll() to drain lambdas that captured shared_ptr<Impl> (the "lambda in
+ // io_context owned by Impl" cycle would otherwise pin Impl alive forever).
+ // - External io_context: caller drives the loop, post Cleanup to the strand and block on a
+ // promise. Destroying from the io thread itself would deadlock.
+ void Shutdown()
{
- // Clean up curl state on the strand where all curl_multi operations
- // are serialized. Use a promise to block until the cleanup handler
- // has actually executed - essential for the external io_context case
- // where we don't own the run loop.
- std::promise<void> Done;
- std::future<void> DoneFuture = Done.get_future();
-
- asio::post(m_Strand, [this, &Done]() {
- m_ShuttingDown = true;
- m_Timer.cancel();
-
- // Release all tracked sockets (don't close - curl owns the fds).
- for (auto& [Fd, Info] : m_Sockets)
- {
- if (Info->Socket.is_open())
- {
- Info->Socket.cancel();
- Info->Socket.release();
- }
- }
- m_Sockets.clear();
+ if (m_ShutdownDone.exchange(true, std::memory_order_acq_rel))
+ {
+ return;
+ }
- for (auto& [Handle, Ctx] : m_Transfers)
+ if (m_OwnedIoContext)
+ {
+ // Post Cleanup before releasing the work guard so the io thread
+ // runs Cleanup before run() returns. Run() normally exits when
+ // the queue is empty AND no work guard is held.
+ asio::post(m_Strand, [this]() { Cleanup(); });
+ m_WorkGuard.reset();
+ // Belt-and-suspenders: force run() to return even if asio's
+ // outstanding_work_ counter is left non-zero by a close-during-
+ // cancel race in win_iocp_socket_service. The race is observable
+ // as a hung join() at shutdown after a burst of socket teardowns
+ // inside curl_multi_cleanup; stop() here is purely a safety net
+ // for the teardown path. The trailing restart() + poll() drains
+ // any handlers stop() leaves undispatched.
+ m_IoContext.stop();
+ if (m_IoThread.joinable())
{
- curl_multi_remove_handle(m_Multi, Handle);
- curl_easy_cleanup(Handle);
+ m_IoThread.join();
}
- m_Transfers.clear();
- for (CURL* Handle : m_HandlePool)
+ // Drain any leftover work posted to the io_context but not run
+ // before the thread exited (e.g. a Cancel-after-completion lambda
+ // that captured shared_ptr<Impl> by value). Loop until the queue
+ // is empty: a single poll() is one quanta and may not drain
+ // handlers that themselves post follow-ups.
+ m_IoContext.restart();
+ while (m_IoContext.poll() != 0)
{
- curl_easy_cleanup(Handle);
}
- m_HandlePool.clear();
-
- Done.set_value();
- });
+ }
+ else
+ {
+ // External: block on the cleanup handler so we can guarantee
+ // curl_multi state is gone before m_Impl drops.
+ std::promise<void> Done;
+ std::future<void> DoneFuture = Done.get_future();
+ asio::post(m_Strand, [this, &Done]() {
+ Cleanup();
+ Done.set_value();
+ });
+ DoneFuture.wait();
+ }
+ }
- // For owned io_context: release work guard so run() can return after
- // processing the cleanup handler above.
- m_WorkGuard.reset();
+ // Cleanup body, run on the io thread.
+ void Cleanup()
+ {
+ m_ShuttingDown = true;
+ m_Timer.cancel();
- if (m_IoThread.joinable())
+ // Tear down curl handles first; curl drives CURL_POLL_REMOVE +
+ // CLOSESOCKETFUNCTION for each owned socket, which our handlers
+ // use to retire SocketInfo entries from m_Sockets.
+ for (auto& [TokenId, Ctx] : m_Transfers)
{
- m_IoThread.join();
+ if (Ctx->CurlHandle)
+ {
+ curl_multi_remove_handle(m_Multi, Ctx->CurlHandle);
+ curl_easy_cleanup(Ctx->CurlHandle);
+ Ctx->CurlHandle = nullptr;
+ }
+
+ HttpClient::Response Resp;
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "AsyncHttpClient shutting down",
+ };
+ try
+ {
+ Ctx->Callback(std::move(Resp));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown callback (token={}, {} {}): {}",
+ TokenId,
+ AsyncRequestMethodName(Ctx->Spec.Method),
+ Ctx->Spec.Url,
+ Ex.what());
+ }
}
- else
+ m_Transfers.clear();
+ m_InFlight = 0;
+
+ // Drain transfers parked in retry backoff. The timer's pending
+ // async_wait fires after Cleanup with the io_context already stopped;
+ // it will find no entry and be a no-op. Fire cancel callbacks here
+ // while we still hold the storage so callers' futures resolve.
+ for (auto& [Id, Entry] : m_RetryingTransfers)
{
- // External io_context: wait for the cleanup handler to complete.
- DoneFuture.wait();
+ if (Entry.Timer)
+ {
+ Entry.Timer->cancel();
+ }
+ HttpClient::Response Resp;
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "AsyncHttpClient shutting down",
+ };
+ try
+ {
+ Entry.Ctx->Callback(std::move(Resp));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in shutdown retry callback (token={}, {} {}): {}",
+ Id,
+ AsyncRequestMethodName(Entry.Ctx->Spec.Method),
+ Entry.Ctx->Spec.Url,
+ Ex.what());
+ }
}
+ m_RetryingTransfers.clear();
+ // curl_multi_cleanup walks the connection cache and fires
+ // CLOSESOCKETFUNCTION for each cached fd. Run it on the io thread
+ // while m_Sockets is still populated so our callback routes each
+ // close through the map (Socket dtor closes fd exactly once).
if (m_Multi)
{
curl_multi_cleanup(m_Multi);
+ m_Multi = nullptr;
+ }
+
+ for (CURL* Handle : m_HandlePool)
+ {
+ curl_easy_cleanup(Handle);
+ }
+ m_HandlePool.clear();
+
+ asio::error_code CancelEc;
+ for (auto& [Fd, Info] : m_Sockets)
+ {
+ Info->Socket.cancel(CancelEc);
}
+ m_Sockets.clear();
}
LoggerRef Log() { return m_Log; }
void Init()
{
+ if (!m_Settings.UnixSocketPath.empty())
+ {
+ m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath);
+ }
+
m_Multi = curl_multi_init();
if (!m_Multi)
{
@@ -168,6 +444,24 @@ struct AsyncHttpClient::Impl
SetupMultiCallbacks();
+ if (m_Settings.MaxConcurrentConnectionsPerHost != 0)
+ {
+ curl_multi_setopt(m_Multi, CURLMOPT_MAX_HOST_CONNECTIONS, static_cast<long>(m_Settings.MaxConcurrentConnectionsPerHost));
+ }
+ if (m_Settings.MaxConcurrentConnectionsTotal != 0)
+ {
+ curl_multi_setopt(m_Multi, CURLMOPT_MAX_TOTAL_CONNECTIONS, static_cast<long>(m_Settings.MaxConcurrentConnectionsTotal));
+ }
+
+ // Size the idle-conn cache to the in-flight cap so reused conns are
+ // never evicted while requests are queued. Each eviction costs a
+ // fresh TCP+TLS handshake (~280ms WAN to S3) on the next reuse.
+ const long MaxConnectsHint = static_cast<long>(std::max({m_Settings.MaxConcurrentRequests,
+ m_Settings.MaxConcurrentConnectionsTotal,
+ m_Settings.MaxConcurrentConnectionsPerHost,
+ 128u}));
+ curl_multi_setopt(m_Multi, CURLMOPT_MAXCONNECTS, MaxConnectsHint);
+
if (m_Settings.SessionId == Oid::Zero)
{
m_SessionId = std::string(GetSessionIdString());
@@ -178,6 +472,30 @@ struct AsyncHttpClient::Impl
}
}
+ // Run a completion callback inline on the io thread. By the time this is
+ // called, curl_multi_remove_handle + curl_easy_cleanup have already finalized
+ // the easy handle, so deferring to next io tick buys nothing. Direct call
+ // saves one alloc + queue insert per request.
+ //
+ // CONTRACT: user callbacks run on the AsyncHttpClient io thread. Heavy work
+ // (disk syscalls, lock contention, large allocations) must be hopped to a
+ // worker pool; otherwise it stalls curl_multi for ALL in-flight transfers.
+ void DispatchCallback(AsyncHttpCallback Cb, HttpClient::Response Resp, const TransferContext& Ctx)
+ {
+ try
+ {
+ Cb(std::move(Resp));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback (token={}, {} {}): {}",
+ Ctx.TokenId,
+ AsyncRequestMethodName(Ctx.Spec.Method),
+ Ctx.Spec.Url,
+ Ex.what());
+ }
+ }
+
// -- Handle pool -----------------------------------------------------
CURL* AllocHandle()
@@ -199,20 +517,15 @@ struct AsyncHttpClient::Impl
void ReleaseHandle(CURL* Handle) { m_HandlePool.push_back(Handle); }
- // -- Configure a handle with common settings -------------------------
- // Called only from DoAsync* lambdas running on the strand.
-
+ // Called only from DoAsync* lambdas running on the io thread.
void ConfigureHandle(CURL* Handle, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters)
{
- // Build URL
ExtendableStringBuilder<256> Url;
BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters);
curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str());
- // Unix domain socket
if (!m_Settings.UnixSocketPath.empty())
{
- m_UnixSocketPathUtf8 = PathToUtf8(m_Settings.UnixSocketPath);
curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, m_UnixSocketPathUtf8.c_str());
}
@@ -226,12 +539,6 @@ struct AsyncHttpClient::Impl
curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast<long>(m_Settings.Timeout.count()));
}
- // HTTP/2
- if (m_Settings.AssumeHttp2)
- {
- curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
- }
-
// SSL
if (m_Settings.InsecureSsl)
{
@@ -243,7 +550,6 @@ struct AsyncHttpClient::Impl
curl_easy_setopt(Handle, CURLOPT_CAINFO, m_Settings.CaBundlePath.c_str());
}
- // Verbose/debug
if (m_Settings.Verbose)
{
curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L);
@@ -252,6 +558,22 @@ struct AsyncHttpClient::Impl
// Thread safety
curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
+ // 256 KiB recv buffer aligns with optimal read syscall size and matches
+ // upload buffer; pairs with downstream 512 KiB write slots (2 recv calls
+ // per write slot under bulk transfer).
+ curl_easy_setopt(Handle, CURLOPT_BUFFERSIZE, 262144L);
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD_BUFFERSIZE, 262144L);
+
+ // Skip per-transfer progress bookkeeping; we don't consume it.
+ curl_easy_setopt(Handle, CURLOPT_NOPROGRESS, 1L);
+
+ // Disable Nagle (default since curl 7.50; explicit for safety).
+ curl_easy_setopt(Handle, CURLOPT_TCP_NODELAY, 1L);
+
+ // Take ownership of socket close (see CurlCloseSocketCallback).
+ curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETFUNCTION, &CurlCloseSocketCallback);
+ curl_easy_setopt(Handle, CURLOPT_CLOSESOCKETDATA, this);
+
if (m_Settings.ForbidReuseConnection)
{
curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L);
@@ -260,23 +582,24 @@ struct AsyncHttpClient::Impl
// -- Access token ----------------------------------------------------
- std::optional<std::string> GetAccessToken()
+ struct AccessTokenResult
{
+ std::optional<std::string> Token;
+ bool ProviderFailed = false; // provider configured but returned invalid twice
+ };
+
+ // Called only on the io thread.
+ AccessTokenResult GetAccessToken()
+ {
+ AccessTokenResult Result;
if (!m_Settings.AccessTokenProvider.has_value())
{
- return {};
+ return Result; // No provider: anonymous is the intended mode.
}
- {
- RwLock::SharedLockScope _(m_AccessTokenLock);
- if (!m_CachedAccessToken.NeedsRefresh())
- {
- return m_CachedAccessToken.GetValue();
- }
- }
- RwLock::ExclusiveLockScope _(m_AccessTokenLock);
if (!m_CachedAccessToken.NeedsRefresh())
{
- return m_CachedAccessToken.GetValue();
+ Result.Token = m_CachedAccessToken.GetValue();
+ return Result;
}
HttpClientAccessToken NewToken = m_Settings.AccessTokenProvider.value()();
if (!NewToken.IsValid())
@@ -287,10 +610,176 @@ struct AsyncHttpClient::Impl
if (NewToken.IsValid())
{
m_CachedAccessToken = NewToken;
- return m_CachedAccessToken.GetValue();
+ Result.Token = m_CachedAccessToken.GetValue();
+ return Result;
}
ZEN_WARN("AsyncHttpClient: access token provider returned invalid token");
- return {};
+ Result.ProviderFailed = true;
+ return Result;
+ }
+
+ // -- Submit / resubmit -----------------------------------------------
+ //
+ // SubmitFromSpec runs on the io thread. Used for both the initial submission
+ // and for retries: the AsyncRequestSpec inside Ctx encodes everything
+ // needed to (re)build the curl handle from scratch.
+
+ void SubmitFromSpec(std::unique_ptr<TransferContext> Ctx)
+ {
+ if (m_ShuttingDown)
+ {
+ // Synthesize a cancel response so the user callback fires exactly once.
+ // Without this any Ctx that lands here post-shutdown would be dropped
+ // silently, leaving waiting futures unresolved.
+ HttpClient::Response CancelResp;
+ CancelResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled (client shutting down)",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
+ return;
+ }
+
+ // Cancel-before-submit race: the user can call AsyncRequestToken::Cancel()
+ // between Submit() returning and the io thread running SubmitFromSpec.
+ // State.Cancelled is set under acq_rel by Cancel(); read here under
+ // acquire ensures visibility. If set, fire the cancel callback exactly
+ // once and bail before the transfer enters m_Transfers.
+ if (Ctx->TokenStateRef && Ctx->TokenStateRef->Cancelled.load(std::memory_order_acquire))
+ {
+ HttpClient::Response CancelResp;
+ CancelResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled before submit",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
+ return;
+ }
+
+ // Allocate the curl handle BEFORE bumping m_InFlight: AllocHandle can throw
+ // (curl_easy_init returning null). The InFlightGuard covers the rest of
+ // the body (BuildHeaderList, ExtraHeaders push_back, GetAccessToken can
+ // all throw bad_alloc) so a throw post-increment doesn't leak the slot.
+ // HandleGuard returns the handle to the pool on throw; CallbackGuard
+ // synthesizes a kInternalError response so the user future resolves.
+ CURL* Handle = AllocHandle();
+ ++m_InFlight;
+ auto InFlightGuard = MakeGuard([this] { --m_InFlight; });
+ auto HandleGuard = MakeGuard([this, Handle] { ReleaseHandle(Handle); });
+ auto CallbackGuard = MakeGuard([this, &Ctx] {
+ HttpClient::Response ErrResp;
+ ErrResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = "AsyncHttpClient::SubmitFromSpec: setup threw before dispatch",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx);
+ });
+ ConfigureHandle(Handle, Ctx->Spec.Url, Ctx->Spec.Parameters);
+
+ switch (Ctx->Spec.Method)
+ {
+ case AsyncRequestMethod::Get:
+ curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
+ break;
+
+ case AsyncRequestMethod::Stream:
+ curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
+ break;
+
+ case AsyncRequestMethod::Head:
+ curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L);
+ break;
+
+ case AsyncRequestMethod::Delete:
+ curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE");
+ break;
+
+ case AsyncRequestMethod::Post:
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L);
+ break;
+
+ case AsyncRequestMethod::PostWithPayload:
+ {
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ Ctx->PayloadBuffer = Ctx->Spec.Payload;
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+ curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+ break;
+ }
+
+ case AsyncRequestMethod::Put:
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL);
+ break;
+
+ case AsyncRequestMethod::PutWithPayload:
+ {
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ Ctx->PayloadBuffer = Ctx->Spec.Payload;
+ Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
+ Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
+ Ctx->ReadData.Offset = 0;
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
+ break;
+ }
+
+ case AsyncRequestMethod::PutWithSource:
+ {
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ Ctx->SourceOffset = 0;
+ curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->Spec.StreamingPutSize));
+ curl_easy_setopt(Handle, CURLOPT_READFUNCTION, AsyncCurlSourceReadCallback);
+ curl_easy_setopt(Handle, CURLOPT_READDATA, Ctx.get());
+ break;
+ }
+ }
+
+ // Headers - include Content-Type for payload-bearing methods, Content-Length: 0 for empty PUT.
+ std::vector<std::pair<std::string, std::string>> ExtraHeaders;
+ if (Ctx->Spec.Method == AsyncRequestMethod::PostWithPayload)
+ {
+ const ZenContentType Effective = Ctx->Spec.HasContentType ? Ctx->Spec.ContentType : Ctx->Spec.Payload.GetContentType();
+ ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Effective)));
+ }
+ else if (Ctx->Spec.Method == AsyncRequestMethod::PutWithPayload)
+ {
+ ExtraHeaders.emplace_back("Content-Type", std::string(MapContentTypeToString(Ctx->Spec.Payload.GetContentType())));
+ }
+ else if (Ctx->Spec.Method == AsyncRequestMethod::Put)
+ {
+ ExtraHeaders.emplace_back("Content-Length", "0");
+ }
+
+ AccessTokenResult Token = GetAccessToken();
+ if (Token.ProviderFailed)
+ {
+ // Provider configured but failed twice: do NOT silently downgrade
+ // to an anonymous request - the server will respond 403 and the
+ // caller has no way to tell auth failed.
+ HttpClient::Response ErrResp;
+ ErrResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = "AsyncHttpClient: access token provider failed; refusing to issue anonymous request",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(ErrResp), *Ctx);
+ CallbackGuard.Dismiss();
+ return;
+ }
+
+ Ctx->HeaderList = BuildHeaderList(Ctx->Spec.AdditionalHeader, m_SessionId, std::move(Token.Token), ExtraHeaders);
+ curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+
+ InFlightGuard.Dismiss();
+ HandleGuard.Dismiss();
+ CallbackGuard.Dismiss();
+ SubmitTransfer(Handle, std::move(Ctx));
}
// -- Submit a transfer -----------------------------------------------
@@ -298,56 +787,66 @@ struct AsyncHttpClient::Impl
void SubmitTransfer(CURL* Handle, std::unique_ptr<TransferContext> Ctx)
{
ZEN_TRACE_CPU("AsyncHttpClient::SubmitTransfer");
- // Setup write/header callbacks
- curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback);
- curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &Ctx->WriteData);
- curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback);
- curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &Ctx->HeaderData);
-
- m_Transfers[Handle] = std::move(Ctx);
-
+ // Pick the WRITE callback by method:
+ // - Stream: forwards each chunk to the caller's OnData (no copy)
+ // - other: buffers bytes in TransferContext::Body
+ if (Ctx->Spec.Method == AsyncRequestMethod::Stream)
+ {
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlStreamWriteCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get());
+ }
+ else
+ {
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, AsyncCurlWriteCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, Ctx.get());
+ }
+ curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, AsyncCurlHeaderCallback);
+ curl_easy_setopt(Handle, CURLOPT_HEADERDATA, Ctx.get());
+ // Stash TokenId on the curl handle so CheckCompleted can look up the
+ // TransferContext directly from curl_multi_info_read's CURLMsg.easy_handle.
+ curl_easy_setopt(Handle, CURLOPT_PRIVATE, reinterpret_cast<void*>(static_cast<uintptr_t>(Ctx->TokenId)));
+
+ Ctx->CurlHandle = Handle;
+ const uint64_t TokenIdLocal = Ctx->TokenId;
+
+ // Try the curl_multi add first. On failure, Ctx is still owned locally so the
+ // rollback is a single Release path with no map churn. On success, ownership
+ // moves into the single TokenId-keyed lookup table.
CURLMcode Mc = curl_multi_add_handle(m_Multi, Handle);
if (Mc != CURLM_OK)
{
- auto Stolen = std::move(m_Transfers[Handle]);
- m_Transfers.erase(Handle);
+ Ctx->CurlHandle = nullptr;
ReleaseHandle(Handle);
HttpClient::Response ErrorResponse;
ErrorResponse.Error =
HttpClient::ErrorContext{.ErrorCode = HttpClientErrorCode::kInternalError,
.ErrorMessage = fmt::format("curl_multi_add_handle failed: {}", curl_multi_strerror(Mc))};
- asio::post(m_IoContext,
- [Cb = std::move(Stolen->Callback), Response = std::move(ErrorResponse)]() mutable { Cb(std::move(Response)); });
+ DispatchCallback(std::move(Ctx->Callback), std::move(ErrorResponse), *Ctx);
+ OnSlotFreed();
return;
}
- }
- // -- Socket-action integration ---------------------------------------
- //
- // curl_multi drives I/O via two callbacks:
- // - SocketCallback: curl tells us which sockets to watch for read/write
- // - TimerCallback: curl tells us when to fire a timeout
- //
- // On each socket event or timeout we call curl_multi_socket_action(),
- // then drain completed transfers via curl_multi_info_read().
+ m_Transfers.emplace(TokenIdLocal, std::move(Ctx));
+ }
- // Per-socket state: wraps the native fd in an ASIO socket for async_wait.
- struct SocketInfo
+ // Telemetry only; this client does not gate fan-out. Callers (e.g.
+ // S3AsyncStorage) layer their own admission semaphore on top.
+ // Assert catches SubmitFromSpec/OnSlotFreed imbalance.
+ void OnSlotFreed()
{
- asio::ip::tcp::socket Socket;
- int WatchFlags = 0; // CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT
-
- explicit SocketInfo(asio::io_context& IoContext) : Socket(IoContext) {}
- };
+ ZEN_ASSERT(m_InFlight > 0);
+ --m_InFlight;
+ }
- // Static thunks registered with curl_multi ----------------------------
+ // curl_multi drives I/O via SocketCallback (which fds to watch) and TimerCallback (when to fire).
+ // On each event we call curl_multi_socket_action() and drain via curl_multi_info_read().
+ // Static thunks: UserData = Impl* (set via CURLMOPT_SOCKETDATA / CURLMOPT_TIMERDATA). Bodies run on io thread.
static int CurlSocketCallback(CURL* Easy, curl_socket_t Fd, int Action, void* UserPtr, void* SocketPtr)
{
- ZEN_UNUSED(Easy);
auto* Self = static_cast<Impl*>(UserPtr);
- Self->OnCurlSocket(Fd, Action, static_cast<SocketInfo*>(SocketPtr));
+ Self->OnCurlSocket(Easy, Fd, Action, static_cast<AsyncSocketInfo*>(SocketPtr));
return 0;
}
@@ -359,6 +858,219 @@ struct AsyncHttpClient::Impl
return 0;
}
+ // Async-specific HEADER callback. Appends raw "Key: Value\r\n" lines to
+ // Ctx->HeaderArena (one growing buffer; ~zero allocs in the common case
+ // where the initial reserve covers the response). Inline-parses
+ // Content-Length and Content-Type unconditionally; parses ETag only when
+ // Spec.WantEtag is set. No std::string/pair allocations per line.
+ static size_t AsyncCurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+ {
+ auto* Ctx = static_cast<TransferContext*>(UserData);
+ const size_t TotalBytes = Size * Nmemb;
+ std::string_view Line(Buffer, TotalBytes);
+
+ Ctx->HeaderArena.append(Buffer, TotalBytes);
+
+ while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
+ {
+ Line.remove_suffix(1);
+ }
+ if (Line.empty())
+ {
+ return TotalBytes;
+ }
+ const size_t Colon = Line.find(':');
+ if (Colon == std::string_view::npos)
+ {
+ return TotalBytes; // HTTP status line or malformed
+ }
+ std::string_view Key = Line.substr(0, Colon);
+ std::string_view Value = Line.substr(Colon + 1);
+ while (!Key.empty() && Key.back() == ' ')
+ {
+ Key.remove_suffix(1);
+ }
+ while (!Value.empty() && Value.front() == ' ')
+ {
+ Value.remove_prefix(1);
+ }
+
+ if (StrCaseEquals(Key, "Content-Length"))
+ {
+ uint64_t Length = 0;
+ std::from_chars_result Res = std::from_chars(Value.data(), Value.data() + Value.size(), Length);
+ if (Res.ec == std::errc{})
+ {
+ Ctx->ContentLength = Length;
+ Ctx->ContentLengthSet = true;
+ }
+ }
+ else if (StrCaseEquals(Key, "Content-Type"))
+ {
+ Ctx->BodyContentType = ParseContentType(Value);
+ }
+ else if (Ctx->Spec.WantEtag && StrCaseEquals(Key, "ETag"))
+ {
+ Ctx->Etag.assign(Value);
+ }
+
+ return TotalBytes;
+ }
+
+ // Async-specific write callback. Targets a TransferContext directly.
+ // Preallocates Body from Ctx->ContentLength (parsed in HEADER cb). If
+ // Content-Length is absent (e.g. chunked encoding), falls back to
+ // BodyChunks accumulation; CompleteTransfer flattens at the end.
+ static size_t AsyncCurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
+ {
+ auto* Ctx = static_cast<TransferContext*>(UserData);
+ const size_t TotalBytes = Size * Nmemb;
+ if (TotalBytes == 0)
+ {
+ return 0;
+ }
+
+ if (!Ctx->BodyPreallocated && Ctx->BodyWriteOffset == 0 && Ctx->BodyChunks.empty() && Ctx->ContentLengthSet &&
+ Ctx->ContentLength > 0)
+ {
+ Ctx->Body = IoBuffer(static_cast<size_t>(Ctx->ContentLength));
+ Ctx->BodyPreallocated = true;
+ }
+
+ if (Ctx->BodyPreallocated)
+ {
+ if (Ctx->BodyWriteOffset + TotalBytes > Ctx->Body.GetSize())
+ {
+ // Server sent more than Content-Length advertised; abort.
+ return 0;
+ }
+ memcpy(static_cast<uint8_t*>(Ctx->Body.MutableData()) + Ctx->BodyWriteOffset, Ptr, TotalBytes);
+ Ctx->BodyWriteOffset += TotalBytes;
+ }
+ else
+ {
+ IoBuffer Chunk(TotalBytes);
+ memcpy(Chunk.MutableData(), Ptr, TotalBytes);
+ Ctx->BodyChunks.push_back(std::move(Chunk));
+ }
+
+ return TotalBytes;
+ }
+
+ // PutWithSource read callback. Pulls up to MaxBytes from Spec.OnReadSource
+ // into curl's send buffer. Source closure runs on the io thread - same
+ // strand discipline as Stream's OnData. Returning 0 with SourceOffset <
+ // StreamingPutSize signals an upload abort to curl.
+ static size_t AsyncCurlSourceReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData)
+ {
+ auto* Ctx = static_cast<TransferContext*>(UserData);
+ const size_t MaxBytes = Size * Nmemb;
+ if (MaxBytes == 0 || !Ctx->Spec.OnReadSource)
+ {
+ return 0;
+ }
+ // Catch at the curl boundary: user OnReadSource may call into IoBuffer
+ // allocation, file reads, or async dispatch which can throw. Letting an
+ // exception unwind through curl's C frames is UB.
+ size_t Pulled = 0;
+ try
+ {
+ Pulled = Ctx->Spec.OnReadSource(reinterpret_cast<uint8_t*>(Buffer), MaxBytes, Ctx->SourceOffset);
+ }
+ catch (const std::exception& Ex)
+ {
+ Ctx->CallbackFailed = true;
+ Ctx->CallbackErrorMessage = fmt::format("upload source callback threw: {}", Ex.what());
+ return CURL_READFUNC_ABORT;
+ }
+ catch (...)
+ {
+ Ctx->CallbackFailed = true;
+ Ctx->CallbackErrorMessage = "upload source callback threw unknown exception";
+ return CURL_READFUNC_ABORT;
+ }
+ if (Pulled == 0 && Ctx->SourceOffset < Ctx->Spec.StreamingPutSize)
+ {
+ return CURL_READFUNC_ABORT;
+ }
+ Ctx->SourceOffset += Pulled;
+ return Pulled;
+ }
+
+ // Stream-method write callback. Hands each chunk to the caller's OnData
+ // without allocating or copying. The pointer is curl's internal receive
+ // buffer; valid only for the duration of this call. Caller's OnData runs
+ // on the io thread, so blocking work (disk write etc) blocks the poll
+ // loop. TotalSize comes from inline-parsed Content-Length (0 if absent /
+ // chunked).
+ static size_t AsyncCurlStreamWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData)
+ {
+ auto* Ctx = static_cast<TransferContext*>(UserData);
+ const size_t TotalBytes = Size * Nmemb;
+ if (TotalBytes == 0)
+ {
+ return 0;
+ }
+
+ if (!Ctx->Spec.OnData)
+ {
+ // Stream method requires OnData by contract; reaching this branch
+ // is API misuse. Returning TotalBytes (success) would silently
+ // drop the body and report "ok"; fail loudly instead.
+ Ctx->CallbackFailed = true;
+ Ctx->CallbackErrorMessage = "stream request submitted without OnData callback";
+ return 0;
+ }
+
+ // Catch at the curl boundary so an exception inside the user OnData
+ // (e.g. IoBuffer alloc failure, ScheduleWork rejection, ZEN_ASSERT in a
+ // downstream pool) cannot propagate through curl's C frames. Stash on
+ // Ctx so CompleteTransfer surfaces it as kInternalError.
+ try
+ {
+ const bool ContinueTransfer = Ctx->Spec.OnData(reinterpret_cast<const uint8_t*>(Ptr), TotalBytes, Ctx->ContentLength);
+ return ContinueTransfer ? TotalBytes : 0; // returning 0 aborts
+ }
+ catch (const std::exception& Ex)
+ {
+ Ctx->CallbackFailed = true;
+ Ctx->CallbackErrorMessage = fmt::format("stream data callback threw: {}", Ex.what());
+ return 0;
+ }
+ catch (...)
+ {
+ Ctx->CallbackFailed = true;
+ Ctx->CallbackErrorMessage = "stream data callback threw unknown exception";
+ return 0;
+ }
+ }
+
+ // Take ownership of socket close. CLOSESOCKETFUNCTION is invoked from
+ // within curl_multi operations which run on the io thread, so direct map
+ // access is safe. The asio tcp::socket destructor closes the fd; we
+ // return 0 to tell curl the close succeeded. Letting curl close as well
+ // would race (double-close, fd-reuse hazard) and on Windows IOCP
+ // `release()` throws `operation_not_supported`, killing the io thread.
+ static int CurlCloseSocketCallback(void* ClientPtr, curl_socket_t Fd)
+ {
+ auto* Self = static_cast<Impl*>(ClientPtr);
+ auto It = Self->m_Sockets.find(Fd);
+ if (It != Self->m_Sockets.end())
+ {
+ asio::error_code Ec;
+ It->second->Socket.cancel(Ec);
+ Self->m_Sockets.erase(It);
+ return 0;
+ }
+ // Fd not tracked (e.g. pre-poll-add or post-shutdown); close directly.
+#if ZEN_PLATFORM_WINDOWS
+ ::closesocket(Fd);
+#else
+ ::close(Fd);
+#endif
+ return 0;
+ }
+
void SetupMultiCallbacks()
{
curl_multi_setopt(m_Multi, CURLMOPT_SOCKETFUNCTION, CurlSocketCallback);
@@ -369,45 +1081,97 @@ struct AsyncHttpClient::Impl
// Called by curl when socket watch state changes ---------------------
- void OnCurlSocket(curl_socket_t Fd, int Action, SocketInfo* Info)
+ // Synthesize a transport-level failure for the easy handle currently bound
+ // to the curl_multi entry that owns Fd. Used when the asio side cannot bind
+ // the fd; without this the affected transfer would hang on curl's
+ // connect/transfer timeout instead of failing fast with the real error.
+ void FailEasyHandleForFd(CURL* Easy, std::string_view Reason)
+ {
+ if (!Easy)
+ {
+ return;
+ }
+ curl_multi_remove_handle(m_Multi, Easy);
+
+ char* Private = nullptr;
+ curl_easy_getinfo(Easy, CURLINFO_PRIVATE, &Private);
+ const uint64_t TokenId = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(Private));
+
+ auto It = m_Transfers.find(TokenId);
+ if (It == m_Transfers.end())
+ {
+ ReleaseHandle(Easy);
+ return;
+ }
+
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second);
+ m_Transfers.erase(It);
+ Ctx->CurlHandle = nullptr;
+ ReleaseHandle(Easy);
+
+ HttpClient::Response Resp;
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = std::string(Reason),
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
+ OnSlotFreed();
+ }
+
+ void OnCurlSocket(CURL* Easy, curl_socket_t Fd, int Action, AsyncSocketInfo* Info)
{
if (Action == CURL_POLL_REMOVE)
{
if (Info)
{
- // Cancel pending async_wait ops before releasing the fd.
- // curl owns the fd, so we must release() rather than close().
- Info->Socket.cancel();
- if (Info->Socket.is_open())
- {
- Info->Socket.release();
- }
- m_Sockets.erase(Fd);
+ // Cancel any pending async_wait but KEEP the AsyncSocketInfo
+ // alive in m_Sockets. CURL_POLL_REMOVE only means "stop
+ // watching this socket"; curl may still use the fd
+ // (keep-alive reuse) and rewatch it later. The asio Socket
+ // stays bound to the same IOCP it was first assigned to;
+ // we never call release()+assign() on the same fd, which
+ // avoided a race where in-flight async_wait callbacks
+ // raced with curl_multi reading from the socket and
+ // corrupted HTTP framing. The fd's actual close happens
+ // in CurlCloseSocketCallback, which erases the entry and
+ // lets the asio Socket destructor close.
+ asio::error_code Ec;
+ Info->Socket.cancel(Ec);
+ Info->WatchFlags = 0;
+ Info->PendingFlags = 0;
}
return;
}
if (!Info)
{
- // New socket - wrap the native fd in an ASIO socket.
- auto [It, Inserted] = m_Sockets.emplace(Fd, std::make_unique<SocketInfo>(m_IoContext));
- Info = It->second.get();
-
- asio::error_code Ec;
- // Determine protocol from the fd (v4 vs v6). Default to v4.
- Info->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
- if (Ec)
+ // CURL_POLL_IN/OUT with no Info attached. Two cases:
+ // 1) brand new fd - emplace, assign, IOCP-bind.
+ // 2) curl re-watching a kept-alive fd it earlier removed -
+ // reuse the existing AsyncSocketInfo, no re-assign.
+ auto It = m_Sockets.find(Fd);
+ if (It == m_Sockets.end())
{
- // Try v6 as fallback
- Info->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
- }
- if (Ec)
- {
- ZEN_WARN("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message());
- m_Sockets.erase(Fd);
- return;
- }
+ auto [NewIt, _] = m_Sockets.emplace(Fd, std::make_unique<AsyncSocketInfo>(m_Strand));
+ It = NewIt;
+ asio::error_code Ec;
+ It->second->Socket.assign(asio::ip::tcp::v4(), Fd, Ec);
+ if (Ec)
+ {
+ It->second->Socket.assign(asio::ip::tcp::v6(), Fd, Ec);
+ }
+ if (Ec)
+ {
+ std::string Reason =
+ fmt::format("AsyncHttpClient: failed to assign socket fd {}: {}", static_cast<int>(Fd), Ec.message());
+ ZEN_WARN("{}", Reason);
+ m_Sockets.erase(It);
+ FailEasyHandleForFd(Easy, Reason);
+ return;
+ }
+ }
+ Info = It->second.get();
curl_multi_assign(m_Multi, Fd, Info);
}
@@ -415,37 +1179,84 @@ struct AsyncHttpClient::Impl
SetSocketWatch(Fd, Info);
}
- void SetSocketWatch(curl_socket_t Fd, SocketInfo* Info)
+ void SetSocketWatch(curl_socket_t Fd, AsyncSocketInfo* Info)
{
- // Cancel any pending wait before issuing a new one.
- Info->Socket.cancel();
+ // Cancel only when a previously-watched direction is no longer wanted.
+ // In the common path (one-shot async_wait completes, curl re-watches
+ // the same flags) PendingFlags is a subset of WatchFlags and we just
+ // re-arm the missing direction without touching CancelIoEx.
+ const int Desired = Info->WatchFlags & (CURL_POLL_IN | CURL_POLL_OUT);
+
+ if (Info->PendingFlags & ~Desired)
+ {
+ asio::error_code Ec;
+ Info->Socket.cancel(Ec);
+ Info->PendingFlags = 0;
+ }
+
+ const int ToAdd = Desired & ~Info->PendingFlags;
- if (Info->WatchFlags & CURL_POLL_IN)
+ if (ToAdd & CURL_POLL_IN)
{
- Info->Socket.async_wait(asio::socket_base::wait_read, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
- if (Ec || m_ShuttingDown)
- {
- return;
- }
- OnSocketReady(Fd, CURL_CSELECT_IN);
- }));
+ Info->PendingFlags |= CURL_POLL_IN;
+ Info->Socket.async_wait(asio::socket_base::wait_read, [this, Fd](const asio::error_code& Ec) {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ auto It = m_Sockets.find(Fd);
+ if (It == m_Sockets.end())
+ {
+ return;
+ }
+ It->second->PendingFlags &= ~CURL_POLL_IN;
+ if (Ec)
+ {
+ if (Ec != asio::error::operation_aborted)
+ {
+ ZEN_DEBUG("AsyncHttpClient: read async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR",
+ static_cast<int>(Fd),
+ Ec.message());
+ OnSocketReady(Fd, CURL_CSELECT_ERR);
+ }
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_IN);
+ });
}
- if (Info->WatchFlags & CURL_POLL_OUT)
+ if (ToAdd & CURL_POLL_OUT)
{
- Info->Socket.async_wait(asio::socket_base::wait_write, asio::bind_executor(m_Strand, [this, Fd](const asio::error_code& Ec) {
- if (Ec || m_ShuttingDown)
- {
- return;
- }
- OnSocketReady(Fd, CURL_CSELECT_OUT);
- }));
+ Info->PendingFlags |= CURL_POLL_OUT;
+ Info->Socket.async_wait(asio::socket_base::wait_write, [this, Fd](const asio::error_code& Ec) {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ auto It = m_Sockets.find(Fd);
+ if (It == m_Sockets.end())
+ {
+ return;
+ }
+ It->second->PendingFlags &= ~CURL_POLL_OUT;
+ if (Ec)
+ {
+ if (Ec != asio::error::operation_aborted)
+ {
+ ZEN_DEBUG("AsyncHttpClient: write async_wait fd {} error: {}; signalling curl with CURL_CSELECT_ERR",
+ static_cast<int>(Fd),
+ Ec.message());
+ OnSocketReady(Fd, CURL_CSELECT_ERR);
+ }
+ return;
+ }
+ OnSocketReady(Fd, CURL_CSELECT_OUT);
+ });
}
}
void OnSocketReady(curl_socket_t Fd, int CurlAction)
{
- ZEN_TRACE_CPU("AsyncHttpClient::OnSocketReady");
int StillRunning = 0;
curl_multi_socket_action(m_Multi, Fd, CurlAction, &StillRunning);
CheckCompleted();
@@ -472,7 +1283,7 @@ struct AsyncHttpClient::Impl
if (TimeoutMs == 0)
{
- // curl wants immediate action - run it directly on the strand.
+ // curl wants immediate action - run it on the next strand tick.
asio::post(m_Strand, [this]() {
if (m_ShuttingDown)
{
@@ -486,16 +1297,24 @@ struct AsyncHttpClient::Impl
}
m_Timer.expires_after(std::chrono::milliseconds(TimeoutMs));
- m_Timer.async_wait(asio::bind_executor(m_Strand, [this](const asio::error_code& Ec) {
- if (Ec || m_ShuttingDown)
+ m_Timer.async_wait([this](const asio::error_code& Ec) {
+ if (m_ShuttingDown)
+ {
+ return;
+ }
+ if (Ec)
{
+ if (Ec != asio::error::operation_aborted)
+ {
+ ZEN_DEBUG("AsyncHttpClient: curl multi timer error: {}", Ec.message());
+ }
return;
}
ZEN_TRACE_CPU("AsyncHttpClient::OnTimeout");
int StillRunning = 0;
curl_multi_socket_action(m_Multi, CURL_SOCKET_TIMEOUT, 0, &StillRunning);
CheckCompleted();
- }));
+ });
}
// Drain completed transfers from curl_multi --------------------------
@@ -516,7 +1335,13 @@ struct AsyncHttpClient::Impl
curl_multi_remove_handle(m_Multi, Handle);
- auto It = m_Transfers.find(Handle);
+ // Recover TokenId from CURLOPT_PRIVATE; cheaper than a per-handle
+ // reverse map. Returns nullptr if option was never set.
+ char* Private = nullptr;
+ curl_easy_getinfo(Handle, CURLINFO_PRIVATE, &Private);
+ const uint64_t TokenId = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(Private));
+
+ auto It = m_Transfers.find(TokenId);
if (It == m_Transfers.end())
{
ReleaseHandle(Handle);
@@ -530,9 +1355,45 @@ struct AsyncHttpClient::Impl
}
}
+ // Mirrors CurlHttpClient::ShouldRetry semantics; keep the two in sync.
+ static bool ShouldRetryAsync(CURLcode CurlResult, long StatusCode)
+ {
+ switch (CurlResult)
+ {
+ case CURLE_OK:
+ break;
+ case CURLE_COULDNT_CONNECT:
+ case CURLE_RECV_ERROR:
+ case CURLE_SEND_ERROR:
+ case CURLE_OPERATION_TIMEDOUT:
+ case CURLE_PARTIAL_FILE:
+ return true;
+ default:
+ return false;
+ }
+ switch (static_cast<HttpResponseCode>(StatusCode))
+ {
+ case HttpResponseCode::RequestTimeout:
+ case HttpResponseCode::TooManyRequests:
+ case HttpResponseCode::InternalServerError:
+ case HttpResponseCode::BadGateway:
+ case HttpResponseCode::ServiceUnavailable:
+ case HttpResponseCode::GatewayTimeout:
+ return true;
+ default:
+ return false;
+ }
+ }
+
void CompleteTransfer(CURL* Handle, CURLcode CurlResult, std::unique_ptr<TransferContext> Ctx)
{
ZEN_TRACE_CPU("AsyncHttpClient::CompleteTransfer");
+
+ // Free the in-flight counter before any retry / cancel branch. Retry
+ // re-submits via SubmitFromSpec, which re-increments the counter for
+ // the next attempt - keeping the assert balanced across retries.
+ OnSlotFreed();
+
// Extract result info
long StatusCode = 0;
curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &StatusCode);
@@ -548,13 +1409,205 @@ struct AsyncHttpClient::Impl
ReleaseHandle(Handle);
+ // Cancellation came in after curl ran but before we processed completion.
+ // Synthesize a cancel response and skip retry.
+ if (Ctx->Cancelled)
+ {
+ HttpClient::Response CancelResp;
+ CancelResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
+ return;
+ }
+
+ // User OnData / OnReadSource threw inside curl. The transfer is already
+ // aborted; surface the stashed exception text and skip retry (a callback
+ // exception is not a transient transport failure).
+ if (Ctx->CallbackFailed)
+ {
+ HttpClient::Response Resp;
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kInternalError,
+ .ErrorMessage = std::move(Ctx->CallbackErrorMessage),
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
+ return;
+ }
+
+ // Retry path: re-issue from spec after backoff. Keeps the user callback
+ // untouched so the eventual final result fires only once.
+ if (!m_ShuttingDown && Ctx->AttemptCount < m_Settings.RetryCount && ShouldRetryAsync(CurlResult, StatusCode))
+ {
+ ++Ctx->AttemptCount;
+ const long BackoffMs = 100 * Ctx->AttemptCount;
+
+ if (CurlResult != CURLE_OK)
+ {
+ ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' (Curl error: {}) Attempt {}/{}",
+ m_SessionId,
+ static_cast<int>(MapCurlError(CurlResult)),
+ curl_easy_strerror(CurlResult),
+ static_cast<int>(CurlResult),
+ Ctx->AttemptCount,
+ m_Settings.RetryCount + 1);
+ }
+ else
+ {
+ ZEN_INFO("Retry (session: {}): HTTP status ({}) '{}' Attempt {}/{}",
+ m_SessionId,
+ StatusCode,
+ zen::ToString(HttpResponseCode(StatusCode)),
+ Ctx->AttemptCount,
+ m_Settings.RetryCount + 1);
+ }
+
+ // Stash the just-finished attempt's failure so the abandon path can
+ // surface it instead of a generic "Request canceled (retry abandoned)".
+ // Non-empty LastErrorMessage marks the stash valid.
+ Ctx->LastCurlResult = CurlResult;
+ Ctx->LastStatusCode = StatusCode;
+ Ctx->LastErrorMessage = (CurlResult != CURLE_OK) ? std::string(curl_easy_strerror(CurlResult))
+ : std::string(zen::ToString(HttpResponseCode(StatusCode)));
+ Ctx->ResetForRetry();
+
+ const uint64_t RetryTokenId = Ctx->TokenId;
+ auto RetryTimer = std::make_shared<asio::steady_timer>(m_Strand);
+ RetryTimer->expires_after(std::chrono::milliseconds(BackoffMs));
+
+ // Park Ctx + Timer in m_RetryingTransfers so HandleCancel can find
+ // it and cancel the timer (early cancel without paying the full
+ // backoff). The timer lambda re-claims Ctx through the map; if
+ // HandleCancel got there first the entry is gone and the lambda
+ // returns silently.
+ auto [It, Inserted] = m_RetryingTransfers.emplace(RetryTokenId, RetryEntry{std::move(Ctx), RetryTimer});
+ ZEN_ASSERT(Inserted);
+
+ // Capture weak_from_this() rather than raw `this`. With an external
+ // io_context, Cleanup cancels the timer but the cancellation handler
+ // is queued on the caller's loop and may fire AFTER ~Impl runs. The
+ // owned-context path drains via restart()+poll() before destruction
+ // so this is moot there, but the weak ref is the cheapest way to
+ // keep both paths safe.
+ RetryTimer->async_wait([Self = weak_from_this(), RetryTokenId](const asio::error_code& Ec) {
+ auto Locked = Self.lock();
+ if (!Locked)
+ {
+ return;
+ }
+ Impl& Me = *Locked;
+ auto It = Me.m_RetryingTransfers.find(RetryTokenId);
+ if (It == Me.m_RetryingTransfers.end())
+ {
+ // HandleCancel already removed the entry and dispatched the
+ // cancel callback.
+ return;
+ }
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second.Ctx);
+ Me.m_RetryingTransfers.erase(It);
+
+ if (Ec || Me.m_ShuttingDown)
+ {
+ // Retry abandoned by timer cancellation or client shutdown.
+ // Surface the underlying failure (stashed pre-backoff) so
+ // the caller can distinguish a real timeout/throttle from a
+ // shutdown / cancel race.
+ HttpClient::Response Resp;
+ if (!Ctx->LastErrorMessage.empty())
+ {
+ Resp.StatusCode = HttpResponseCode(Ctx->LastStatusCode);
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode =
+ (Ctx->LastCurlResult != CURLE_OK) ? MapCurlError(Ctx->LastCurlResult) : HttpClientErrorCode::kOtherError,
+ .ErrorMessage = fmt::format("Request canceled (retry abandoned after: {})", Ctx->LastErrorMessage),
+ };
+ }
+ else
+ {
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled (retry abandoned)",
+ };
+ }
+ Me.DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
+ return;
+ }
+ Me.SubmitFromSpec(std::move(Ctx));
+ });
+ return;
+ }
+
// Build response
HttpClient::Response Response;
Response.StatusCode = HttpResponseCode(StatusCode);
Response.UploadedBytes = static_cast<int64_t>(UpBytes);
Response.DownloadedBytes = static_cast<int64_t>(DownBytes);
Response.ElapsedSeconds = Elapsed;
- Response.Header = BuildHeaderMap(Ctx->ResponseHeaders);
+ // Hand the raw arena over - FindHeader scans this lazily. Build the
+ // parsed KeyValueMap only when caller explicitly asks (rare).
+ Response.HeaderArena = std::move(Ctx->HeaderArena);
+ if (Ctx->Spec.WantHeaderMap)
+ {
+ std::string_view View(Response.HeaderArena);
+ while (!View.empty())
+ {
+ const size_t LineEnd = View.find('\n');
+ std::string_view Line = LineEnd == std::string_view::npos ? View : View.substr(0, LineEnd);
+ View = LineEnd == std::string_view::npos ? std::string_view{} : View.substr(LineEnd + 1);
+ while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n'))
+ {
+ Line.remove_suffix(1);
+ }
+ if (auto Header = ParseHeaderLine(Line))
+ {
+ Response.Header->insert_or_assign(std::string(Header->first), std::string(Header->second));
+ }
+ }
+ }
+
+ // Helper: produce the response IoBuffer from whichever Body path was
+ // taken. Preallocated path moves with zero copy; chunked fallback
+ // moves the single chunk or flattens N chunks into one allocation.
+ auto BuildResponsePayload = [&]() -> IoBuffer {
+ if (Ctx->BodyPreallocated)
+ {
+ IoBuffer Out = std::move(Ctx->Body);
+ if (Ctx->BodyWriteOffset != Out.GetSize())
+ {
+ // Server closed early - return a non-owning sub-buffer over
+ // the actually-received prefix. Sub-buffer holds a ref to
+ // Out's core so the underlying allocation stays alive; no
+ // memcpy.
+ return IoBuffer(Out, 0, Ctx->BodyWriteOffset);
+ }
+ return Out;
+ }
+ if (Ctx->BodyChunks.size() == 1)
+ {
+ return std::move(Ctx->BodyChunks[0]);
+ }
+ if (!Ctx->BodyChunks.empty())
+ {
+ // Flatten N chunks into one IoBuffer; single alloc avoids a copy chain.
+ size_t Total = 0;
+ for (const IoBuffer& C : Ctx->BodyChunks)
+ {
+ Total += C.GetSize();
+ }
+ IoBuffer Out(Total);
+ uint8_t* Dst = static_cast<uint8_t*>(Out.MutableData());
+ for (const IoBuffer& C : Ctx->BodyChunks)
+ {
+ memcpy(Dst, C.GetData(), C.GetSize());
+ Dst += C.GetSize();
+ }
+ return Out;
+ }
+ return IoBuffer{};
+ };
+
+ const bool HasBody = Ctx->BodyPreallocated ? Ctx->BodyWriteOffset > 0 : !Ctx->BodyChunks.empty();
if (CurlResult != CURLE_OK)
{
@@ -562,258 +1615,261 @@ struct AsyncHttpClient::Impl
if (CurlResult != CURLE_OPERATION_TIMEDOUT && CurlResult != CURLE_COULDNT_CONNECT && CurlResult != CURLE_ABORTED_BY_CALLBACK)
{
- ZEN_WARN("AsyncHttpClient failure: ({}) '{}'", static_cast<int>(CurlResult), ErrorMsg);
+ ZEN_WARN("AsyncHttpClient failure: token={} {} '{}': ({}) '{}'",
+ Ctx->TokenId,
+ AsyncRequestMethodName(Ctx->Spec.Method),
+ Ctx->Spec.Url,
+ static_cast<int>(CurlResult),
+ ErrorMsg);
}
- if (!Ctx->Body.empty())
+ if (HasBody)
{
- Response.ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
+ Response.ResponsePayload = BuildResponsePayload();
}
Response.Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(CurlResult), .ErrorMessage = std::string(ErrorMsg)};
}
- else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || Ctx->Body.empty())
+ else if (StatusCode == static_cast<long>(HttpResponseCode::NoContent) || !HasBody)
{
// No payload
}
else
{
- IoBuffer PayloadBuffer = IoBufferBuilder::MakeCloneFromMemory(Ctx->Body.data(), Ctx->Body.size());
- ApplyContentTypeFromHeaders(PayloadBuffer, Ctx->ResponseHeaders);
+ IoBuffer PayloadBuffer = BuildResponsePayload();
+ if (Ctx->BodyContentType != ZenContentType::kUnknownContentType)
+ {
+ PayloadBuffer.SetContentType(Ctx->BodyContentType);
+ }
const HttpResponseCode Code = HttpResponseCode(StatusCode);
if (!IsHttpSuccessCode(Code) && Code != HttpResponseCode::NotFound)
{
- ZEN_WARN("AsyncHttpClient request failed: status={}, base={}", static_cast<int>(Code), m_BaseUri);
+ ZEN_WARN("AsyncHttpClient request failed: token={} {} '{}': status={}",
+ Ctx->TokenId,
+ AsyncRequestMethodName(Ctx->Spec.Method),
+ Ctx->Spec.Url,
+ static_cast<int>(Code));
}
Response.ResponsePayload = std::move(PayloadBuffer);
}
- // Dispatch the user callback off the strand so a slow callback
- // cannot starve the curl_multi poll loop.
- asio::post(m_IoContext, [LogRef = m_Log, Cb = std::move(Ctx->Callback), Response = std::move(Response)]() mutable {
- try
+ // Token reaches terminal state. Ctx (and its embedded TokenState) is
+ // destroyed when this scope ends; late Cancel() calls find no entry in
+ // m_Transfers and become no-ops.
+ DispatchCallback(std::move(Ctx->Callback), std::move(Response), *Ctx);
+ }
+
+ // -- Async verb implementations --------------------------------------
+
+ AsyncRequestToken Submit(std::unique_ptr<TransferContext> Ctx)
+ {
+ // Allocate token ID + state up front so callers can cancel before the
+ // posted submit even runs. Token::State is shared between the user-held
+ // AsyncRequestToken and the TransferContext (no separate strand-side map).
+ const uint64_t Id = m_NextTokenId.fetch_add(1, std::memory_order_relaxed);
+ Ctx->TokenId = Id;
+
+ auto State = std::make_shared<AsyncRequestToken::State>();
+ State->CancelFn = [WeakSelf = weak_from_this(), Id]() {
+ auto Self = WeakSelf.lock();
+ if (!Self)
{
- Cb(std::move(Response));
+ return;
}
- catch (const std::exception& Ex)
+ asio::post(Self->m_Strand, [Self, Id]() { Self->HandleCancel(Id); });
+ };
+ Ctx->TokenStateRef = State;
+
+ asio::post(m_Strand, [this, Ctx = std::move(Ctx)]() mutable {
+ if (m_ShuttingDown)
{
- ZEN_SCOPED_LOG(LogRef);
- ZEN_ERROR("AsyncHttpClient: unhandled exception in completion callback: {}", Ex.what());
+ HttpClient::Response Resp;
+ Resp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled (client shutting down)",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(Resp), *Ctx);
+ return;
}
+ SubmitFromSpec(std::move(Ctx));
});
- }
- // -- Async verb implementations --------------------------------------
+ return AsyncRequestToken(std::move(State));
+ }
- void DoAsyncGet(std::string Url,
- AsyncHttpCallback Callback,
- HttpClient::KeyValueMap AdditionalHeader,
- HttpClient::KeyValueMap Parameters)
+ void HandleCancel(uint64_t Id)
{
- asio::post(m_Strand,
- [this,
- Url = std::move(Url),
- Callback = std::move(Callback),
- AdditionalHeader = std::move(AdditionalHeader),
- Parameters = std::move(Parameters)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Get");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, Parameters);
- curl_easy_setopt(Handle, CURLOPT_HTTPGET, 1L);
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto It = m_Transfers.find(Id);
+ if (It != m_Transfers.end())
+ {
+ std::unique_ptr<TransferContext> Ctx = std::move(It->second);
+ m_Transfers.erase(It);
+
+ Ctx->Cancelled = true;
+ if (Ctx->CurlHandle)
+ {
+ curl_multi_remove_handle(m_Multi, Ctx->CurlHandle);
+ ReleaseHandle(Ctx->CurlHandle);
+ Ctx->CurlHandle = nullptr;
+ }
+
+ HttpClient::Response CancelResp;
+ CancelResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
+ OnSlotFreed();
+ return;
+ }
+
+ // Cancel landed during the retry backoff: take the parked Ctx + Timer,
+ // cancel the timer (the in-flight async_wait fires later with
+ // operation_aborted but finds no entry, so it's a no-op), and dispatch
+ // the cancel callback now so the user observes immediate cancellation
+ // rather than waiting out the backoff.
+ auto RetIt = m_RetryingTransfers.find(Id);
+ if (RetIt != m_RetryingTransfers.end())
+ {
+ std::unique_ptr<TransferContext> Ctx = std::move(RetIt->second.Ctx);
+ std::shared_ptr<asio::steady_timer> Timer = std::move(RetIt->second.Timer);
+ m_RetryingTransfers.erase(RetIt);
+ Timer->cancel();
+
+ HttpClient::Response CancelResp;
+ CancelResp.Error = HttpClient::ErrorContext{
+ .ErrorCode = HttpClientErrorCode::kRequestCancelled,
+ .ErrorMessage = "Request canceled",
+ };
+ DispatchCallback(std::move(Ctx->Callback), std::move(CancelResp), *Ctx);
+ return;
+ }
+
+ // Cancel landed before SubmitFromSpec ran (Cancel posted between Submit
+ // returning and the io thread executing the posted SubmitFromSpec).
+ // State.Cancelled has already been set by Cancel(); SubmitFromSpec
+ // checks it and synthesizes the cancel callback when the transfer
+ // eventually arrives. Nothing to do here.
}
- void DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ AsyncRequestToken DoAsyncGet(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
{
- asio::post(m_Strand,
- [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Head");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, {});
- curl_easy_setopt(Handle, CURLOPT_NOBODY, 1L);
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::Get;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Parameters = std::move(Parameters);
+ return Submit(std::move(Ctx));
}
- void DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
+ AsyncRequestToken DoAsyncHead(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
{
- asio::post(m_Strand,
- [this, Url = std::move(Url), Callback = std::move(Callback), AdditionalHeader = std::move(AdditionalHeader)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Delete");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, {});
- curl_easy_setopt(Handle, CURLOPT_CUSTOMREQUEST, "DELETE");
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::Head;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ return Submit(std::move(Ctx));
}
- void DoAsyncPost(std::string Url,
- AsyncHttpCallback Callback,
- HttpClient::KeyValueMap AdditionalHeader,
- HttpClient::KeyValueMap Parameters)
+ AsyncRequestToken DoAsyncDelete(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap AdditionalHeader)
{
- asio::post(m_Strand,
- [this,
- Url = std::move(Url),
- Callback = std::move(Callback),
- AdditionalHeader = std::move(AdditionalHeader),
- Parameters = std::move(Parameters)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Post");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, Parameters);
- curl_easy_setopt(Handle, CURLOPT_POST, 1L);
- curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE, 0L);
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->HeaderList = BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken());
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::Delete;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ return Submit(std::move(Ctx));
}
- void DoAsyncPostWithPayload(std::string Url,
- IoBuffer Payload,
- ZenContentType ContentType,
- AsyncHttpCallback Callback,
- HttpClient::KeyValueMap AdditionalHeader)
+ AsyncRequestToken DoAsyncPost(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
{
- asio::post(m_Strand,
- [this,
- Url = std::move(Url),
- Payload = std::move(Payload),
- ContentType,
- Callback = std::move(Callback),
- AdditionalHeader = std::move(AdditionalHeader)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::PostWithPayload");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, {});
- curl_easy_setopt(Handle, CURLOPT_POST, 1L);
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->PayloadBuffer = std::move(Payload);
- Ctx->HeaderList =
- BuildHeaderList(AdditionalHeader,
- m_SessionId,
- GetAccessToken(),
- {std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)))});
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- // Set up read callback for payload data
- Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
- Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
- Ctx->ReadData.Offset = 0;
-
- curl_easy_setopt(Handle, CURLOPT_POSTFIELDSIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
- curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
- curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::Post;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Parameters = std::move(Parameters);
+ return Submit(std::move(Ctx));
}
- void DoAsyncPutWithPayload(std::string Url,
- IoBuffer Payload,
- AsyncHttpCallback Callback,
- HttpClient::KeyValueMap AdditionalHeader,
- HttpClient::KeyValueMap Parameters)
+ AsyncRequestToken DoAsyncPostWithPayload(std::string Url,
+ IoBuffer Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader)
{
- asio::post(m_Strand,
- [this,
- Url = std::move(Url),
- Payload = std::move(Payload),
- Callback = std::move(Callback),
- AdditionalHeader = std::move(AdditionalHeader),
- Parameters = std::move(Parameters)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Put");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, Parameters);
- curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
-
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
- Ctx->PayloadBuffer = std::move(Payload);
- Ctx->HeaderList = BuildHeaderList(
- AdditionalHeader,
- m_SessionId,
- GetAccessToken(),
- {std::make_pair("Content-Type", std::string(MapContentTypeToString(Ctx->PayloadBuffer.GetContentType())))});
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
-
- Ctx->ReadData.DataPtr = static_cast<const uint8_t*>(Ctx->PayloadBuffer.GetData());
- Ctx->ReadData.DataSize = Ctx->PayloadBuffer.GetSize();
- Ctx->ReadData.Offset = 0;
-
- curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(Ctx->PayloadBuffer.GetSize()));
- curl_easy_setopt(Handle, CURLOPT_READFUNCTION, CurlReadCallback);
- curl_easy_setopt(Handle, CURLOPT_READDATA, &Ctx->ReadData);
-
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::PostWithPayload;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Payload = std::move(Payload);
+ Ctx->Spec.ContentType = ContentType;
+ Ctx->Spec.HasContentType = true;
+ return Submit(std::move(Ctx));
}
- void DoAsyncPutNoPayload(std::string Url, AsyncHttpCallback Callback, HttpClient::KeyValueMap Parameters)
+ AsyncRequestToken DoAsyncPutWithPayload(std::string Url,
+ IoBuffer Payload,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
{
- asio::post(m_Strand, [this, Url = std::move(Url), Callback = std::move(Callback), Parameters = std::move(Parameters)]() mutable {
- ZEN_TRACE_CPU("AsyncHttpClient::Put");
- if (m_ShuttingDown)
- {
- return;
- }
- CURL* Handle = AllocHandle();
- ConfigureHandle(Handle, Url, Parameters);
- curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(Handle, CURLOPT_INFILESIZE_LARGE, 0LL);
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::PutWithPayload;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Parameters = std::move(Parameters);
+ Ctx->Spec.Payload = std::move(Payload);
+ return Submit(std::move(Ctx));
+ }
- auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ AsyncRequestToken DoAsyncPutNoPayload(std::string Url,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::Put;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Parameters = std::move(Parameters);
+ return Submit(std::move(Ctx));
+ }
- HttpClient::KeyValueMap ContentLengthHeader{std::pair<std::string_view, std::string_view>{"Content-Length", "0"}};
- Ctx->HeaderList = BuildHeaderList(ContentLengthHeader, m_SessionId, GetAccessToken());
- curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Ctx->HeaderList);
+ AsyncRequestToken DoAsyncPutWithSource(std::string Url,
+ uint64_t TotalSize,
+ AsyncHttpReadSource Source,
+ AsyncHttpCallback Callback,
+ HttpClient::KeyValueMap AdditionalHeader)
+ {
+ auto Ctx = std::make_unique<TransferContext>(std::move(Callback));
+ Ctx->Spec.Method = AsyncRequestMethod::PutWithSource;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.OnReadSource = std::move(Source);
+ Ctx->Spec.StreamingPutSize = TotalSize;
+ return Submit(std::move(Ctx));
+ }
- SubmitTransfer(Handle, std::move(Ctx));
- });
+ AsyncRequestToken DoAsyncStream(std::string Url,
+ AsyncHttpDataCallback OnData,
+ AsyncHttpCallback OnComplete,
+ HttpClient::KeyValueMap AdditionalHeader,
+ HttpClient::KeyValueMap Parameters)
+ {
+ auto Ctx = std::make_unique<TransferContext>(std::move(OnComplete));
+ Ctx->Spec.Method = AsyncRequestMethod::Stream;
+ Ctx->Spec.Url = std::move(Url);
+ Ctx->Spec.AdditionalHeader = std::move(AdditionalHeader);
+ Ctx->Spec.Parameters = std::move(Parameters);
+ Ctx->Spec.OnData = std::move(OnData);
+ return Submit(std::move(Ctx));
}
// -- Members ---------------------------------------------------------
@@ -824,25 +1880,44 @@ struct AsyncHttpClient::Impl
std::string m_SessionId;
std::string m_UnixSocketPathUtf8;
- // io_context and strand - all curl_multi operations are serialized on the
- // strand, making this safe even when the io_context has multiple threads.
- std::unique_ptr<asio::io_context> m_OwnedIoContext;
- asio::io_context& m_IoContext;
+ // io_context: either privately owned (m_OwnedIoContext non-null + we spin
+ // m_IoThread + hold m_WorkGuard) or supplied by the caller (m_OwnedIoContext
+ // null; caller drives the loop). Declared before m_Strand so the strand
+ // can bind its executor from m_IoContext during member init.
+ std::unique_ptr<asio::io_context> m_OwnedIoContext;
+ asio::io_context& m_IoContext;
+ // Single serialization point for every curl_multi operation, every async
+ // completion handler, and every Cleanup call. Declared after m_IoContext
+ // (so make_strand can read its executor) and before m_Timer (which binds
+ // to the strand).
asio::strand<asio::io_context::executor_type> m_Strand;
std::optional<asio::executor_work_guard<asio::io_context::executor_type>> m_WorkGuard;
std::thread m_IoThread;
- // curl_multi and socket-action state
- CURLM* m_Multi = nullptr;
- std::unordered_map<CURL*, std::unique_ptr<TransferContext>> m_Transfers;
- std::vector<CURL*> m_HandlePool;
- std::unordered_map<curl_socket_t, std::unique_ptr<SocketInfo>> m_Sockets;
- asio::steady_timer m_Timer;
- bool m_ShuttingDown = false;
+ // Strand-bound; async_wait completions land back on m_Strand.
+ asio::steady_timer m_Timer;
+ CURLM* m_Multi = nullptr;
+ // Single TokenId-keyed map. CurlHandle lives in TransferContext; reverse
+ // lookup from CURL* uses CURLOPT_PRIVATE (set in SubmitTransfer).
+ std::unordered_map<uint64_t, std::unique_ptr<TransferContext>> m_Transfers;
+ // Transfers parked between curl-side completion and the next attempt's
+ // SubmitFromSpec. Lookups by TokenId let HandleCancel cancel the backoff
+ // timer without paying the full delay.
+ struct RetryEntry
+ {
+ std::unique_ptr<TransferContext> Ctx;
+ std::shared_ptr<asio::steady_timer> Timer;
+ };
+ std::unordered_map<uint64_t, RetryEntry> m_RetryingTransfers;
+ std::vector<CURL*> m_HandlePool;
+ std::unordered_map<curl_socket_t, std::unique_ptr<AsyncSocketInfo>> m_Sockets;
+ uint32_t m_InFlight = 0; // telemetry only; storage layer caps fan-out
- // Access token cache
- RwLock m_AccessTokenLock;
+ std::atomic<bool> m_ShuttingDown{false};
HttpClientAccessToken m_CachedAccessToken;
+
+ std::atomic<uint64_t> m_NextTokenId{1};
+ std::atomic<bool> m_ShutdownDone{false};
};
//////////////////////////////////////////////////////////////////////////
@@ -850,79 +1925,134 @@ struct AsyncHttpClient::Impl
// AsyncHttpClient public API
AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings)
-: m_Impl(std::make_unique<Impl>(BaseUri, Settings))
+: m_Impl(std::make_shared<Impl>(BaseUri, Settings))
{
}
AsyncHttpClient::AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings)
-: m_Impl(std::make_unique<Impl>(BaseUri, IoContext, Settings))
+: m_Impl(std::make_shared<Impl>(BaseUri, IoContext, Settings))
{
}
-AsyncHttpClient::~AsyncHttpClient() = default;
+AsyncHttpClient::~AsyncHttpClient()
+{
+ // Drive teardown synchronously while we're guaranteed to be on a user
+ // thread (not the io thread). Joining and draining here ensures any
+ // posted lambdas that captured a shared_ptr<Impl> by value (e.g.
+ // Cancel after the transfer completed) are destroyed and release
+ // their refs before m_Impl drops; otherwise the cycle "lambda holds
+ // Impl ref / lambda lives in io_context owned by Impl" would pin Impl
+ // alive forever and leak curl_multi + socket handles.
+ if (m_Impl)
+ {
+ m_Impl->Shutdown();
+ }
+}
// -- Callback-based API --------------------------------------------------
-void
+AsyncRequestToken
AsyncHttpClient::AsyncGet(std::string_view Url,
AsyncHttpCallback Callback,
const KeyValueMap& AdditionalHeader,
const KeyValueMap& Parameters)
{
- m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+ return m_Impl->DoAsyncGet(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
{
- m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
+ return m_Impl->DoAsyncHead(std::string(Url), std::move(Callback), AdditionalHeader);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
{
- m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
+ return m_Impl->DoAsyncDelete(std::string(Url), std::move(Callback), AdditionalHeader);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncPost(std::string_view Url,
AsyncHttpCallback Callback,
const KeyValueMap& AdditionalHeader,
const KeyValueMap& Parameters)
{
- m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+ return m_Impl->DoAsyncPost(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader)
{
- m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
+ return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, Payload.GetContentType(), std::move(Callback), AdditionalHeader);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncPost(std::string_view Url,
const IoBuffer& Payload,
ZenContentType ContentType,
AsyncHttpCallback Callback,
const KeyValueMap& AdditionalHeader)
{
- m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
+ return m_Impl->DoAsyncPostWithPayload(std::string(Url), Payload, ContentType, std::move(Callback), AdditionalHeader);
}
-void
+AsyncRequestToken
AsyncHttpClient::AsyncPut(std::string_view Url,
const IoBuffer& Payload,
AsyncHttpCallback Callback,
const KeyValueMap& AdditionalHeader,
const KeyValueMap& Parameters)
{
- m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
+ return m_Impl->DoAsyncPutWithPayload(std::string(Url), Payload, std::move(Callback), AdditionalHeader, Parameters);
+}
+
+AsyncRequestToken
+AsyncHttpClient::AsyncPut(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ return m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), AdditionalHeader, Parameters);
+}
+
+AsyncRequestToken
+AsyncHttpClient::AsyncPut(std::string_view Url,
+ uint64_t TotalSize,
+ AsyncHttpReadSource Source,
+ AsyncHttpCallback OnComplete,
+ const KeyValueMap& AdditionalHeader)
+{
+ return m_Impl->DoAsyncPutWithSource(std::string(Url), TotalSize, std::move(Source), std::move(OnComplete), AdditionalHeader);
+}
+
+AsyncRequestToken
+AsyncHttpClient::AsyncStream(std::string_view Url,
+ AsyncHttpDataCallback OnData,
+ AsyncHttpCallback OnComplete,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
+{
+ return m_Impl->DoAsyncStream(std::string(Url), std::move(OnData), std::move(OnComplete), AdditionalHeader, Parameters);
}
+// -- Token cancellation --------------------------------------------------
+
void
-AsyncHttpClient::AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters)
+AsyncRequestToken::Cancel()
{
- m_Impl->DoAsyncPutNoPayload(std::string(Url), std::move(Callback), Parameters);
+ if (!m_State)
+ {
+ return;
+ }
+ if (m_State->Cancelled.exchange(true, std::memory_order_acq_rel))
+ {
+ return; // already cancelled
+ }
+ if (m_State->CancelFn)
+ {
+ m_State->CancelFn();
+ }
}
// -- Future-based API ----------------------------------------------------
@@ -1026,6 +2156,7 @@ AsyncHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters)
AsyncPut(
Url,
[Promise](Response R) { Promise->set_value(std::move(R)); },
+ KeyValueMap{},
Parameters);
return Future;
}