diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 00:14:52 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-03-18 00:14:52 +0100 |
| commit | 5ccc80434fa4e6f92bd98c4afe0df46e214e4c64 (patch) | |
| tree | 2681155020af73ac8cdd99665aacf96185308a98 /src/zencompute | |
| parent | updated zencompute CLAUDE.md (diff) | |
| download | zen-sb/compute-batch.tar.xz zen-sb/compute-batch.zip | |
Propagate errors from FunctionRunner::RegisterWorkersb/compute-batch
Change RegisterWorker return type from void to [[nodiscard]] bool across
the FunctionRunner hierarchy so callers can detect registration failures
at the point of registration rather than at action execution time.
- FunctionRunner pure virtual: void -> [[nodiscard]] bool
- LocalProcessRunner: always returns true (debug dump only)
- RemoteHttpRunner: returns false on the three HTTP error paths (removes
the TODO comments), true on success/already-known
- BaseRunnerGroup: returns true only if all runners succeeded
- Callers in computeservice.cpp log warnings on failure but continue
best-effort
Diffstat (limited to 'src/zencompute')
| -rw-r--r-- | src/zencompute/computeservice.cpp | 15 | ||||
| -rw-r--r-- | src/zencompute/runners/functionrunner.cpp | 11 | ||||
| -rw-r--r-- | src/zencompute/runners/functionrunner.h | 6 | ||||
| -rw-r--r-- | src/zencompute/runners/localrunner.cpp | 4 | ||||
| -rw-r--r-- | src/zencompute/runners/localrunner.h | 2 | ||||
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.cpp | 15 | ||||
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.h | 2 |
7 files changed, 36 insertions, 19 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index 0638b923f..92901de64 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -821,8 +821,14 @@ ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker) // different descriptor. Thus we only need to call this the first time, when the // worker is added - m_LocalRunnerGroup.RegisterWorker(Worker); - m_RemoteRunnerGroup.RegisterWorker(Worker); + if (!m_LocalRunnerGroup.RegisterWorker(Worker)) + { + ZEN_WARN("failed to register worker {} on one or more local runners", WorkerId); + } + if (!m_RemoteRunnerGroup.RegisterWorker(Worker)) + { + ZEN_WARN("failed to register worker {} on one or more remote runners", WorkerId); + } if (m_Recorder) { @@ -868,7 +874,10 @@ ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner) for (const CbPackage& Worker : Workers) { - Runner.RegisterWorker(Worker); + if (!Runner.RegisterWorker(Worker)) + { + ZEN_WARN("failed to sync worker {} to runner", Worker.GetObjectHash()); + } } } diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp index 3ed98d363..4f116e7d8 100644 --- a/src/zencompute/runners/functionrunner.cpp +++ b/src/zencompute/runners/functionrunner.cpp @@ -215,15 +215,22 @@ BaseRunnerGroup::GetSubmittedActionCount() return TotalCount; } -void +bool BaseRunnerGroup::RegisterWorker(CbPackage Worker) { RwLock::SharedLockScope _(m_RunnersLock); + bool AllSucceeded = true; + for (auto& Runner : m_Runners) { - Runner->RegisterWorker(Worker); + if (!Runner->RegisterWorker(Worker)) + { + AllSucceeded = false; + } } + + return AllSucceeded; } void diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h index 37d3ce04c..56c3f3af0 100644 --- a/src/zencompute/runners/functionrunner.h +++ b/src/zencompute/runners/functionrunner.h @@ -29,8 +29,8 @@ public: FunctionRunner(std::filesystem::path BasePath); virtual ~FunctionRunner() = 0; - virtual void Shutdown() = 0; - virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; + virtual void Shutdown() = 0; + [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) = 0; [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0; [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; @@ -63,7 +63,7 @@ public: SubmitResult SubmitAction(Ref<RunnerAction> Action); std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); size_t GetSubmittedActionCount(); - void RegisterWorker(CbPackage Worker); + [[nodiscard]] bool RegisterWorker(CbPackage Worker); void Shutdown(); bool CancelAction(int ActionLsn); void CancelRemoteQueue(int QueueId); diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp index ec08d468f..b61e0a46f 100644 --- a/src/zencompute/runners/localrunner.cpp +++ b/src/zencompute/runners/localrunner.cpp @@ -154,7 +154,7 @@ LocalProcessRunner::CreateNewSandbox() return Path; } -void +bool LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) { ZEN_TRACE_CPU("LocalProcessRunner::RegisterWorker"); @@ -175,6 +175,8 @@ LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage) ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path); } + + return true; } size_t diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h index 7493e980b..b8cff6826 100644 --- a/src/zencompute/runners/localrunner.h +++ b/src/zencompute/runners/localrunner.h @@ -51,7 +51,7 @@ public: ~LocalProcessRunner(); virtual void Shutdown() override; - virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override; [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; [[nodiscard]] virtual bool IsHealthy() override { return true; } [[nodiscard]] virtual size_t GetSubmittedActionCount() override; diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp index f1cabb176..ce6a81173 100644 --- a/src/zencompute/runners/remotehttprunner.cpp +++ b/src/zencompute/runners/remotehttprunner.cpp @@ -112,7 +112,7 @@ RemoteHttpRunner::Shutdown() } } -void +bool RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) { ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker"); @@ -174,15 +174,13 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) if (!IsHttpSuccessCode(PayloadResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); - - // TODO: propagate error + return false; } } else if (!IsHttpSuccessCode(DescResponse.StatusCode)) { ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl); - - // TODO: propagate error + return false; } else { @@ -201,9 +199,10 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage) WorkerUrl, (int)WorkerResponse.StatusCode, ToString(WorkerResponse.StatusCode)); - - // TODO: propagate error + return false; } + + return true; } size_t @@ -378,7 +377,7 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) m_Http.GetBaseUri(), ActionId); - RegisterWorker(Action->Worker.Descriptor); + (void)RegisterWorker(Action->Worker.Descriptor); } else { diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h index 5e8f0d2bd..c17d0cf2a 100644 --- a/src/zencompute/runners/remotehttprunner.h +++ b/src/zencompute/runners/remotehttprunner.h @@ -47,7 +47,7 @@ public: ~RemoteHttpRunner(); virtual void Shutdown() override; - virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override; [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; [[nodiscard]] virtual bool IsHealthy() override; [[nodiscard]] virtual size_t GetSubmittedActionCount() override; |