aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-05-05 14:59:21 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 14:59:21 +0200
commit46f456ffd4d0717a035253ff9076ca6ee664e536 (patch)
tree69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zencore/include
parentwatchdog ephemeral port exhaust (#1022) (diff)
downloadarchived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz
archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window - Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true) - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`) - Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa) - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs) - Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction). - Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zencore/include')
-rw-r--r--src/zencore/include/zencore/parallelwork.h46
1 files changed, 46 insertions, 0 deletions
diff --git a/src/zencore/include/zencore/parallelwork.h b/src/zencore/include/zencore/parallelwork.h
index 536b0a056..d9b20b9d7 100644
--- a/src/zencore/include/zencore/parallelwork.h
+++ b/src/zencore/include/zencore/parallelwork.h
@@ -13,6 +13,8 @@ namespace zen {
class ParallelWork
{
public:
+ class ExternalWorkToken;
+
ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode);
~ParallelWork();
@@ -74,9 +76,53 @@ public:
Latch& PendingWork() { return m_PendingWork; }
+ // Register a unit of work whose completion is signalled out-of-band (typically
+ // from an async callback firing on a different thread). Counter increments now;
+ // the returned token's Complete()/Fail() decrements. Used by S3AsyncStorage so
+ // in-flight S3 requests count against ParallelWork without occupying a worker.
+ [[nodiscard]] ExternalWorkToken RegisterExternal();
+
+ class ExternalWorkToken
+ {
+ public:
+ ExternalWorkToken() = default;
+
+ ExternalWorkToken(const ExternalWorkToken&) = delete;
+ ExternalWorkToken& operator=(const ExternalWorkToken&) = delete;
+
+ ExternalWorkToken(ExternalWorkToken&& Other) noexcept : m_Owner(Other.m_Owner) { Other.m_Owner = nullptr; }
+ ExternalWorkToken& operator=(ExternalWorkToken&& Other) noexcept
+ {
+ if (this != &Other)
+ {
+ Release();
+ m_Owner = Other.m_Owner;
+ Other.m_Owner = nullptr;
+ }
+ return *this;
+ }
+
+ ~ExternalWorkToken() { Release(); }
+
+ void Complete();
+ void Fail(std::exception_ptr Ex);
+
+ bool IsActive() const { return m_Owner != nullptr; }
+
+ private:
+ friend class ParallelWork;
+
+ explicit ExternalWorkToken(ParallelWork* Owner) : m_Owner(Owner) {}
+
+ void Release();
+
+ ParallelWork* m_Owner = nullptr;
+ };
+
private:
ExceptionCallback DefaultErrorFunction();
void RethrowErrors();
+ void RecordExternalError(std::exception_ptr Ex);
std::atomic<bool>& m_AbortFlag;
std::atomic<bool>& m_PauseFlag;