about summary refs log tree commit diff
path: root/src/zencompute/orchestratorservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/orchestratorservice.cpp')
-rw-r--r-- src/zencompute/orchestratorservice.cpp 29
1 files changed, 27 insertions, 2 deletions
diff --git a/src/zencompute/orchestratorservice.cpp b/src/zencompute/orchestratorservice.cpp
index 9ea695305..aee8fa63a 100644
--- a/src/zencompute/orchestratorservice.cpp
+++ b/src/zencompute/orchestratorservice.cpp
@@ -31,7 +31,7 @@ OrchestratorService::~OrchestratorService()
}
CbObject
-OrchestratorService::GetWorkerList()
+OrchestratorService::GetWorkerList(const WorkerAnnotator& Annotate)
{
ZEN_TRACE_CPU("OrchestratorService::GetWorkerList");
CbObjectWriter Cbo;
@@ -71,6 +71,10 @@ OrchestratorService::GetWorkerList()
Cbo << "ws_connected" << true;
}
Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs();
+ if (Annotate)
+ {
+ Annotate(WorkerId, Cbo);
+ }
Cbo.EndObject();
}
});
@@ -144,6 +148,12 @@ OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann)
}
}
+void
+OrchestratorService::SetProvisionerStateProvider(IProvisionerStateProvider* Provider)
+{
+ m_Provisioner.store(Provider, std::memory_order_release);
+}
+
bool
OrchestratorService::IsWorkerWebSocketEnabled() const
{
@@ -607,6 +617,14 @@ OrchestratorService::ProbeThreadFunction()
continue;
}
+ // Check if the provisioner knows this worker is draining — if so,
+ // unreachability is expected and should not be logged as a warning.
+ bool IsDraining = false;
+ if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire))
+ {
+ IsDraining = Prov->GetAgentStatus(Snap.Id) == AgentProvisioningStatus::Draining;
+ }
+
ReachableState NewState = ReachableState::Unreachable;
try
@@ -621,7 +639,10 @@ OrchestratorService::ProbeThreadFunction()
}
catch (const std::exception& Ex)
{
- ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
+ if (!IsDraining)
+ {
+ ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what());
+ }
}
ReachableState PrevState = ReachableState::Unknown;
@@ -646,6 +667,10 @@ OrchestratorService::ProbeThreadFunction()
{
ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri);
}
+ else if (IsDraining)
+ {
+ ZEN_INFO("worker {} ({}) shut down (draining)", Snap.Id, Snap.Uri);
+ }
else if (PrevState == ReachableState::Reachable)
{
ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri);