aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/parallelwork.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-05-05 14:59:21 +0200
committerGitHub Enterprise <[email protected]>2026-05-05 14:59:21 +0200
commit46f456ffd4d0717a035253ff9076ca6ee664e536 (patch)
tree69d7a9a43b9874fd3990c43aa5ff4135c35d53d9 /src/zencore/parallelwork.cpp
parentwatchdog ephemeral port exhaust (#1022) (diff)
downloadarchived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.tar.xz
archived-zen-46f456ffd4d0717a035253ff9076ca6ee664e536.zip
hub async s3 client (#1024)
- Feature: `AsyncHttpClient` adds cancellable request tokens, streaming GET to a file (`AsyncDownload`), zero-copy chunk-callback GET (`AsyncStream`), pull-mode body source for streaming `AsyncPut`, retry layer mirroring the synchronous client, and a submit-side in-flight cap (`HttpClientSettings::MaxConcurrentRequests`) so hub-scale fanout against a single host cannot stall queued handles into curl's connect-timeout window - Feature: Hub hydration can route S3 transfers through a non-blocking `AsyncHttpClient` (curl_multi + asio) backed by a single io thread; hydrate and dehydrate now pipeline requests instead of blocking worker threads - `--hub-hydration-async-enabled` (Lua: `hub.hydration.async.enabled`, default true) - `--hub-hydration-async-max-concurrent-requests` (Lua: `hub.hydration.async.maxconcurrentrequests`, default `clamp(cpu*4, 128, 512)`) - Feature: Hub provision/deprovision/obliterate now run as two phases on separate worker pools so per-module hydration cannot starve child-process spawn/despawn (and vice versa) - New `--hub-instance-spawn-threads` (Lua: `hub.instance.spawnthreads`, default `clamp(cpu/8, 4, 16)`) drives child-process spawn/despawn - `--hub-instance-provision-threads` (Lua: `hub.instance.provisionthreads`) now drives per-module hydrate/dehydrate scheduling only; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - `--hub-hydration-threads` (Lua: `hub.hydration.threads`) now controls per-file workers inside a single hydrate/dehydrate; default changed from `max(cpu/4, 2)` to `clamp(cpu/8, 4, 12)` - Feature: `AsyncHttpClient` owns its `asio::io_context` and one io thread by default; the `(BaseUri, io_context&)` constructor is preserved for callers that want to share an externally-driven `io_context` across clients (caller MUST keep the loop running until the client destructs) - Feature: `Hub::Configuration` C++ struct fields renamed (`OptionalProvisionWorkerPool`/`OptionalHydrationWorkerPool` -> `OptionalProvisionPool`/`OptionalSpawnPool`/`OptionalHydrationPool`). Embedders constructing `Hub` directly must update field names; provision and spawn pools must both be set or both null (asserted at construction). - Bugfix: `S3Client` signing-key cache no longer returns stale signatures after IMDS-rotated credentials change `AccessKeyId`; cache is now keyed on `(DateStamp, AccessKeyId)`
Diffstat (limited to 'src/zencore/parallelwork.cpp')
-rw-r--r--src/zencore/parallelwork.cpp156
1 files changed, 156 insertions, 0 deletions
diff --git a/src/zencore/parallelwork.cpp b/src/zencore/parallelwork.cpp
index 94696f479..ec00fe0bc 100644
--- a/src/zencore/parallelwork.cpp
+++ b/src/zencore/parallelwork.cpp
@@ -2,6 +2,7 @@
#include <zencore/parallelwork.h>
+#include <zencore/assertfmt.h>
#include <zencore/callstack.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
@@ -11,6 +12,8 @@
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
+
+# include <thread>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -90,6 +93,65 @@ ParallelWork::DefaultErrorFunction()
}
void
+ParallelWork::RecordExternalError(std::exception_ptr Ex)
+{
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ m_AbortFlag = true;
+}
+
+ParallelWork::ExternalWorkToken
+ParallelWork::RegisterExternal()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_PendingWork.AddCount(1);
+ return ExternalWorkToken(this);
+}
+
+void
+ParallelWork::ExternalWorkToken::Complete()
+{
+ ZEN_ASSERT(m_Owner != nullptr);
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+}
+
+void
+ParallelWork::ExternalWorkToken::Fail(std::exception_ptr Ex)
+{
+ ZEN_ASSERT(m_Owner != nullptr);
+ // Null exception_ptr would propagate as std::bad_exception via
+ // rethrow_exception(nullptr) and mask the real failure mode. Catches
+ // patterns like MakeGuard([Token]{ Token->Fail(std::current_exception()); })
+ // firing on a normal-return path where no exception is in flight.
+ ZEN_ASSERT(Ex != nullptr);
+ m_Owner->RecordExternalError(Ex);
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+}
+
+void
+ParallelWork::ExternalWorkToken::Release()
+{
+ if (m_Owner != nullptr)
+ {
+ // Tests should fail loudly so that any leaked path surfaces immediately;
+ // in production we keep the safety-net countdown so a leak does not deadlock
+ // the latch but log it as an error rather than a warning - this is always
+ // a programming bug.
+#if ZEN_WITH_TESTS
+ ZEN_ASSERT_FORMAT(false, "ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()");
+#else
+ ZEN_ERROR("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail(); decrementing latch as safety net");
+ // Surface as an error from Wait()/RethrowErrors() so the caller does not see a phantom success.
+ m_Owner->RecordExternalError(
+ std::make_exception_ptr(std::runtime_error("ParallelWork::ExternalWorkToken destroyed without Complete()/Fail()")));
+#endif
+ m_Owner->m_PendingWork.CountDown();
+ m_Owner = nullptr;
+ }
+}
+
+void
ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
{
ZEN_ASSERT(!m_DispatchComplete);
@@ -257,6 +319,100 @@ TEST_CASE("parallellwork.limitqueue")
Work.Wait();
}
+TEST_CASE("parallellwork.external_basic")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::vector<ParallelWork::ExternalWorkToken> Tokens;
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Tokens.push_back(Work.RegisterExternal());
+ }
+ for (auto& Token : Tokens)
+ {
+ Token.Complete();
+ }
+
+ Work.Wait();
+ CHECK_FALSE(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_completes_from_other_thread")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ auto Token = Work.RegisterExternal();
+ std::thread Worker([Token = std::move(Token)]() mutable {
+ Sleep(20);
+ Token.Complete();
+ });
+
+ Work.Wait();
+ Worker.join();
+ CHECK_FALSE(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_fail_propagates_exception")
+{
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ auto Token = Work.RegisterExternal();
+ try
+ {
+ throw std::runtime_error("external work failed");
+ }
+ catch (...)
+ {
+ Token.Fail(std::current_exception());
+ }
+
+ CHECK_THROWS_WITH(Work.Wait(), "external work failed");
+ CHECK(AbortFlag.load());
+}
+
+TEST_CASE("parallellwork.external_mixed_with_scheduled")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<uint32_t> ScheduledCount = 0;
+ for (uint32_t I = 0; I < 3; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [&ScheduledCount](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ ScheduledCount++;
+ });
+ }
+
+ std::vector<ParallelWork::ExternalWorkToken> Tokens;
+ for (uint32_t I = 0; I < 3; I++)
+ {
+ Tokens.push_back(Work.RegisterExternal());
+ }
+
+ std::thread Completer([&]() {
+ for (auto& Token : Tokens)
+ {
+ Sleep(5);
+ Token.Complete();
+ }
+ });
+
+ Work.Wait();
+ Completer.join();
+
+ CHECK_EQ(ScheduledCount.load(), 3u);
+}
+
TEST_SUITE_END();
void