author: Dan Engelbrecht <[email protected]> 2026-05-05 14:59:21 +0200
committer: GitHub Enterprise <[email protected]> 2026-05-05 14:59:21 +0200
commit: 46f456ffd4d0717a035253ff9076ca6ee664e536
tree: 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenhttp/include
parent: watchdog ephemeral port exhaust (#1022)
hub async s3 client (#1024)

- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
  - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
  - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
  - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
  - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
  - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
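
For readers sizing a deployment, the default formulas quoted in the commit message can be evaluated with a small sketch like the one below. The function names are hypothetical and do not come from the zen source. The sketch also assumes that `cpu/8` is integer division and that `cpu` is the logical core count, neither of which the commit message states.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <thread>

// Hypothetical helper (not from the zen source): clamp Value into [Lo, Hi].
inline uint32_t ClampDefault(uint32_t Value, uint32_t Lo, uint32_t Hi)
{
    assert(Lo <= Hi);  // std::clamp has undefined behavior when Lo > Hi
    return std::clamp<uint32_t>(Value, Lo, Hi);
}

// Formulas as quoted in the commit message; integer division is an assumption.
inline uint32_t DefaultSpawnThreads(uint32_t Cpu)
{
    return ClampDefault(Cpu / 8u, 4u, 16u);  // --hub-instance-spawn-threads
}

inline uint32_t DefaultProvisionThreads(uint32_t Cpu)
{
    return ClampDefault(Cpu / 8u, 4u, 12u);  // --hub-instance-provision-threads
}

inline uint32_t DefaultHydrationThreads(uint32_t Cpu)
{
    return ClampDefault(Cpu / 8u, 4u, 12u);  // --hub-hydration-threads
}

inline uint32_t DefaultAsyncMaxConcurrentRequests(uint32_t Cpu)
{
    return ClampDefault(Cpu * 4u, 128u, 512u);  // --hub-hydration-async-max-concurrent-requests
}

// std::thread::hardware_concurrency() may return 0 when the count is unknown;
// falling back to 1 is an assumption of this sketch.
inline uint32_t DetectCpuCount()
{
    const unsigned N = std::thread::hardware_concurrency();
    return N == 0 ? 1u : static_cast<uint32_t>(N);
}
```

Under these assumptions a 64-core host would get 8 spawn threads, 8 provision threads, 8 hydration threads, and an async request cap of 256, while small hosts sit on the lower bounds (4 threads, 128 requests).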
Diffstat (limited to 'src/zenhttp/include')
-rw-r--r-- src/zenhttp/include/zenhttp/asynchttpclient.h 222
-rw-r--r-- src/zenhttp/include/zenhttp/httpclient.h 34
2 files changed, 211 insertions, 45 deletions
diff --git a/src/zenhttp/include/zenhttp/asynchttpclient.h b/src/zenhttp/include/zenhttp/asynchttpclient.h
index cb41626b9..d4a33e3ac 100644
--- a/src/zenhttp/include/zenhttp/asynchttpclient.h
+++ b/src/zenhttp/include/zenhttp/asynchttpclient.h
@@ -19,21 +19,96 @@ namespace zen {
/// Completion callback for async HTTP operations.
using AsyncHttpCallback = std::function<void(HttpClient::Response)>;
+/// Pull-mode body source for the streaming `AsyncPut` overload. Fired from
+/// the AsyncHttpClient io thread when curl needs more upload bytes; runs on
+/// the same strand as every other transfer.
+///
+/// IMPORTANT: like AsyncHttpDataCallback, this runs on the io thread and
+/// stalls curl_multi for ALL in-flight transfers if it blocks. Local-disk
+/// pread is acceptable on fast storage; for anything slower, pre-fetch into
+/// a caller-owned ring on a worker pool and have this callback pop from it.
+///
+/// Dst - destination buffer to fill (caller writes here).
+/// MaxBytes - maximum bytes Dst can hold this call.
+/// AbsOffset - cumulative bytes returned so far (i.e. start offset of
+/// this chunk into the request body). Strictly monotonic
+/// across calls; equals TotalSize when EOF reached.
+///
+/// Returns the number of bytes written into Dst (<= MaxBytes). Returning 0
+/// while AbsOffset < TotalSize is treated as an upload error (CURL_READFUNC_ABORT).
+using AsyncHttpReadSource = std::function<size_t(uint8_t* Dst, size_t MaxBytes, uint64_t AbsOffset)>;
+
+/// Per-chunk data callback for `AsyncStream`. Fired from the AsyncHttpClient
+/// io thread (the same thread driving curl_multi for every other transfer on
+/// this client) once per response-body chunk that curl delivers.
+///
+/// IMPORTANT: OnData runs on the io thread. Any blocking work inside it
+/// (synchronous disk I/O, `std::mutex` waits, network calls, lock contention)
+/// stalls curl_multi for ALL in-flight transfers on this client, not just
+/// this one. Treat OnData as a strand: copy/refcount the bytes into a buffer
+/// you own and hop the heavy work to a worker pool. See
+/// `S3AsyncStorage::Get` (medium tier) for the canonical pattern - fill a
+/// pre-sized IoBuffer from a bounded pool here, dispatch the positional disk
+/// write to a worker, release the buffer back to the pool from the worker.
+///
+/// Data - pointer to received bytes; valid only for the duration of
+/// this call. Caller must consume synchronously (memcpy into
+/// a pre-arranged buffer slot, etc). No allocation or copy
+/// is performed by AsyncHttpClient.
+/// Size - bytes available at Data for this chunk.
+/// TotalSize - declared total payload size (Content-Length, in bytes); 0
+/// if the server did not declare a length (e.g.
+/// Transfer-Encoding: chunked). Constant across all calls
+/// for a given request.
+///
+/// Returns true to continue the transfer; returning false aborts the
+/// transfer and the completion callback fires with an error response.
+using AsyncHttpDataCallback = std::function<bool(const uint8_t* Data, size_t Size, uint64_t TotalSize)>;
+
+/// Handle to an in-flight async HTTP request. Returned by every AsyncXxx call.
+/// Default-constructed token is empty (Cancel is a no-op). Calling Cancel on a
+/// non-empty token requests cancellation of the underlying transfer; the
+/// completion callback still fires once with an error response so callers do
+/// not need a second notification path.
+///
+/// The token does not keep the AsyncHttpClient alive. Callers must ensure
+/// Cancel is invoked before the client is destroyed; tokens left dangling are
+/// safe to destroy but Cancel calls after client destruction are no-ops.
+class AsyncRequestToken
+{
+public:
+ // Forward-declared so AsyncHttpClient internals can hold a typed
+ // shared_ptr<State> across worker / strand boundaries (used for the
+ // cancel-before-submit race check). State definition is private to the
+ // implementation TU.
+ struct State;
+
+ AsyncRequestToken() = default;
+
+ void Cancel();
+ bool IsValid() const { return m_State != nullptr; }
+
+private:
+ friend class AsyncHttpClient;
+ explicit AsyncRequestToken(std::shared_ptr<State> S) : m_State(std::move(S)) {}
+
+ std::shared_ptr<State> m_State;
+};
+
/** Asynchronous HTTP client backed by curl_multi and ASIO.
*
* Uses curl_multi_socket_action() driven by ASIO socket async_wait to process
- * transfers without blocking the caller. All curl_multi operations are
- * serialized on an internal strand; callers may issue requests from any
- * thread, and the io_context may have multiple threads.
+ * transfers without blocking the caller. By default the client owns an
+ * io_context and a single io thread driving it; the second constructor reuses
+ * an external io_context. All curl_multi operations are serialized on an
+ * internal strand. Callers may issue requests from any thread.
*
- * Two construction modes:
- * - Owned io_context: creates an internal thread (self-contained).
- * - External io_context: caller runs the event loop.
- *
- * Completion callbacks are dispatched on the io_context (not the internal
- * strand), so a slow callback will not block the curl poll loop. Future-
- * based wrappers (Get, Post, ...) return a std::future<Response> for
- * callers that prefer blocking on a result.
+ * Completion callbacks run inline on the AsyncHttpClient io thread (same
+ * strand as the curl poll loop): heavy work (disk syscalls, lock contention,
+ * large allocations) must be hopped to a worker pool. See
+ * `AsyncHttpDataCallback` and `AsyncHttpReadSource` for the same contract on
+ * streaming. Future-based wrappers (Get, Post, ...) return a
+ * std::future<Response> for callers that prefer blocking on a result.
*/
class AsyncHttpClient
{
@@ -41,11 +116,15 @@ public:
using Response = HttpClient::Response;
using KeyValueMap = HttpClient::KeyValueMap;
- /// Construct with an internally-owned io_context and thread.
explicit AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings = {});
/// Construct with an externally-managed io_context. The io_context must
- /// outlive this client and must be running (via run()) on at least one thread.
+ /// outlive the AsyncHttpClient and must be running (e.g. via run() on
+ /// a dedicated thread, or driven by the caller). The destructor posts
+ /// the cleanup handler to that loop and blocks until it completes -
+ /// destroying the client from the same thread that drives the io_context
+ /// would deadlock. Multiple threads on the same io_context are safe;
+ /// all curl_multi operations are serialized through an internal strand.
AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings = {});
~AsyncHttpClient();
@@ -54,38 +133,93 @@ public:
AsyncHttpClient& operator=(const AsyncHttpClient&) = delete;
// -- Callback-based API ----------------------------------------------
-
- void AsyncGet(std::string_view Url,
- AsyncHttpCallback Callback,
- const KeyValueMap& AdditionalHeader = {},
- const KeyValueMap& Parameters = {});
-
- void AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
- void AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
- void AsyncPost(std::string_view Url,
- AsyncHttpCallback Callback,
- const KeyValueMap& AdditionalHeader = {},
- const KeyValueMap& Parameters = {});
-
- void AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
- void AsyncPost(std::string_view Url,
- const IoBuffer& Payload,
- ZenContentType ContentType,
- AsyncHttpCallback Callback,
- const KeyValueMap& AdditionalHeader = {});
-
- void AsyncPut(std::string_view Url,
- const IoBuffer& Payload,
- AsyncHttpCallback Callback,
- const KeyValueMap& AdditionalHeader = {},
- const KeyValueMap& Parameters = {});
-
- void AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters = {});
+ //
+ // On every callback overload below the response's parsed Header map is left
+ // EMPTY by default - the raw header bytes live in Response::HeaderArena and
+ // callers should use Response::FindHeader(name) to look up individual values.
+ // This skips the per-line std::string allocations on the io thread that the
+ // sync client incurs. Callers that need to iterate all headers must opt in
+ // via AsyncRequestSpec::WantHeaderMap on the impl side; today no public
+ // overload exposes that flag because no in-tree caller needs it.
+
+ AsyncRequestToken AsyncGet(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ AsyncRequestToken AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+ AsyncRequestToken AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+ AsyncRequestToken AsyncPost(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ AsyncRequestToken AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {});
+
+ AsyncRequestToken AsyncPost(std::string_view Url,
+ const IoBuffer& Payload,
+ ZenContentType ContentType,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {});
+
+ AsyncRequestToken AsyncPut(std::string_view Url,
+ const IoBuffer& Payload,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ AsyncRequestToken AsyncPut(std::string_view Url,
+ AsyncHttpCallback Callback,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
+
+ /// Streaming PUT: body bytes are pulled from `Source` on demand, no
+ /// pre-materialized payload buffer. TotalSize is set as Content-Length.
+ /// `Source` runs on the AsyncHttpClient io thread - same strand
+ /// discipline as AsyncStream's OnData. Useful for multipart parts or
+ /// medium-tier uploads where materializing the body would waste RAM.
+ AsyncRequestToken AsyncPut(std::string_view Url,
+ uint64_t TotalSize,
+ AsyncHttpReadSource Source,
+ AsyncHttpCallback OnComplete,
+ const KeyValueMap& AdditionalHeader = {});
+
+ /// Streaming GET with a caller-supplied per-chunk data callback. Bytes are
+ /// delivered to OnData as they arrive, with no internal allocation or
+ /// copy of the payload. Caller manages its own destination (positional
+ /// disk write, pre-arranged buffer slot, etc.) and must consume each
+ /// chunk synchronously before returning - the data pointer is only valid
+ /// for the duration of the call.
+ ///
+ /// WARNING: OnData runs on the AsyncHttpClient io thread. Blocking I/O,
+ /// lock contention, or any wait inside OnData stalls curl_multi for ALL
+ /// in-flight transfers on this client. See `AsyncHttpDataCallback` doc
+ /// for the recommended buffer-then-hop pattern.
+ ///
+ /// The completion Callback fires once with status / headers / error
+ /// (Response.ResponsePayload is always empty on this path).
+ ///
+ /// Useful when the caller already owns the destination file (e.g. ranged
+ /// multipart download writing per-range slices to a shared file) and
+ /// wants to avoid the curl-buffer -> in-memory IoBuffer -> file double
+ /// memcpy / lazy-commit page-fault cost of AsyncGet+worker-write.
+ AsyncRequestToken AsyncStream(std::string_view Url,
+ AsyncHttpDataCallback OnData,
+ AsyncHttpCallback OnComplete,
+ const KeyValueMap& AdditionalHeader = {},
+ const KeyValueMap& Parameters = {});
// -- Future-based API ------------------------------------------------
+ //
+ // These wrappers discard the underlying AsyncRequestToken; callers who need
+ // to cancel an in-flight request must use the callback-based API. The
+ // returned future resolves once with the final Response (which may carry an
+ // error / cancel response if the client is shutting down).
[[nodiscard]] std::future<Response> Get(std::string_view Url,
const KeyValueMap& AdditionalHeader = {},
@@ -115,7 +249,7 @@ public:
private:
struct Impl;
- std::unique_ptr<Impl> m_Impl;
+ std::shared_ptr<Impl> m_Impl;
};
void asynchttpclient_test_forcelink(); // internal
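
The pull-mode contract documented for `AsyncHttpReadSource` in the header above (fill `Dst` with at most `MaxBytes`, starting at `AbsOffset`; returning 0 before `TotalSize` is an error) can be illustrated with a minimal sketch. `MakeMemorySource` and `DrainSource` are hypothetical names introduced here. `DrainSource` only stands in for the client's upload loop so the contract can be exercised without curl; it is an assumption about how the callback is driven, not the actual implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Same signature as AsyncHttpReadSource in the header.
using ReadSource = std::function<size_t(uint8_t* Dst, size_t MaxBytes, uint64_t AbsOffset)>;

// Hypothetical helper: serves the request body from a shared in-memory buffer.
// It only copies memory, so it does not block the io thread.
inline ReadSource MakeMemorySource(std::shared_ptr<const std::vector<uint8_t>> Body)
{
    return [Body = std::move(Body)](uint8_t* Dst, size_t MaxBytes, uint64_t AbsOffset) -> size_t {
        if (AbsOffset >= Body->size() || MaxBytes == 0)
        {
            return 0;  // nothing left to hand out
        }
        const size_t Remaining = Body->size() - static_cast<size_t>(AbsOffset);
        const size_t N         = std::min(MaxBytes, Remaining);
        std::memcpy(Dst, Body->data() + AbsOffset, N);
        return N;
    };
}

// Stand-in for the upload loop (assumption, not the real client code).
// Returns false in the documented error case: 0 bytes while Offset < TotalSize.
inline bool DrainSource(const ReadSource& Source, uint64_t TotalSize, size_t ChunkSize, std::vector<uint8_t>& Out)
{
    std::vector<uint8_t> Chunk(ChunkSize);
    uint64_t             Offset = 0;
    while (Offset < TotalSize)
    {
        const size_t N = Source(Chunk.data(), Chunk.size(), Offset);
        if (N == 0)
        {
            return false;  // the header maps this case to CURL_READFUNC_ABORT
        }
        assert(N <= Chunk.size());
        Out.insert(Out.end(), Chunk.begin(), Chunk.begin() + static_cast<std::ptrdiff_t>(N));
        Offset += N;  // AbsOffset advances by exactly the bytes returned
    }
    return true;
}
```

Capturing the body through a `shared_ptr` keeps the bytes alive for as long as the callback exists. For slower sources, the header's advice still applies: pre-fetch on a worker pool and have the callback pop from a caller-owned ring instead of reading inline.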
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 4cf3a86a8..3162823f3 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -136,6 +136,24 @@ struct HttpClientSettings
/// nullptr disables sharing. Must outlive every HttpClient that
/// references it. Non-curl backends ignore this field.
HttpClientShare* OptionalShare = nullptr;
+
+ /// Max concurrent connections per host. Used by AsyncHttpClient to set
+ /// CURLMOPT_MAX_HOST_CONNECTIONS. libcurl default is small; hub-scale fanout
+ /// against a single S3 endpoint needs much higher. 0 = leave libcurl default.
+ uint32_t MaxConcurrentConnectionsPerHost = 0;
+
+ /// Max concurrent connections total. Used by AsyncHttpClient to set
+ /// CURLMOPT_MAX_TOTAL_CONNECTIONS. 0 = leave libcurl default.
+ uint32_t MaxConcurrentConnectionsTotal = 0;
+
+ /// Hint for the maximum number of in-flight requests the caller intends to
+ /// keep submitted to AsyncHttpClient. AsyncHttpClient itself does not gate
+ /// on this value - fan-out is bounded externally by the caller (e.g. the
+ /// hub's S3AsyncStorage admission semaphore). This setting is reused by
+ /// hub configuration to derive matching curl connection caps so libcurl's
+ /// CONNECTTIMEOUT does not tick on connections waiting behind the cap.
+ /// 0 = no hint.
+ uint32_t MaxConcurrentRequests = 0;
};
class HttpClientError : public std::runtime_error
@@ -272,9 +290,17 @@ public:
HttpResponseCode StatusCode = HttpResponseCode::ImATeapot;
IoBuffer ResponsePayload; // Note: this also includes the content type
- // Contains the response headers
+ // Contains the response headers. By default the async path leaves this
+ // empty (raw bytes are kept in HeaderArena instead) - use FindHeader
+ // for lookups, or set AsyncRequestSpec::WantHeaderMap if you need the
+ // full parsed map. The synchronous client populates this as before.
KeyValueMap Header;
+ // Raw response header bytes, "Key: Value\r\n" lines concatenated.
+ // Populated by the async client; empty for the sync client. Owned
+ // and freed with the Response. FindHeader() scans this lazily.
+ std::string HeaderArena;
+
// The number of bytes sent as part of the request
int64_t UploadedBytes = 0;
@@ -321,6 +347,12 @@ public:
// objects, returns text as-is for text types like Text, JSON, HTML etc
std::string ToText() const;
+ // Lookup a header by name (case-insensitive). Checks HeaderArena first,
+ // then Header map. Returns the first matching value or empty string_view
+ // if absent. View is valid until this Response is destroyed (or Header
+ // map is mutated).
+ std::string_view FindHeader(std::string_view Name) const;
+
// Returns whether the HTTP status code is considered successful (i.e in the
// 2xx range)
bool IsSuccess() const noexcept;
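
The `FindHeader` comment describes a case-insensitive lookup over `HeaderArena`, which holds "Key: Value\r\n" lines concatenated. A minimal sketch of such a scan follows. `FindHeaderInArena` and `EqualsIgnoreCase` are hypothetical free functions written for illustration; the real member function also falls back to the `Header` map and may differ in its details (for example in how it trims whitespace).

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string_view>

// ASCII case-insensitive comparison of two header names.
inline bool EqualsIgnoreCase(std::string_view A, std::string_view B)
{
    if (A.size() != B.size())
    {
        return false;
    }
    for (size_t i = 0; i < A.size(); ++i)
    {
        if (std::tolower(static_cast<unsigned char>(A[i])) != std::tolower(static_cast<unsigned char>(B[i])))
        {
            return false;
        }
    }
    return true;
}

// Hypothetical sketch: returns the first matching value, or an empty view if
// absent. The returned view points into Arena and is valid only while the
// arena's storage is alive.
inline std::string_view FindHeaderInArena(std::string_view Arena, std::string_view Name)
{
    assert(!Name.empty());  // looking up an empty name is treated as a caller bug here
    while (!Arena.empty())
    {
        // Split off one "\r\n"-terminated line.
        const size_t           Eol  = Arena.find("\r\n");
        const std::string_view Line = Arena.substr(0, Eol);
        Arena = (Eol == std::string_view::npos) ? std::string_view{} : Arena.substr(Eol + 2);

        const size_t Colon = Line.find(':');
        if (Colon == std::string_view::npos || !EqualsIgnoreCase(Line.substr(0, Colon), Name))
        {
            continue;  // not a header line, or a different header
        }

        // Trim optional whitespace around the value.
        std::string_view Value = Line.substr(Colon + 1);
        while (!Value.empty() && (Value.front() == ' ' || Value.front() == '\t'))
        {
            Value.remove_prefix(1);
        }
        while (!Value.empty() && (Value.back() == ' ' || Value.back() == '\t'))
        {
            Value.remove_suffix(1);
        }
        return Value;
    }
    return {};
}
```

Because the scan returns views into the arena rather than building strings, it avoids the per-line `std::string` allocations that the header comment attributes to the parsed `Header` map.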