aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 00:14:52 +0100
committerStefan Boberg <[email protected]>2026-03-18 00:14:52 +0100
commit5ccc80434fa4e6f92bd98c4afe0df46e214e4c64 (patch)
tree2681155020af73ac8cdd99665aacf96185308a98 /src/zencompute
parentupdated zencompute CLAUDE.md (diff)
downloadzen-sb/compute-batch.tar.xz
zen-sb/compute-batch.zip
Propagate errors from FunctionRunner::RegisterWorkersb/compute-batch
Change RegisterWorker return type from void to [[nodiscard]] bool across the FunctionRunner hierarchy so callers can detect registration failures at the point of registration rather than at action execution time. - FunctionRunner pure virtual: void -> [[nodiscard]] bool - LocalProcessRunner: always returns true (debug dump only) - RemoteHttpRunner: returns false on the three HTTP error paths (removes the TODO comments), true on success/already-known - BaseRunnerGroup: returns true only if all runners succeeded - Callers in computeservice.cpp log warnings on failure but continue best-effort
Diffstat (limited to 'src/zencompute')
-rw-r--r--src/zencompute/computeservice.cpp15
-rw-r--r--src/zencompute/runners/functionrunner.cpp11
-rw-r--r--src/zencompute/runners/functionrunner.h6
-rw-r--r--src/zencompute/runners/localrunner.cpp4
-rw-r--r--src/zencompute/runners/localrunner.h2
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp15
-rw-r--r--src/zencompute/runners/remotehttprunner.h2
7 files changed, 36 insertions, 19 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index 0638b923f..92901de64 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -821,8 +821,14 @@ ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker)
// different descriptor. Thus we only need to call this the first time, when the
// worker is added
- m_LocalRunnerGroup.RegisterWorker(Worker);
- m_RemoteRunnerGroup.RegisterWorker(Worker);
+ if (!m_LocalRunnerGroup.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to register worker {} on one or more local runners", WorkerId);
+ }
+ if (!m_RemoteRunnerGroup.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to register worker {} on one or more remote runners", WorkerId);
+ }
if (m_Recorder)
{
@@ -868,7 +874,10 @@ ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner)
for (const CbPackage& Worker : Workers)
{
- Runner.RegisterWorker(Worker);
+ if (!Runner.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to sync worker {} to runner", Worker.GetObjectHash());
+ }
}
}
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 3ed98d363..4f116e7d8 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -215,15 +215,22 @@ BaseRunnerGroup::GetSubmittedActionCount()
return TotalCount;
}
-void
+bool
BaseRunnerGroup::RegisterWorker(CbPackage Worker)
{
RwLock::SharedLockScope _(m_RunnersLock);
+ bool AllSucceeded = true;
+
for (auto& Runner : m_Runners)
{
- Runner->RegisterWorker(Worker);
+ if (!Runner->RegisterWorker(Worker))
+ {
+ AllSucceeded = false;
+ }
}
+
+ return AllSucceeded;
}
void
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index 37d3ce04c..56c3f3af0 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -29,8 +29,8 @@ public:
FunctionRunner(std::filesystem::path BasePath);
virtual ~FunctionRunner() = 0;
- virtual void Shutdown() = 0;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
+ virtual void Shutdown() = 0;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) = 0;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
[[nodiscard]] virtual size_t GetSubmittedActionCount() = 0;
@@ -63,7 +63,7 @@ public:
SubmitResult SubmitAction(Ref<RunnerAction> Action);
std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
size_t GetSubmittedActionCount();
- void RegisterWorker(CbPackage Worker);
+ [[nodiscard]] bool RegisterWorker(CbPackage Worker);
void Shutdown();
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index ec08d468f..b61e0a46f 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -154,7 +154,7 @@ LocalProcessRunner::CreateNewSandbox()
return Path;
}
-void
+bool
LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
ZEN_TRACE_CPU("LocalProcessRunner::RegisterWorker");
@@ -175,6 +175,8 @@ LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path);
}
+
+ return true;
}
size_t
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
index 7493e980b..b8cff6826 100644
--- a/src/zencompute/runners/localrunner.h
+++ b/src/zencompute/runners/localrunner.h
@@ -51,7 +51,7 @@ public:
~LocalProcessRunner();
virtual void Shutdown() override;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override { return true; }
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index f1cabb176..ce6a81173 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -112,7 +112,7 @@ RemoteHttpRunner::Shutdown()
}
}
-void
+bool
RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");
@@ -174,15 +174,13 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
}
else if (!IsHttpSuccessCode(DescResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
else
{
@@ -201,9 +199,10 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
WorkerUrl,
(int)WorkerResponse.StatusCode,
ToString(WorkerResponse.StatusCode));
-
- // TODO: propagate error
+ return false;
}
+
+ return true;
}
size_t
@@ -378,7 +377,7 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
m_Http.GetBaseUri(),
ActionId);
- RegisterWorker(Action->Worker.Descriptor);
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index 5e8f0d2bd..c17d0cf2a 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -47,7 +47,7 @@ public:
~RemoteHttpRunner();
virtual void Shutdown() override;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override;
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;