| author | Dan Engelbrecht <[email protected]> | 2026-05-05 14:59:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-05 14:59:21 +0200 |
| commit | 46f456ffd4d0717a035253ff9076ca6ee664e536 | |
| tree | 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zenhttp/include | |
| parent | watchdog ephemeral port exhaust (#1022) | |
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
  - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
  - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
  - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
  - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
  - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
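The default formulas quoted in the option bullets above can be sanity-checked with a small sketch. This is not code from the commit: the function names are hypothetical, and `cpu` is assumed to mean the logical core count (for example the value of `std::thread::hardware_concurrency()`).

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helpers mirroring the formulas in the commit message.
// Names are illustrative only; "Cpu" is assumed to be the logical core count.
constexpr uint32_t DefaultAsyncMaxConcurrentRequests(uint32_t Cpu)
{
    return std::clamp(Cpu * 4u, 128u, 512u);  // clamp(cpu*4, 128, 512)
}

constexpr uint32_t DefaultSpawnThreads(uint32_t Cpu)
{
    return std::clamp(Cpu / 8u, 4u, 16u);  // clamp(cpu/8, 4, 16)
}

// Shared by --hub-instance-provision-threads and --hub-hydration-threads.
constexpr uint32_t DefaultProvisionOrHydrationThreads(uint32_t Cpu)
{
    return std::clamp(Cpu / 8u, 4u, 12u);  // clamp(cpu/8, 4, 12)
}

// Previous default for the provision and hydration pools.
constexpr uint32_t OldProvisionOrHydrationThreads(uint32_t Cpu)
{
    return std::max(Cpu / 4u, 2u);  // max(cpu/4, 2)
}

// Compile-time checks for a small, a mid-size and a very large machine.
static_assert(DefaultAsyncMaxConcurrentRequests(8) == 128);
static_assert(DefaultAsyncMaxConcurrentRequests(64) == 256);
static_assert(DefaultAsyncMaxConcurrentRequests(256) == 512);
static_assert(DefaultSpawnThreads(8) == 4);
static_assert(DefaultSpawnThreads(256) == 16);
static_assert(DefaultProvisionOrHydrationThreads(64) == 8);
static_assert(DefaultProvisionOrHydrationThreads(256) == 12);
static_assert(OldProvisionOrHydrationThreads(64) == 16);
```

Under these assumptions a 64-core host goes from 16 threads per pool under the old formula to 8 under the new one. The new formulas also add an upper bound, which the old `max(cpu/4, 2)` did not have.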
Diffstat (limited to 'src/zenhttp/include')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/zenhttp/include/zenhttp/asynchttpclient.h | 222 |
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 34 |
2 files changed, 211 insertions, 45 deletions
diff --git a/src/zenhttp/include/zenhttp/asynchttpclient.h b/src/zenhttp/include/zenhttp/asynchttpclient.h
index cb41626b9..d4a33e3ac 100644
--- a/src/zenhttp/include/zenhttp/asynchttpclient.h
+++ b/src/zenhttp/include/zenhttp/asynchttpclient.h
@@ -19,21 +19,96 @@ namespace zen {
 /// Completion callback for async HTTP operations.
 using AsyncHttpCallback = std::function<void(HttpClient::Response)>;
 
+/// Pull-mode body source for the streaming `AsyncPut` overload. Fired from
+/// the AsyncHttpClient io thread when curl needs more upload bytes; runs on
+/// the same strand as every other transfer.
+///
+/// IMPORTANT: like AsyncHttpDataCallback, this runs on the io thread and
+/// stalls curl_multi for ALL in-flight transfers if it blocks. Local-disk
+/// pread is acceptable on fast storage; for anything slower, pre-fetch into
+/// a caller-owned ring on a worker pool and have this callback pop from it.
+///
+///   Dst       - destination buffer to fill (caller writes here).
+///   MaxBytes  - maximum bytes Dst can hold this call.
+///   AbsOffset - cumulative bytes returned so far (i.e. start offset of
+///               this chunk into the request body). Strictly monotonic
+///               across calls; equals TotalSize when EOF reached.
+///
+/// Returns the number of bytes written into Dst (<= MaxBytes). Returning 0
+/// while AbsOffset < TotalSize is treated as an upload error (CURL_READFUNC_ABORT).
+using AsyncHttpReadSource = std::function<size_t(uint8_t* Dst, size_t MaxBytes, uint64_t AbsOffset)>;
+
+/// Per-chunk data callback for `AsyncStream`. Fired from the AsyncHttpClient
+/// io thread (the same thread driving curl_multi for every other transfer on
+/// this client) once per response-body chunk that curl delivers.
+///
+/// IMPORTANT: OnData runs on the io thread. Any blocking work inside it
+/// (synchronous disk I/O, `std::mutex` waits, network calls, lock contention)
+/// stalls curl_multi for ALL in-flight transfers on this client, not just
+/// this one. Treat OnData as a strand: copy/refcount the bytes into a buffer
+/// you own and hop the heavy work to a worker pool. See
+/// `S3AsyncStorage::Get` (medium tier) for the canonical pattern - fill a
+/// pre-sized IoBuffer from a bounded pool here, dispatch the positional disk
+/// write to a worker, release the buffer back to the pool from the worker.
+///
+///   Data      - pointer to received bytes; valid only for the duration of
+///               this call. Caller must consume synchronously (memcpy into
+///               a pre-arranged buffer slot, etc). No allocation or copy
+///               is performed by AsyncHttpClient.
+///   Size      - bytes available at Data for this chunk.
+///   TotalSize - declared total payload size (Content-Length, in bytes); 0
+///               if the server did not declare a length (e.g.
+///               Transfer-Encoding: chunked). Constant across all calls
+///               for a given request.
+///
+/// Returns true to continue the transfer; returning false aborts the
+/// transfer and the completion callback fires with an error response.
+using AsyncHttpDataCallback = std::function<bool(const uint8_t* Data, size_t Size, uint64_t TotalSize)>;
+
+/// Handle to an in-flight async HTTP request. Returned by every AsyncXxx call.
+/// Default-constructed token is empty (Cancel is a no-op). Calling Cancel on a
+/// non-empty token requests cancellation of the underlying transfer; the
+/// completion callback still fires once with an error response so callers do
+/// not need a second notification path.
+///
+/// The token does not keep the AsyncHttpClient alive. Callers must ensure
+/// Cancel is invoked before the client is destroyed; tokens left dangling are
+/// safe to destroy but Cancel calls after client destruction are no-ops.
+class AsyncRequestToken
+{
+public:
+    // Forward-declared so AsyncHttpClient internals can hold a typed
+    // shared_ptr<State> across worker / strand boundaries (used for the
+    // cancel-before-submit race check). State definition is private to the
+    // implementation TU.
+    struct State;
+
+    AsyncRequestToken() = default;
+
+    void Cancel();
+    bool IsValid() const { return m_State != nullptr; }
+
+private:
+    friend class AsyncHttpClient;
+    explicit AsyncRequestToken(std::shared_ptr<State> S) : m_State(std::move(S)) {}
+
+    std::shared_ptr<State> m_State;
+};
+
 /** Asynchronous HTTP client backed by curl_multi and ASIO.
  *
  * Uses curl_multi_socket_action() driven by ASIO socket async_wait to process
- * transfers without blocking the caller. All curl_multi operations are
- * serialized on an internal strand; callers may issue requests from any
- * thread, and the io_context may have multiple threads.
+ * transfers without blocking the caller. By default the client owns an
+ * io_context and a single io thread driving it; the second constructor reuses
+ * an external io_context. All curl_multi operations are serialized on an
+ * internal strand. Callers may issue requests from any thread.
  *
- * Two construction modes:
- * - Owned io_context: creates an internal thread (self-contained).
- * - External io_context: caller runs the event loop.
- *
- * Completion callbacks are dispatched on the io_context (not the internal
- * strand), so a slow callback will not block the curl poll loop. Future-
- * based wrappers (Get, Post, ...) return a std::future<Response> for
- * callers that prefer blocking on a result.
+ * Completion callbacks run inline on the AsyncHttpClient io thread (same
+ * strand as the curl poll loop): heavy work (disk syscalls, lock contention,
+ * large allocations) must be hopped to a worker pool. See
+ * `AsyncHttpDataCallback` and `AsyncHttpReadSource` for the same contract on
+ * streaming. Future-based wrappers (Get, Post, ...) return a
+ * std::future<Response> for callers that prefer blocking on a result.
  */
 class AsyncHttpClient
 {
@@ -41,11 +116,15 @@ public:
     using Response = HttpClient::Response;
     using KeyValueMap = HttpClient::KeyValueMap;
 
-    /// Construct with an internally-owned io_context and thread.
     explicit AsyncHttpClient(std::string_view BaseUri, const HttpClientSettings& Settings = {});
 
     /// Construct with an externally-managed io_context. The io_context must
-    /// outlive this client and must be running (via run()) on at least one thread.
+    /// outlive the AsyncHttpClient and must be running (e.g. via run() on
+    /// a dedicated thread, or driven by the caller). The destructor posts
+    /// the cleanup handler to that loop and blocks until it completes -
+    /// destroying the client from the same thread that drives the io_context
+    /// would deadlock. Multiple threads on the same io_context are safe;
+    /// all curl_multi operations are serialized through an internal strand.
     AsyncHttpClient(std::string_view BaseUri, asio::io_context& IoContext, const HttpClientSettings& Settings = {});
 
     ~AsyncHttpClient();
@@ -54,38 +133,93 @@ public:
     AsyncHttpClient& operator=(const AsyncHttpClient&) = delete;
 
     // -- Callback-based API ----------------------------------------------
-
-    void AsyncGet(std::string_view Url,
-                  AsyncHttpCallback Callback,
-                  const KeyValueMap& AdditionalHeader = {},
-                  const KeyValueMap& Parameters = {});
-
-    void AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
-    void AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
-    void AsyncPost(std::string_view Url,
-                   AsyncHttpCallback Callback,
-                   const KeyValueMap& AdditionalHeader = {},
-                   const KeyValueMap& Parameters = {});
-
-    void AsyncPost(std::string_view Url, const IoBuffer& Payload, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
-
-    void AsyncPost(std::string_view Url,
-                   const IoBuffer& Payload,
-                   ZenContentType ContentType,
-                   AsyncHttpCallback Callback,
-                   const KeyValueMap& AdditionalHeader = {});
-
-    void AsyncPut(std::string_view Url,
-                  const IoBuffer& Payload,
-                  AsyncHttpCallback Callback,
-                  const KeyValueMap& AdditionalHeader = {},
-                  const KeyValueMap& Parameters = {});
-
-    void AsyncPut(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& Parameters = {});
+    //
+    // On every callback overload below the response's parsed Header map is left
+    // EMPTY by default - the raw header bytes live in Response::HeaderArena and
+    // callers should use Response::FindHeader(name) to look up individual values.
+    // This skips the per-line std::string allocations on the io thread that the
+    // sync client incurs. Callers that need to iterate all headers must opt in
+    // via AsyncRequestSpec::WantHeaderMap on the impl side; today no public
+    // overload exposes that flag because no in-tree caller needs it.
+
+    AsyncRequestToken AsyncGet(std::string_view Url,
+                               AsyncHttpCallback Callback,
+                               const KeyValueMap& AdditionalHeader = {},
+                               const KeyValueMap& Parameters = {});
+
+    AsyncRequestToken AsyncHead(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+    AsyncRequestToken AsyncDelete(std::string_view Url, AsyncHttpCallback Callback, const KeyValueMap& AdditionalHeader = {});
+
+    AsyncRequestToken AsyncPost(std::string_view Url,
+                                AsyncHttpCallback Callback,
+                                const KeyValueMap& AdditionalHeader = {},
+                                const KeyValueMap& Parameters = {});
+
+    AsyncRequestToken AsyncPost(std::string_view Url,
+                                const IoBuffer& Payload,
+                                AsyncHttpCallback Callback,
+                                const KeyValueMap& AdditionalHeader = {});
+
+    AsyncRequestToken AsyncPost(std::string_view Url,
+                                const IoBuffer& Payload,
+                                ZenContentType ContentType,
+                                AsyncHttpCallback Callback,
+                                const KeyValueMap& AdditionalHeader = {});
+
+    AsyncRequestToken AsyncPut(std::string_view Url,
+                               const IoBuffer& Payload,
+                               AsyncHttpCallback Callback,
+                               const KeyValueMap& AdditionalHeader = {},
+                               const KeyValueMap& Parameters = {});
+
+    AsyncRequestToken AsyncPut(std::string_view Url,
+                               AsyncHttpCallback Callback,
+                               const KeyValueMap& AdditionalHeader = {},
+                               const KeyValueMap& Parameters = {});
+
+    /// Streaming PUT: body bytes are pulled from `Source` on demand, no
+    /// pre-materialized payload buffer. TotalSize is set as Content-Length.
+    /// `Source` runs on the AsyncHttpClient io thread - same strand
+    /// discipline as AsyncStream's OnData. Useful for multipart parts or
+    /// medium-tier uploads where materializing the body would waste RAM.
+    AsyncRequestToken AsyncPut(std::string_view Url,
+                               uint64_t TotalSize,
+                               AsyncHttpReadSource Source,
+                               AsyncHttpCallback OnComplete,
+                               const KeyValueMap& AdditionalHeader = {});
+
+    /// Streaming GET with a caller-supplied per-chunk data callback. Bytes are
+    /// delivered to OnData as they arrive, with no internal allocation or
+    /// copy of the payload. Caller manages its own destination (positional
+    /// disk write, pre-arranged buffer slot, etc.) and must consume each
+    /// chunk synchronously before returning - the data pointer is only valid
+    /// for the duration of the call.
+    ///
+    /// WARNING: OnData runs on the AsyncHttpClient io thread. Blocking I/O,
+    /// lock contention, or any wait inside OnData stalls curl_multi for ALL
+    /// in-flight transfers on this client. See `AsyncHttpDataCallback` doc
+    /// for the recommended buffer-then-hop pattern.
+    ///
+    /// The completion Callback fires once with status / headers / error
+    /// (Response.ResponsePayload is always empty on this path).
+    ///
+    /// Useful when the caller already owns the destination file (e.g. ranged
+    /// multipart download writing per-range slices to a shared file) and
+    /// wants to avoid the curl-buffer -> in-memory IoBuffer -> file double
+    /// memcpy / lazy-commit page-fault cost of AsyncGet+worker-write.
+    AsyncRequestToken AsyncStream(std::string_view Url,
+                                  AsyncHttpDataCallback OnData,
+                                  AsyncHttpCallback OnComplete,
+                                  const KeyValueMap& AdditionalHeader = {},
+                                  const KeyValueMap& Parameters = {});
 
     // -- Future-based API ------------------------------------------------
+    //
+    // These wrappers discard the underlying AsyncRequestToken; callers who need
+    // to cancel an in-flight request must use the callback-based API. The
+    // returned future resolves once with the final Response (which may carry an
+    // error / cancel response if the client is shutting down).
 
     [[nodiscard]] std::future<Response> Get(std::string_view Url,
                                             const KeyValueMap& AdditionalHeader = {},
@@ -115,7 +249,7 @@ public:
 
 private:
     struct Impl;
-    std::unique_ptr<Impl> m_Impl;
+    std::shared_ptr<Impl> m_Impl;
 };
 
 void asynchttpclient_test_forcelink(); // internal
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 4cf3a86a8..3162823f3 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -136,6 +136,24 @@ struct HttpClientSettings
     /// nullptr disables sharing. Must outlive every HttpClient that
     /// references it. Non-curl backends ignore this field.
     HttpClientShare* OptionalShare = nullptr;
+
+    /// Max concurrent connections per host. Used by AsyncHttpClient to set
+    /// CURLMOPT_MAX_HOST_CONNECTIONS. libcurl default is small; hub-scale fanout
+    /// against a single S3 endpoint needs much higher. 0 = leave libcurl default.
+    uint32_t MaxConcurrentConnectionsPerHost = 0;
+
+    /// Max concurrent connections total. Used by AsyncHttpClient to set
+    /// CURLMOPT_MAX_TOTAL_CONNECTIONS. 0 = leave libcurl default.
+    uint32_t MaxConcurrentConnectionsTotal = 0;
+
+    /// Hint for the maximum number of in-flight requests the caller intends to
+    /// keep submitted to AsyncHttpClient. AsyncHttpClient itself does not gate
+    /// on this value - fan-out is bounded externally by the caller (e.g. the
+    /// hub's S3AsyncStorage admission semaphore). This setting is reused by
+    /// hub configuration to derive matching curl connection caps so libcurl's
+    /// CONNECTTIMEOUT does not tick on connections waiting behind the cap.
+    /// 0 = no hint.
+    uint32_t MaxConcurrentRequests = 0;
 };
 
 class HttpClientError : public std::runtime_error
@@ -272,9 +290,17 @@ public:
         HttpResponseCode StatusCode = HttpResponseCode::ImATeapot;
         IoBuffer ResponsePayload; // Note: this also includes the content type
 
-        // Contains the response headers
+        // Contains the response headers. By default the async path leaves this
+        // empty (raw bytes are kept in HeaderArena instead) - use FindHeader
+        // for lookups, or set AsyncRequestSpec::WantHeaderMap if you need the
+        // full parsed map. The synchronous client populates this as before.
         KeyValueMap Header;
 
+        // Raw response header bytes, "Key: Value\r\n" lines concatenated.
+        // Populated by the async client; empty for the sync client. Owned
+        // and freed with the Response. FindHeader() scans this lazily.
+        std::string HeaderArena;
+
         // The number of bytes sent as part of the request
         int64_t UploadedBytes = 0;
 
@@ -321,6 +347,12 @@ public:
         // objects, returns text as-is for text types like Text, JSON, HTML etc
         std::string ToText() const;
 
+        // Lookup a header by name (case-insensitive). Checks HeaderArena first,
+        // then Header map. Returns the first matching value or empty string_view
+        // if absent. View is valid until this Response is destroyed (or Header
+        // map is mutated).
+        std::string_view FindHeader(std::string_view Name) const;
+
         // Returns whether the HTTP status code is considered successful (i.e in the
         // 2xx range)
         bool IsSuccess() const noexcept;