From 46f456ffd4d0717a035253ff9076ca6ee664e536 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 5 May 2026 14:59:21 +0200 Subject: hub async s3 client (#1024) - Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window - Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true) - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`) - Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa) - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs) - Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction). - Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)` --- src/zencore/parallelwork.cpp | 156 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) (limited to 'src/zencore/parallelwork.cpp') diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp index 94696f479..ec00fe0bc 100644 --- a/src/zencore/parallelwork.cpp +++ b/src/zencore/parallelwork.cpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -11,6 +12,8 @@ #if ZEN_WITH_TESTS # include + +# include #endif // ZEN_WITH_TESTS namespace zen { @@ -89,6 +92,65 @@ ParallelWork::DefaultErrorFunction() }; } +void +ParallelWork::RecordExternalError(std::exception_ptr Ex) +{ + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + m_AbortFlag = true; +} + +ParallelWork::ExternalWorkToken +ParallelWork::RegisterExternal() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_PendingWork.AddCount(1); + return ExternalWorkToken(this); +} + +void +ParallelWork::ExternalWorkToken::Complete() +{ + ZEN_ASSERT(m_Owner != nullptr); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex) +{ + ZEN_ASSERT(m_Owner != nullptr); + // Null exception_ptr would propagate as std::bad_exception via + // rethrow_exception(nullptr) and mask the real failure mode. Catches + // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); }) + // firing on a normal-return path where no exception is in flight. + ZEN_ASSERT(Ex != nullptr); + m_Owner->RecordExternalError(Ex); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Release() +{ + if (m_Owner != nullptr) + { + // Tests should fail loudly so that any leaked path surfaces immediately; + // in production we keep the safety-net countdown so a leak does not deadlock + // the latch but log it as an error rather than a warning - this is always + // a programming bug. +#if ZEN_WITH_TESTS + ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"); +#else + ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net"); + // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success. + m_Owner->RecordExternalError( + std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"))); +#endif + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; + } +} + void ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) { @@ -257,6 +319,100 @@ TEST_CASE("parallellwork.limitqueue") Work.Wait(); } +TEST_CASE("parallellwork.external_basic") +{ + std::atomic AbortFlag; + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::vector Tokens; + for (uint32_t I = 0; I < 5; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + for (auto& Token : Tokens) + { + Token.Complete(); + } + + Work.Wait(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_completes_from_other_thread") +{ + std::atomic AbortFlag; + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + std::thread Worker([Token = std::move(Token)]() mutable { + Sleep(20); + Token.Complete(); + }); + + Work.Wait(); + Worker.join(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_fail_propagates_exception") +{ + std::atomic AbortFlag; + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + try + { + throw std::runtime_error("external work failed"); + } + catch (...) + { + Token.Fail(std::current_exception()); + } + + CHECK_THROWS_WITH(Work.Wait(), "external work failed"); + CHECK(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_mixed_with_scheduled") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic AbortFlag; + std::atomic PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic ScheduledCount = 0; + for (uint32_t I = 0; I < 3; I++) + { + Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic& AbortFlag) { + ZEN_UNUSED(AbortFlag); + ScheduledCount++; + }); + } + + std::vector Tokens; + for (uint32_t I = 0; I < 3; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + + std::thread Completer([&]() { + for (auto& Token : Tokens) + { + Sleep(5); + Token.Complete(); + } + }); + + Work.Wait(); + Completer.join(); + + CHECK_EQ(ScheduledCount.load(), 3u); +} + TEST_SUITE_END(); void -- cgit v1.2.3