diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 11:19:10 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 11:19:10 +0100 |
| commit | eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch) | |
| tree | 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/computeservice.cpp | |
| parent | bugfix release - v5.7.23 (#851) (diff) | |
| download | zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip | |
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`
### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`
### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK
### Logging Optimization and ANSI improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files
### CLI / Exec Refactoring
- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser
### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Made `function.session.abandon_pending` test more robust
### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store
### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extended `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/computeservice.cpp')
| -rw-r--r-- | src/zencompute/computeservice.cpp | 384 |
1 files changed, 300 insertions, 84 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index 838d741b6..92901de64 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -33,6 +33,7 @@ # include <zenutil/workerpools.h> # include <zentelemetry/stats.h> # include <zenhttp/httpclient.h> +# include <zenhttp/httpwsclient.h> # include <set> # include <deque> @@ -42,6 +43,7 @@ # include <unordered_set> ZEN_THIRD_PARTY_INCLUDES_START +# include <EASTL/fixed_vector.h> # include <EASTL/hash_set.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -95,6 +97,14 @@ using SessionState = ComputeServiceSession::SessionState; static_assert(ZEN_ARRAY_COUNT(ComputeServiceSession::ActionHistoryEntry::Timestamps) == static_cast<size_t>(RunnerAction::State::_Count)); +static ComputeServiceSession::EnqueueResult +MakeErrorResult(std::string_view Error) +{ + CbObjectWriter Writer; + Writer << "error"sv << Error; + return {0, Writer.Save()}; +} + ////////////////////////////////////////////////////////////////////////// struct ComputeServiceSession::Impl @@ -130,14 +140,40 @@ struct ComputeServiceSession::Impl void SetOrchestratorEndpoint(std::string_view Endpoint); void SetOrchestratorBasePath(std::filesystem::path BasePath); + void NotifyOrchestratorChanged(); std::string m_OrchestratorEndpoint; std::filesystem::path m_OrchestratorBasePath; Stopwatch m_OrchestratorQueryTimer; + std::atomic<bool> m_OrchestratorQueryForced{false}; std::unordered_set<std::string> m_KnownWorkerUris; void UpdateCoordinatorState(); + // WebSocket subscription to orchestrator push notifications + struct OrchestratorWsHandler : public IWsClientHandler + { + Impl& Owner; + + explicit OrchestratorWsHandler(Impl& InOwner) : Owner(InOwner) {} + + void OnWsOpen() override + { + ZEN_LOG_INFO(Owner.m_Log, "orchestrator WebSocket connected"); + Owner.NotifyOrchestratorChanged(); + } + + void OnWsMessage(const WebSocketMessage&) override { Owner.NotifyOrchestratorChanged(); } + + void OnWsClose(uint16_t Code, std::string_view Reason) override + { + ZEN_LOG_WARN(Owner.m_Log, "orchestrator WebSocket closed (code {}: {})", Code, Reason); + } + }; + + std::unique_ptr<OrchestratorWsHandler> m_OrchestratorWsHandler; + std::unique_ptr<HttpWsClient> m_OrchestratorWsClient; + // Worker registration and discovery struct FunctionDefinition @@ -157,6 +193,8 @@ struct ComputeServiceSession::Impl std::atomic<int32_t> m_ActionsCounter = 0; // sequence number metrics::Meter m_ArrivalRate; + std::atomic<IComputeCompletionObserver*> m_CompletionObserver{nullptr}; + RwLock m_PendingLock; std::map<int, Ref<RunnerAction>> m_PendingActions; @@ -267,6 +305,8 @@ struct ComputeServiceSession::Impl void DrainQueue(int QueueId); ComputeServiceSession::EnqueueResult EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority); ComputeServiceSession::EnqueueResult EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority); + ComputeServiceSession::EnqueueResult ValidateQueueForEnqueue(int QueueId, Ref<QueueEntry>& OutQueue); + void ActivateActionInQueue(const Ref<QueueEntry>& Queue, int Lsn); void GetQueueCompleted(int QueueId, CbWriter& Cbo); void NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState); void ExpireCompletedQueues(); @@ -292,11 +332,13 @@ struct ComputeServiceSession::Impl void HandleActionUpdates(); void PostUpdate(RunnerAction* Action); + void RemoveActionFromActiveMaps(int ActionLsn); static constexpr int kDefaultMaxRetries = 3; int GetMaxRetriesForQueue(int QueueId); ComputeServiceSession::RescheduleResult RescheduleAction(int ActionLsn); + ComputeServiceSession::RescheduleResult RetractAction(int ActionLsn); ActionCounts GetActionCounts() { @@ -449,6 +491,28 @@ void ComputeServiceSession::Impl::SetOrchestratorEndpoint(std::string_view Endpoint) { m_OrchestratorEndpoint = Endpoint; + + // Subscribe to orchestrator WebSocket push so we discover worker changes + // immediately instead of waiting for the next polling cycle. + try + { + std::string WsUrl = HttpToWsUrl(Endpoint, "/orch/ws"); + + m_OrchestratorWsHandler = std::make_unique<OrchestratorWsHandler>(*this); + + HttpWsClientSettings WsSettings; + WsSettings.LogCategory = "orch_disc_ws"; + WsSettings.ConnectTimeout = std::chrono::milliseconds{3000}; + + m_OrchestratorWsClient = std::make_unique<HttpWsClient>(WsUrl, *m_OrchestratorWsHandler, WsSettings); + m_OrchestratorWsClient->Connect(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("failed to connect orchestrator WebSocket, falling back to polling: {}", Ex.what()); + m_OrchestratorWsClient.reset(); + m_OrchestratorWsHandler.reset(); + } } void @@ -458,6 +522,13 @@ ComputeServiceSession::Impl::SetOrchestratorBasePath(std::filesystem::path BaseP } void +ComputeServiceSession::Impl::NotifyOrchestratorChanged() +{ + m_OrchestratorQueryForced.store(true, std::memory_order_relaxed); + m_SchedulingThreadEvent.Set(); +} + +void ComputeServiceSession::Impl::UpdateCoordinatorState() { ZEN_TRACE_CPU("ComputeServiceSession::UpdateCoordinatorState"); @@ -467,10 +538,14 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() } // Poll faster when we have no discovered workers yet so remote runners come online quickly - const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 500 : 5000; - if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs) + const bool Forced = m_OrchestratorQueryForced.exchange(false, std::memory_order_relaxed); + if (!Forced) { - return; + const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 500 : 5000; + if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs) + { + return; + } } m_OrchestratorQueryTimer.Reset(); @@ -520,7 +595,24 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() continue; } - ZEN_INFO("discovered new worker at {}", UriStr); + std::string_view Hostname = Worker["hostname"sv].AsString(); + std::string_view Platform = Worker["platform"sv].AsString(); + int Cpus = Worker["cpus"sv].AsInt32(); + uint64_t MemTotal = Worker["memory_total"sv].AsUInt64(); + + if (!Hostname.empty()) + { + ZEN_INFO("discovered new worker at {} ({}, {}, {} cpus, {:.1f} GB)", + UriStr, + Hostname, + Platform, + Cpus, + static_cast<double>(MemTotal) / (1024.0 * 1024.0 * 1024.0)); + } + else + { + ZEN_INFO("discovered new worker at {}", UriStr); + } m_KnownWorkerUris.insert(UriStr); @@ -598,6 +690,15 @@ ComputeServiceSession::Impl::Shutdown() { RequestStateTransition(SessionState::Sunset); + // Close orchestrator WebSocket before stopping the scheduler thread + // to prevent callbacks into a shutting-down scheduler. + if (m_OrchestratorWsClient) + { + m_OrchestratorWsClient->Close(); + m_OrchestratorWsClient.reset(); + } + m_OrchestratorWsHandler.reset(); + m_SchedulingThreadEnabled = false; m_SchedulingThreadEvent.Set(); if (m_SchedulingThread.joinable()) @@ -720,8 +821,14 @@ ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker) // different descriptor. Thus we only need to call this the first time, when the // worker is added - m_LocalRunnerGroup.RegisterWorker(Worker); - m_RemoteRunnerGroup.RegisterWorker(Worker); + if (!m_LocalRunnerGroup.RegisterWorker(Worker)) + { + ZEN_WARN("failed to register worker {} on one or more local runners", WorkerId); + } + if (!m_RemoteRunnerGroup.RegisterWorker(Worker)) + { + ZEN_WARN("failed to register worker {} on one or more remote runners", WorkerId); + } if (m_Recorder) { @@ -767,7 +874,10 @@ ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner) for (const CbPackage& Worker : Workers) { - Runner.RegisterWorker(Worker); + if (!Runner.RegisterWorker(Worker)) + { + ZEN_WARN("failed to sync worker {} to runner", Worker.GetObjectHash()); + } } } @@ -868,9 +978,7 @@ ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worke if (m_SessionState.load(std::memory_order_relaxed) != SessionState::Ready) { - CbObjectWriter Writer; - Writer << "error"sv << fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load())); - return {0, Writer.Save()}; + return MakeErrorResult(fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load()))); } const int ActionLsn = ++m_ActionsCounter; @@ -1258,42 +1366,51 @@ ComputeServiceSession::Impl::DrainQueue(int QueueId) } ComputeServiceSession::EnqueueResult -ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority) +ComputeServiceSession::Impl::ValidateQueueForEnqueue(int QueueId, Ref<QueueEntry>& OutQueue) { - Ref<QueueEntry> Queue = FindQueue(QueueId); + OutQueue = FindQueue(QueueId); - if (!Queue) + if (!OutQueue) { - CbObjectWriter Writer; - Writer << "error"sv - << "queue not found"sv; - return {0, Writer.Save()}; + return MakeErrorResult("queue not found"sv); } - QueueState QState = Queue->State.load(); + QueueState QState = OutQueue->State.load(); if (QState == QueueState::Cancelled) { - CbObjectWriter Writer; - Writer << "error"sv - << "queue is cancelled"sv; - return {0, Writer.Save()}; + return MakeErrorResult("queue is cancelled"sv); } if (QState == QueueState::Draining) { - CbObjectWriter Writer; - Writer << "error"sv - << "queue is draining"sv; - return {0, Writer.Save()}; + return MakeErrorResult("queue is draining"sv); + } + + return {}; +} + +void +ComputeServiceSession::Impl::ActivateActionInQueue(const Ref<QueueEntry>& Queue, int Lsn) +{ + Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Lsn); }); + Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); + Queue->IdleSince.store(0, std::memory_order_relaxed); +} + +ComputeServiceSession::EnqueueResult +ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority) +{ + Ref<QueueEntry> Queue; + if (EnqueueResult Error = ValidateQueueForEnqueue(QueueId, Queue); Error.ResponseMessage) + { + return Error; } EnqueueResult Result = EnqueueAction(QueueId, ActionObject, Priority); if (Result.Lsn != 0) { - Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); }); - Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); - Queue->IdleSince.store(0, std::memory_order_relaxed); + ActivateActionInQueue(Queue, Result.Lsn); } return Result; @@ -1302,40 +1419,17 @@ ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionOb ComputeServiceSession::EnqueueResult ComputeServiceSession::Impl::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority) { - Ref<QueueEntry> Queue = FindQueue(QueueId); - - if (!Queue) - { - CbObjectWriter Writer; - Writer << "error"sv - << "queue not found"sv; - return {0, Writer.Save()}; - } - - QueueState QState = Queue->State.load(); - if (QState == QueueState::Cancelled) + Ref<QueueEntry> Queue; + if (EnqueueResult Error = ValidateQueueForEnqueue(QueueId, Queue); Error.ResponseMessage) { - CbObjectWriter Writer; - Writer << "error"sv - << "queue is cancelled"sv; - return {0, Writer.Save()}; - } - - if (QState == QueueState::Draining) - { - CbObjectWriter Writer; - Writer << "error"sv - << "queue is draining"sv; - return {0, Writer.Save()}; + return Error; } EnqueueResult Result = EnqueueResolvedAction(QueueId, Worker, ActionObj, Priority); if (Result.Lsn != 0) { - Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); }); - Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); - Queue->IdleSince.store(0, std::memory_order_relaxed); + ActivateActionInQueue(Queue, Result.Lsn); } return Result; @@ -1770,6 +1864,68 @@ ComputeServiceSession::Impl::RescheduleAction(int ActionLsn) return {.Success = true, .RetryCount = NewRetryCount}; } +ComputeServiceSession::RescheduleResult +ComputeServiceSession::Impl::RetractAction(int ActionLsn) +{ + Ref<RunnerAction> Action; + bool WasRunning = false; + + // Look for the action in pending or running maps + m_RunningLock.WithSharedLock([&] { + if (auto It = m_RunningMap.find(ActionLsn); It != m_RunningMap.end()) + { + Action = It->second; + WasRunning = true; + } + }); + + if (!Action) + { + m_PendingLock.WithSharedLock([&] { + if (auto It = m_PendingActions.find(ActionLsn); It != m_PendingActions.end()) + { + Action = It->second; + } + }); + } + + if (!Action) + { + return {.Success = false, .Error = "Action not found in pending or running maps"}; + } + + if (!Action->RetractAction()) + { + return {.Success = false, .Error = "Action cannot be retracted from its current state"}; + } + + // If the action was running, send a cancellation signal to the runner + if (WasRunning) + { + m_LocalRunnerGroup.CancelAction(ActionLsn); + } + + ZEN_INFO("action {} ({}) retract requested", Action->ActionId, ActionLsn); + return {.Success = true, .RetryCount = Action->RetryCount.load(std::memory_order_relaxed)}; +} + +void +ComputeServiceSession::Impl::RemoveActionFromActiveMaps(int ActionLsn) +{ + m_RunningLock.WithExclusiveLock([&] { + m_PendingLock.WithExclusiveLock([&] { + if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) + { + m_PendingActions.erase(ActionLsn); + } + else + { + m_RunningMap.erase(FindIt); + } + }); + }); +} + void ComputeServiceSession::Impl::HandleActionUpdates() { @@ -1781,6 +1937,10 @@ ComputeServiceSession::Impl::HandleActionUpdates() std::unordered_set<int> SeenLsn; + // Collect terminal action notifications for the completion observer. + // Inline capacity of 64 avoids heap allocation in the common case. + eastl::fixed_vector<IComputeCompletionObserver::CompletedActionNotification, 64> TerminalBatch; + // Process each action's latest state, deduplicating by LSN. // // This is safe because state transitions are monotonically increasing by enum @@ -1798,7 +1958,23 @@ ComputeServiceSession::Impl::HandleActionUpdates() { // Newly enqueued — add to pending map for scheduling case RunnerAction::State::Pending: - m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + // Guard against a race where the session is abandoned between + // EnqueueAction (which calls PostUpdate) and this scheduler + // tick. AbandonAllActions() only scans m_PendingActions, so it + // misses actions still in m_UpdatedActions at the time the + // session transitions. Detect that here and immediately abandon + // rather than inserting into the pending map, where they would + // otherwise be stuck indefinitely. + if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Abandoned) + { + Action->SetActionState(RunnerAction::State::Abandoned); + // SetActionState calls PostUpdate; the Abandoned action + // will be processed as a terminal on the next scheduler pass. + } + else + { + m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); + } break; // Async submission in progress — remains in pending map @@ -1816,6 +1992,15 @@ ComputeServiceSession::Impl::HandleActionUpdates() ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); break; + // Retracted — pull back for rescheduling without counting against retry limit + case RunnerAction::State::Retracted: + { + RemoveActionFromActiveMaps(ActionLsn); + Action->ResetActionStateToPending(); + ZEN_INFO("action {} ({}) retracted for rescheduling", Action->ActionId, ActionLsn); + break; + } + // Terminal states — move to results, record history, notify queue case RunnerAction::State::Completed: case RunnerAction::State::Failed: @@ -1834,19 +2019,7 @@ ComputeServiceSession::Impl::HandleActionUpdates() if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries) { - // Remove from whichever active map the action is in before resetting - m_RunningLock.WithExclusiveLock([&] { - m_PendingLock.WithExclusiveLock([&] { - if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) - { - m_PendingActions.erase(ActionLsn); - } - else - { - m_RunningMap.erase(FindIt); - } - }); - }); + RemoveActionFromActiveMaps(ActionLsn); // Reset triggers PostUpdate() which re-enters the action as Pending Action->ResetActionStateToPending(); @@ -1861,19 +2034,14 @@ ComputeServiceSession::Impl::HandleActionUpdates() } } - // Remove from whichever active map the action is in - m_RunningLock.WithExclusiveLock([&] { - m_PendingLock.WithExclusiveLock([&] { - if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) - { - m_PendingActions.erase(ActionLsn); - } - else - { - m_RunningMap.erase(FindIt); - } - }); - }); + RemoveActionFromActiveMaps(ActionLsn); + + // Update queue counters BEFORE publishing the result into + // m_ResultsMap. GetActionResult erases from m_ResultsMap + // under m_ResultsLock, so if we updated counters after + // releasing that lock, a caller could observe ActiveCount + // still at 1 immediately after GetActionResult returned OK. + NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState); m_ResultsLock.WithExclusiveLock([&] { m_ResultsMap[ActionLsn] = Action; @@ -1902,16 +2070,46 @@ ComputeServiceSession::Impl::HandleActionUpdates() }); m_RetiredCount.fetch_add(1); m_ResultRate.Mark(1); + { + using ObserverState = IComputeCompletionObserver::ActionState; + ObserverState NotifyState{}; + switch (TerminalState) + { + case RunnerAction::State::Completed: + NotifyState = ObserverState::Completed; + break; + case RunnerAction::State::Failed: + NotifyState = ObserverState::Failed; + break; + case RunnerAction::State::Abandoned: + NotifyState = ObserverState::Abandoned; + break; + case RunnerAction::State::Cancelled: + NotifyState = ObserverState::Cancelled; + break; + default: + break; + } + TerminalBatch.push_back({ActionLsn, NotifyState}); + } ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", Action->ActionId, ActionLsn, TerminalState == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); - NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState); break; } } } } + + // Notify the completion observer, if any, about all terminal actions in this batch. + if (!TerminalBatch.empty()) + { + if (IComputeCompletionObserver* Observer = m_CompletionObserver.load(std::memory_order_acquire)) + { + Observer->OnActionsCompleted({TerminalBatch.data(), TerminalBatch.size()}); + } + } } size_t @@ -2014,6 +2212,12 @@ ComputeServiceSession::SetOrchestratorBasePath(std::filesystem::path BasePath) } void +ComputeServiceSession::NotifyOrchestratorChanged() +{ + m_Impl->NotifyOrchestratorChanged(); +} + +void ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) { m_Impl->StartRecording(InResolver, RecordingPath); @@ -2194,6 +2398,12 @@ ComputeServiceSession::RescheduleAction(int ActionLsn) return m_Impl->RescheduleAction(ActionLsn); } +ComputeServiceSession::RescheduleResult +ComputeServiceSession::RetractAction(int ActionLsn) +{ + return m_Impl->RetractAction(ActionLsn); +} + std::vector<ComputeServiceSession::RunningActionInfo> ComputeServiceSession::GetRunningActions() { @@ -2219,6 +2429,12 @@ ComputeServiceSession::GetCompleted(CbWriter& Cbo) } void +ComputeServiceSession::SetCompletionObserver(IComputeCompletionObserver* Observer) +{ + m_Impl->m_CompletionObserver.store(Observer, std::memory_order_release); +} + +void ComputeServiceSession::PostUpdate(RunnerAction* Action) { m_Impl->PostUpdate(Action); |