aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/computeservice.cpp
diff options
context:
space:
mode:
author: Stefan Boberg <[email protected]> 2026-03-18 11:19:10 +0100
committer: GitHub Enterprise <[email protected]> 2026-03-18 11:19:10 +0100
commit: eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree: 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/computeservice.cpp
parent: bugfix release - v5.7.23 (#851) (diff)
download: zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
          zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`

### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`

### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK

### Logging Optimization and ANSI Improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode. This enables color in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files

### CLI / Exec Refactoring
- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser

### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Make the `function.session.abandon_pending` test more robust

### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store

### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extend `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/computeservice.cpp')
-rw-r--r-- src/zencompute/computeservice.cpp 384
1 files changed, 300 insertions, 84 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
index 838d741b6..92901de64 100644
--- a/src/zencompute/computeservice.cpp
+++ b/src/zencompute/computeservice.cpp
@@ -33,6 +33,7 @@
# include <zenutil/workerpools.h>
# include <zentelemetry/stats.h>
# include <zenhttp/httpclient.h>
+# include <zenhttp/httpwsclient.h>
# include <set>
# include <deque>
@@ -42,6 +43,7 @@
# include <unordered_set>
ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/fixed_vector.h>
# include <EASTL/hash_set.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -95,6 +97,14 @@ using SessionState = ComputeServiceSession::SessionState;
static_assert(ZEN_ARRAY_COUNT(ComputeServiceSession::ActionHistoryEntry::Timestamps) == static_cast<size_t>(RunnerAction::State::_Count));
+static ComputeServiceSession::EnqueueResult
+MakeErrorResult(std::string_view Error)
+{
+ CbObjectWriter Writer;
+ Writer << "error"sv << Error;
+ return {0, Writer.Save()};
+}
+
//////////////////////////////////////////////////////////////////////////
struct ComputeServiceSession::Impl
@@ -130,14 +140,40 @@ struct ComputeServiceSession::Impl
void SetOrchestratorEndpoint(std::string_view Endpoint);
void SetOrchestratorBasePath(std::filesystem::path BasePath);
+ void NotifyOrchestratorChanged();
std::string m_OrchestratorEndpoint;
std::filesystem::path m_OrchestratorBasePath;
Stopwatch m_OrchestratorQueryTimer;
+ std::atomic<bool> m_OrchestratorQueryForced{false};
std::unordered_set<std::string> m_KnownWorkerUris;
void UpdateCoordinatorState();
+ // WebSocket subscription to orchestrator push notifications
+ struct OrchestratorWsHandler : public IWsClientHandler
+ {
+ Impl& Owner;
+
+ explicit OrchestratorWsHandler(Impl& InOwner) : Owner(InOwner) {}
+
+ void OnWsOpen() override
+ {
+ ZEN_LOG_INFO(Owner.m_Log, "orchestrator WebSocket connected");
+ Owner.NotifyOrchestratorChanged();
+ }
+
+ void OnWsMessage(const WebSocketMessage&) override { Owner.NotifyOrchestratorChanged(); }
+
+ void OnWsClose(uint16_t Code, std::string_view Reason) override
+ {
+ ZEN_LOG_WARN(Owner.m_Log, "orchestrator WebSocket closed (code {}: {})", Code, Reason);
+ }
+ };
+
+ std::unique_ptr<OrchestratorWsHandler> m_OrchestratorWsHandler;
+ std::unique_ptr<HttpWsClient> m_OrchestratorWsClient;
+
// Worker registration and discovery
struct FunctionDefinition
@@ -157,6 +193,8 @@ struct ComputeServiceSession::Impl
std::atomic<int32_t> m_ActionsCounter = 0; // sequence number
metrics::Meter m_ArrivalRate;
+ std::atomic<IComputeCompletionObserver*> m_CompletionObserver{nullptr};
+
RwLock m_PendingLock;
std::map<int, Ref<RunnerAction>> m_PendingActions;
@@ -267,6 +305,8 @@ struct ComputeServiceSession::Impl
void DrainQueue(int QueueId);
ComputeServiceSession::EnqueueResult EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority);
ComputeServiceSession::EnqueueResult EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority);
+ ComputeServiceSession::EnqueueResult ValidateQueueForEnqueue(int QueueId, Ref<QueueEntry>& OutQueue);
+ void ActivateActionInQueue(const Ref<QueueEntry>& Queue, int Lsn);
void GetQueueCompleted(int QueueId, CbWriter& Cbo);
void NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState);
void ExpireCompletedQueues();
@@ -292,11 +332,13 @@ struct ComputeServiceSession::Impl
void HandleActionUpdates();
void PostUpdate(RunnerAction* Action);
+ void RemoveActionFromActiveMaps(int ActionLsn);
static constexpr int kDefaultMaxRetries = 3;
int GetMaxRetriesForQueue(int QueueId);
ComputeServiceSession::RescheduleResult RescheduleAction(int ActionLsn);
+ ComputeServiceSession::RescheduleResult RetractAction(int ActionLsn);
ActionCounts GetActionCounts()
{
@@ -449,6 +491,28 @@ void
ComputeServiceSession::Impl::SetOrchestratorEndpoint(std::string_view Endpoint)
{
m_OrchestratorEndpoint = Endpoint;
+
+ // Subscribe to orchestrator WebSocket push so we discover worker changes
+ // immediately instead of waiting for the next polling cycle.
+ try
+ {
+ std::string WsUrl = HttpToWsUrl(Endpoint, "/orch/ws");
+
+ m_OrchestratorWsHandler = std::make_unique<OrchestratorWsHandler>(*this);
+
+ HttpWsClientSettings WsSettings;
+ WsSettings.LogCategory = "orch_disc_ws";
+ WsSettings.ConnectTimeout = std::chrono::milliseconds{3000};
+
+ m_OrchestratorWsClient = std::make_unique<HttpWsClient>(WsUrl, *m_OrchestratorWsHandler, WsSettings);
+ m_OrchestratorWsClient->Connect();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("failed to connect orchestrator WebSocket, falling back to polling: {}", Ex.what());
+ m_OrchestratorWsClient.reset();
+ m_OrchestratorWsHandler.reset();
+ }
}
void
@@ -458,6 +522,13 @@ ComputeServiceSession::Impl::SetOrchestratorBasePath(std::filesystem::path BaseP
}
void
+ComputeServiceSession::Impl::NotifyOrchestratorChanged()
+{
+ m_OrchestratorQueryForced.store(true, std::memory_order_relaxed);
+ m_SchedulingThreadEvent.Set();
+}
+
+void
ComputeServiceSession::Impl::UpdateCoordinatorState()
{
ZEN_TRACE_CPU("ComputeServiceSession::UpdateCoordinatorState");
@@ -467,10 +538,14 @@ ComputeServiceSession::Impl::UpdateCoordinatorState()
}
// Poll faster when we have no discovered workers yet so remote runners come online quickly
- const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 500 : 5000;
- if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs)
+ const bool Forced = m_OrchestratorQueryForced.exchange(false, std::memory_order_relaxed);
+ if (!Forced)
{
- return;
+ const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 500 : 5000;
+ if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs)
+ {
+ return;
+ }
}
m_OrchestratorQueryTimer.Reset();
@@ -520,7 +595,24 @@ ComputeServiceSession::Impl::UpdateCoordinatorState()
continue;
}
- ZEN_INFO("discovered new worker at {}", UriStr);
+ std::string_view Hostname = Worker["hostname"sv].AsString();
+ std::string_view Platform = Worker["platform"sv].AsString();
+ int Cpus = Worker["cpus"sv].AsInt32();
+ uint64_t MemTotal = Worker["memory_total"sv].AsUInt64();
+
+ if (!Hostname.empty())
+ {
+ ZEN_INFO("discovered new worker at {} ({}, {}, {} cpus, {:.1f} GB)",
+ UriStr,
+ Hostname,
+ Platform,
+ Cpus,
+ static_cast<double>(MemTotal) / (1024.0 * 1024.0 * 1024.0));
+ }
+ else
+ {
+ ZEN_INFO("discovered new worker at {}", UriStr);
+ }
m_KnownWorkerUris.insert(UriStr);
@@ -598,6 +690,15 @@ ComputeServiceSession::Impl::Shutdown()
{
RequestStateTransition(SessionState::Sunset);
+ // Close orchestrator WebSocket before stopping the scheduler thread
+ // to prevent callbacks into a shutting-down scheduler.
+ if (m_OrchestratorWsClient)
+ {
+ m_OrchestratorWsClient->Close();
+ m_OrchestratorWsClient.reset();
+ }
+ m_OrchestratorWsHandler.reset();
+
m_SchedulingThreadEnabled = false;
m_SchedulingThreadEvent.Set();
if (m_SchedulingThread.joinable())
@@ -720,8 +821,14 @@ ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker)
// different descriptor. Thus we only need to call this the first time, when the
// worker is added
- m_LocalRunnerGroup.RegisterWorker(Worker);
- m_RemoteRunnerGroup.RegisterWorker(Worker);
+ if (!m_LocalRunnerGroup.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to register worker {} on one or more local runners", WorkerId);
+ }
+ if (!m_RemoteRunnerGroup.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to register worker {} on one or more remote runners", WorkerId);
+ }
if (m_Recorder)
{
@@ -767,7 +874,10 @@ ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner)
for (const CbPackage& Worker : Workers)
{
- Runner.RegisterWorker(Worker);
+ if (!Runner.RegisterWorker(Worker))
+ {
+ ZEN_WARN("failed to sync worker {} to runner", Worker.GetObjectHash());
+ }
}
}
@@ -868,9 +978,7 @@ ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worke
if (m_SessionState.load(std::memory_order_relaxed) != SessionState::Ready)
{
- CbObjectWriter Writer;
- Writer << "error"sv << fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load()));
- return {0, Writer.Save()};
+ return MakeErrorResult(fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load())));
}
const int ActionLsn = ++m_ActionsCounter;
@@ -1258,42 +1366,51 @@ ComputeServiceSession::Impl::DrainQueue(int QueueId)
}
ComputeServiceSession::EnqueueResult
-ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority)
+ComputeServiceSession::Impl::ValidateQueueForEnqueue(int QueueId, Ref<QueueEntry>& OutQueue)
{
- Ref<QueueEntry> Queue = FindQueue(QueueId);
+ OutQueue = FindQueue(QueueId);
- if (!Queue)
+ if (!OutQueue)
{
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue not found"sv;
- return {0, Writer.Save()};
+ return MakeErrorResult("queue not found"sv);
}
- QueueState QState = Queue->State.load();
+ QueueState QState = OutQueue->State.load();
if (QState == QueueState::Cancelled)
{
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue is cancelled"sv;
- return {0, Writer.Save()};
+ return MakeErrorResult("queue is cancelled"sv);
}
if (QState == QueueState::Draining)
{
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue is draining"sv;
- return {0, Writer.Save()};
+ return MakeErrorResult("queue is draining"sv);
+ }
+
+ return {};
+}
+
+void
+ComputeServiceSession::Impl::ActivateActionInQueue(const Ref<QueueEntry>& Queue, int Lsn)
+{
+ Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Lsn); });
+ Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
+ Queue->IdleSince.store(0, std::memory_order_relaxed);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority)
+{
+ Ref<QueueEntry> Queue;
+ if (EnqueueResult Error = ValidateQueueForEnqueue(QueueId, Queue); Error.ResponseMessage)
+ {
+ return Error;
}
EnqueueResult Result = EnqueueAction(QueueId, ActionObject, Priority);
if (Result.Lsn != 0)
{
- Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); });
- Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
- Queue->IdleSince.store(0, std::memory_order_relaxed);
+ ActivateActionInQueue(Queue, Result.Lsn);
}
return Result;
@@ -1302,40 +1419,17 @@ ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionOb
ComputeServiceSession::EnqueueResult
ComputeServiceSession::Impl::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority)
{
- Ref<QueueEntry> Queue = FindQueue(QueueId);
-
- if (!Queue)
- {
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue not found"sv;
- return {0, Writer.Save()};
- }
-
- QueueState QState = Queue->State.load();
- if (QState == QueueState::Cancelled)
+ Ref<QueueEntry> Queue;
+ if (EnqueueResult Error = ValidateQueueForEnqueue(QueueId, Queue); Error.ResponseMessage)
{
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue is cancelled"sv;
- return {0, Writer.Save()};
- }
-
- if (QState == QueueState::Draining)
- {
- CbObjectWriter Writer;
- Writer << "error"sv
- << "queue is draining"sv;
- return {0, Writer.Save()};
+ return Error;
}
EnqueueResult Result = EnqueueResolvedAction(QueueId, Worker, ActionObj, Priority);
if (Result.Lsn != 0)
{
- Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); });
- Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
- Queue->IdleSince.store(0, std::memory_order_relaxed);
+ ActivateActionInQueue(Queue, Result.Lsn);
}
return Result;
@@ -1770,6 +1864,68 @@ ComputeServiceSession::Impl::RescheduleAction(int ActionLsn)
return {.Success = true, .RetryCount = NewRetryCount};
}
+ComputeServiceSession::RescheduleResult
+ComputeServiceSession::Impl::RetractAction(int ActionLsn)
+{
+ Ref<RunnerAction> Action;
+ bool WasRunning = false;
+
+ // Look for the action in pending or running maps
+ m_RunningLock.WithSharedLock([&] {
+ if (auto It = m_RunningMap.find(ActionLsn); It != m_RunningMap.end())
+ {
+ Action = It->second;
+ WasRunning = true;
+ }
+ });
+
+ if (!Action)
+ {
+ m_PendingLock.WithSharedLock([&] {
+ if (auto It = m_PendingActions.find(ActionLsn); It != m_PendingActions.end())
+ {
+ Action = It->second;
+ }
+ });
+ }
+
+ if (!Action)
+ {
+ return {.Success = false, .Error = "Action not found in pending or running maps"};
+ }
+
+ if (!Action->RetractAction())
+ {
+ return {.Success = false, .Error = "Action cannot be retracted from its current state"};
+ }
+
+ // If the action was running, send a cancellation signal to the runner
+ if (WasRunning)
+ {
+ m_LocalRunnerGroup.CancelAction(ActionLsn);
+ }
+
+ ZEN_INFO("action {} ({}) retract requested", Action->ActionId, ActionLsn);
+ return {.Success = true, .RetryCount = Action->RetryCount.load(std::memory_order_relaxed)};
+}
+
+void
+ComputeServiceSession::Impl::RemoveActionFromActiveMaps(int ActionLsn)
+{
+ m_RunningLock.WithExclusiveLock([&] {
+ m_PendingLock.WithExclusiveLock([&] {
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
+ });
+ });
+}
+
void
ComputeServiceSession::Impl::HandleActionUpdates()
{
@@ -1781,6 +1937,10 @@ ComputeServiceSession::Impl::HandleActionUpdates()
std::unordered_set<int> SeenLsn;
+ // Collect terminal action notifications for the completion observer.
+ // Inline capacity of 64 avoids heap allocation in the common case.
+ eastl::fixed_vector<IComputeCompletionObserver::CompletedActionNotification, 64> TerminalBatch;
+
// Process each action's latest state, deduplicating by LSN.
//
// This is safe because state transitions are monotonically increasing by enum
@@ -1798,7 +1958,23 @@ ComputeServiceSession::Impl::HandleActionUpdates()
{
// Newly enqueued — add to pending map for scheduling
case RunnerAction::State::Pending:
- m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ // Guard against a race where the session is abandoned between
+ // EnqueueAction (which calls PostUpdate) and this scheduler
+ // tick. AbandonAllActions() only scans m_PendingActions, so it
+ // misses actions still in m_UpdatedActions at the time the
+ // session transitions. Detect that here and immediately abandon
+ // rather than inserting into the pending map, where they would
+ // otherwise be stuck indefinitely.
+ if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Abandoned)
+ {
+ Action->SetActionState(RunnerAction::State::Abandoned);
+ // SetActionState calls PostUpdate; the Abandoned action
+ // will be processed as a terminal on the next scheduler pass.
+ }
+ else
+ {
+ m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ }
break;
// Async submission in progress — remains in pending map
@@ -1816,6 +1992,15 @@ ComputeServiceSession::Impl::HandleActionUpdates()
ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
break;
+ // Retracted — pull back for rescheduling without counting against retry limit
+ case RunnerAction::State::Retracted:
+ {
+ RemoveActionFromActiveMaps(ActionLsn);
+ Action->ResetActionStateToPending();
+ ZEN_INFO("action {} ({}) retracted for rescheduling", Action->ActionId, ActionLsn);
+ break;
+ }
+
// Terminal states — move to results, record history, notify queue
case RunnerAction::State::Completed:
case RunnerAction::State::Failed:
@@ -1834,19 +2019,7 @@ ComputeServiceSession::Impl::HandleActionUpdates()
if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries)
{
- // Remove from whichever active map the action is in before resetting
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
- {
- m_PendingActions.erase(ActionLsn);
- }
- else
- {
- m_RunningMap.erase(FindIt);
- }
- });
- });
+ RemoveActionFromActiveMaps(ActionLsn);
// Reset triggers PostUpdate() which re-enters the action as Pending
Action->ResetActionStateToPending();
@@ -1861,19 +2034,14 @@ ComputeServiceSession::Impl::HandleActionUpdates()
}
}
- // Remove from whichever active map the action is in
- m_RunningLock.WithExclusiveLock([&] {
- m_PendingLock.WithExclusiveLock([&] {
- if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
- {
- m_PendingActions.erase(ActionLsn);
- }
- else
- {
- m_RunningMap.erase(FindIt);
- }
- });
- });
+ RemoveActionFromActiveMaps(ActionLsn);
+
+ // Update queue counters BEFORE publishing the result into
+ // m_ResultsMap. GetActionResult erases from m_ResultsMap
+ // under m_ResultsLock, so if we updated counters after
+ // releasing that lock, a caller could observe ActiveCount
+ // still at 1 immediately after GetActionResult returned OK.
+ NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
m_ResultsLock.WithExclusiveLock([&] {
m_ResultsMap[ActionLsn] = Action;
@@ -1902,16 +2070,46 @@ ComputeServiceSession::Impl::HandleActionUpdates()
});
m_RetiredCount.fetch_add(1);
m_ResultRate.Mark(1);
+ {
+ using ObserverState = IComputeCompletionObserver::ActionState;
+ ObserverState NotifyState{};
+ switch (TerminalState)
+ {
+ case RunnerAction::State::Completed:
+ NotifyState = ObserverState::Completed;
+ break;
+ case RunnerAction::State::Failed:
+ NotifyState = ObserverState::Failed;
+ break;
+ case RunnerAction::State::Abandoned:
+ NotifyState = ObserverState::Abandoned;
+ break;
+ case RunnerAction::State::Cancelled:
+ NotifyState = ObserverState::Cancelled;
+ break;
+ default:
+ break;
+ }
+ TerminalBatch.push_back({ActionLsn, NotifyState});
+ }
ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
Action->ActionId,
ActionLsn,
TerminalState == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
- NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
break;
}
}
}
}
+
+ // Notify the completion observer, if any, about all terminal actions in this batch.
+ if (!TerminalBatch.empty())
+ {
+ if (IComputeCompletionObserver* Observer = m_CompletionObserver.load(std::memory_order_acquire))
+ {
+ Observer->OnActionsCompleted({TerminalBatch.data(), TerminalBatch.size()});
+ }
+ }
}
size_t
@@ -2014,6 +2212,12 @@ ComputeServiceSession::SetOrchestratorBasePath(std::filesystem::path BasePath)
}
void
+ComputeServiceSession::NotifyOrchestratorChanged()
+{
+ m_Impl->NotifyOrchestratorChanged();
+}
+
+void
ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
{
m_Impl->StartRecording(InResolver, RecordingPath);
@@ -2194,6 +2398,12 @@ ComputeServiceSession::RescheduleAction(int ActionLsn)
return m_Impl->RescheduleAction(ActionLsn);
}
+ComputeServiceSession::RescheduleResult
+ComputeServiceSession::RetractAction(int ActionLsn)
+{
+ return m_Impl->RetractAction(ActionLsn);
+}
+
std::vector<ComputeServiceSession::RunningActionInfo>
ComputeServiceSession::GetRunningActions()
{
@@ -2219,6 +2429,12 @@ ComputeServiceSession::GetCompleted(CbWriter& Cbo)
}
void
+ComputeServiceSession::SetCompletionObserver(IComputeCompletionObserver* Observer)
+{
+ m_Impl->m_CompletionObserver.store(Observer, std::memory_order_release);
+}
+
+void
ComputeServiceSession::PostUpdate(RunnerAction* Action)
{
m_Impl->PostUpdate(Action);