diff options
| author | Dan Engelbrecht <[email protected]> | 2026-05-05 14:59:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-05 14:59:21 +0200 |
| commit | 46f456ffd4d0717a035253ff9076ca6ee664e536 (patch) | |
| tree | 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zencore/include | |
| parent | watchdog ephemeral port exhaust (#1022) (diff) | |
| download | archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip | |
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
- `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
- `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
- New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
- `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zencore/include')
| -rw-r--r-- | src/zencore/include/zencore/parallelwork.h | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/src/zencore/include/zencore/parallelwork.h b/src/zencore/include/zencore/parallelwork.h index 536b0a056..d9b20b9d7 100644 --- a/src/zencore/include/zencore/parallelwork.h +++ b/src/zencore/include/zencore/parallelwork.h @@ -13,6 +13,8 @@ namespace zen { class ParallelWork { public: + class ExternalWorkToken; + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); ~ParallelWork(); @@ -74,9 +76,53 @@ public: Latch& PendingWork() { return m_PendingWork; } + // Register a unit of work whose completion is signalled out-of-band (typically + // from an async callback firing on a different thread). Counter increments now; + // the returned token's Complete()/Fail() decrements. Used by S3AsyncStorage so + // in-flight S3 requests count against ParallelWork without occupying a worker. + [[nodiscard]] ExternalWorkToken RegisterExternal(); + + class ExternalWorkToken + { + public: + ExternalWorkToken() = default; + + ExternalWorkToken(const ExternalWorkToken&) = delete; + ExternalWorkToken& operator=(const ExternalWorkToken&) = delete; + + ExternalWorkToken(ExternalWorkToken&& Other) noexcept : m_Owner(Other.m_Owner) { Other.m_Owner = nullptr; } + ExternalWorkToken& operator=(ExternalWorkToken&& Other) noexcept + { + if (this != &Other) + { + Release(); + m_Owner = Other.m_Owner; + Other.m_Owner = nullptr; + } + return *this; + } + + ~ExternalWorkToken() { Release(); } + + void Complete(); + void Fail(std::exception_ptr Ex); + + bool IsActive() const { return m_Owner != nullptr; } + + private: + friend class ParallelWork; + + explicit ExternalWorkToken(ParallelWork* Owner) : m_Owner(Owner) {} + + void Release(); + + ParallelWork* m_Owner = nullptr; + }; + private: ExceptionCallback DefaultErrorFunction(); void RethrowErrors(); + void RecordExternalError(std::exception_ptr Ex); std::atomic<bool>& m_AbortFlag; std::atomic<bool>& m_PauseFlag; |