diff options
| author | Dan Engelbrecht <[email protected]> | 2026-05-05 14:59:21 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-05 14:59:21 +0200 |
| commit | 46f456ffd4d0717a035253ff9076ca6ee664e536 (patch) | |
| tree | 69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zencore/parallelwork.cpp | |
| parent | watchdog ephemeral port exhaust (#1022) (diff) | |
| download | archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip | |
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), a pull-mode body source for streaming `AsyncPut`, a retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so that hub-scale fan-out against a single host cannot leave queued handles stalled long enough to run into curl's connect-timeout window
- Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads
- `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true)
- `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`)
- Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa)
- New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn
- `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)`
- Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs)
- Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; the provision and spawn pools must either both be set or both be null (asserted at construction).
- Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zencore/parallelwork.cpp')
| -rw-r--r-- | src/zencore/parallelwork.cpp | 156 |
1 file changed, 156 insertions, 0 deletions
diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp index 94696f479..ec00fe0bc 100644 --- a/src/zencore/parallelwork.cpp +++ b/src/zencore/parallelwork.cpp @@ -2,6 +2,7 @@ #include <zencore/parallelwork.h> +#include <zencore/assertfmt.h> #include <zencore/callstack.h> #include <zencore/except.h> #include <zencore/fmtutils.h> @@ -11,6 +12,8 @@ #if ZEN_WITH_TESTS # include <zencore/testing.h> + +# include <thread> #endif // ZEN_WITH_TESTS namespace zen { @@ -90,6 +93,65 @@ ParallelWork::DefaultErrorFunction() } void +ParallelWork::RecordExternalError(std::exception_ptr Ex) +{ + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + m_AbortFlag = true; +} + +ParallelWork::ExternalWorkToken +ParallelWork::RegisterExternal() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_PendingWork.AddCount(1); + return ExternalWorkToken(this); +} + +void +ParallelWork::ExternalWorkToken::Complete() +{ + ZEN_ASSERT(m_Owner != nullptr); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex) +{ + ZEN_ASSERT(m_Owner != nullptr); + // Null exception_ptr would propagate as std::bad_exception via + // rethrow_exception(nullptr) and mask the real failure mode. Catches + // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); }) + // firing on a normal-return path where no exception is in flight. + ZEN_ASSERT(Ex != nullptr); + m_Owner->RecordExternalError(Ex); + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; +} + +void +ParallelWork::ExternalWorkToken::Release() +{ + if (m_Owner != nullptr) + { + // Tests should fail loudly so that any leaked path surfaces immediately; + // in production we keep the safety-net countdown so a leak does not deadlock + // the latch but log it as an error rather than a warning - this is always + // a programming bug. 
+#if ZEN_WITH_TESTS + ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"); +#else + ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net"); + // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success. + m_Owner->RecordExternalError( + std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()"))); +#endif + m_Owner->m_PendingWork.CountDown(); + m_Owner = nullptr; + } +} + +void ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) { ZEN_ASSERT(!m_DispatchComplete); @@ -257,6 +319,100 @@ TEST_CASE("parallellwork.limitqueue") Work.Wait(); } +TEST_CASE("parallellwork.external_basic") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::vector<ParallelWork::ExternalWorkToken> Tokens; + for (uint32_t I = 0; I < 5; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + for (auto& Token : Tokens) + { + Token.Complete(); + } + + Work.Wait(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_completes_from_other_thread") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + std::thread Worker([Token = std::move(Token)]() mutable { + Sleep(20); + Token.Complete(); + }); + + Work.Wait(); + Worker.join(); + CHECK_FALSE(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_fail_propagates_exception") +{ + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + auto Token = Work.RegisterExternal(); + try + { + throw std::runtime_error("external work failed"); + } + catch (...) 
+ { + Token.Fail(std::current_exception()); + } + + CHECK_THROWS_WITH(Work.Wait(), "external work failed"); + CHECK(AbortFlag.load()); +} + +TEST_CASE("parallellwork.external_mixed_with_scheduled") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<uint32_t> ScheduledCount = 0; + for (uint32_t I = 0; I < 3; I++) + { + Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + ScheduledCount++; + }); + } + + std::vector<ParallelWork::ExternalWorkToken> Tokens; + for (uint32_t I = 0; I < 3; I++) + { + Tokens.push_back(Work.RegisterExternal()); + } + + std::thread Completer([&]() { + for (auto& Token : Tokens) + { + Sleep(5); + Token.Complete(); + } + }); + + Work.Wait(); + Completer.join(); + + CHECK_EQ(ScheduledCount.load(), 3u); +} + TEST_SUITE_END(); void |