aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/computeservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/computeservice.cpp')
-rw-r--r--src/zencompute/computeservice.cpp2236
1 files changed, 2236 insertions, 0 deletions
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp
new file mode 100644
index 000000000..838d741b6
--- /dev/null
+++ b/src/zencompute/computeservice.cpp
@@ -0,0 +1,2236 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/computeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "runners/functionrunner.h"
+# include "recording/actionrecorder.h"
+# include "runners/localrunner.h"
+# include "runners/remotehttprunner.h"
+# if ZEN_PLATFORM_LINUX
+# include "runners/linuxrunner.h"
+# elif ZEN_PLATFORM_WINDOWS
+# include "runners/windowsrunner.h"
+# elif ZEN_PLATFORM_MAC
+# include "runners/macrunner.h"
+# endif
+
+# include <zencompute/recordingreader.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zencore/trace.h>
+# include <zencore/workthreadpool.h>
+# include <zenutil/workerpools.h>
+# include <zentelemetry/stats.h>
+# include <zenhttp/httpclient.h>
+
+# include <set>
+# include <deque>
+# include <map>
+# include <thread>
+# include <unordered_map>
+# include <unordered_set>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/hash_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen {
+
+const char*
+ToString(compute::ComputeServiceSession::SessionState State)
+{
+ using enum compute::ComputeServiceSession::SessionState;
+ switch (State)
+ {
+ case Created:
+ return "Created";
+ case Ready:
+ return "Ready";
+ case Draining:
+ return "Draining";
+ case Paused:
+ return "Paused";
+ case Abandoned:
+ return "Abandoned";
+ case Sunset:
+ return "Sunset";
+ }
+ return "Unknown";
+}
+
+const char*
+ToString(compute::ComputeServiceSession::QueueState State)
+{
+ using enum compute::ComputeServiceSession::QueueState;
+ switch (State)
+ {
+ case Active:
+ return "active";
+ case Draining:
+ return "draining";
+ case Cancelled:
+ return "cancelled";
+ }
+ return "unknown";
+}
+
+} // namespace zen
+
+namespace zen::compute {
+
+using SessionState = ComputeServiceSession::SessionState;
+
+static_assert(ZEN_ARRAY_COUNT(ComputeServiceSession::ActionHistoryEntry::Timestamps) == static_cast<size_t>(RunnerAction::State::_Count));
+
+//////////////////////////////////////////////////////////////////////////
+
+struct ComputeServiceSession::Impl
+{
+ ComputeServiceSession* m_ComputeServiceSession;
+ ChunkResolver& m_ChunkResolver;
+ LoggerRef m_Log{logging::Get("compute")};
+
+ Impl(ComputeServiceSession* InComputeServiceSession, ChunkResolver& InChunkResolver)
+ : m_ComputeServiceSession(InComputeServiceSession)
+ , m_ChunkResolver(InChunkResolver)
+ , m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
+ , m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
+ {
+ // Create a non-expiring, non-deletable implicit queue for legacy endpoints
+ auto Result = CreateQueue("implicit"sv, {}, {});
+ m_ImplicitQueueId = Result.QueueId;
+ m_QueueLock.WithSharedLock([&] { m_Queues[m_ImplicitQueueId]->Implicit = true; });
+
+ m_SchedulingThread = std::thread{&Impl::SchedulerThreadFunction, this};
+ }
+
+ void WaitUntilReady();
+ void Shutdown();
+ bool IsHealthy();
+
+ bool RequestStateTransition(SessionState NewState);
+ void AbandonAllActions();
+
+ LoggerRef Log() { return m_Log; }
+
+ // Orchestration
+
+ void SetOrchestratorEndpoint(std::string_view Endpoint);
+ void SetOrchestratorBasePath(std::filesystem::path BasePath);
+
+ std::string m_OrchestratorEndpoint;
+ std::filesystem::path m_OrchestratorBasePath;
+ Stopwatch m_OrchestratorQueryTimer;
+ std::unordered_set<std::string> m_KnownWorkerUris;
+
+ void UpdateCoordinatorState();
+
+ // Worker registration and discovery
+
+ struct FunctionDefinition
+ {
+ std::string FunctionName;
+ Guid FunctionVersion;
+ Guid BuildSystemVersion;
+ IoHash WorkerId;
+ };
+
+ void RegisterWorker(CbPackage Worker);
+ WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+
+ // Action scheduling and tracking
+
+ std::atomic<SessionState> m_SessionState{SessionState::Created};
+ std::atomic<int32_t> m_ActionsCounter = 0; // sequence number
+ metrics::Meter m_ArrivalRate;
+
+ RwLock m_PendingLock;
+ std::map<int, Ref<RunnerAction>> m_PendingActions;
+
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
+
+ RwLock m_ResultsLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
+ metrics::Meter m_ResultRate;
+ std::atomic<uint64_t> m_RetiredCount{0};
+
+ EnqueueResult EnqueueAction(int QueueId, CbObject ActionObject, int Priority);
+ EnqueueResult EnqueueResolvedAction(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority);
+
+ void GetCompleted(CbWriter& Cbo);
+
+ HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+ HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
+ void RetireActionResult(int ActionLsn);
+
+ std::thread m_SchedulingThread;
+ std::atomic<bool> m_SchedulingThreadEnabled{true};
+ Event m_SchedulingThreadEvent;
+
+ void SchedulerThreadFunction();
+ void SchedulePendingActions();
+
+ // Workers
+
+ RwLock m_WorkerLock;
+ std::unordered_map<IoHash, CbPackage> m_WorkerMap;
+ std::vector<FunctionDefinition> m_FunctionList;
+ std::vector<IoHash> GetKnownWorkerIds();
+ void SyncWorkersToRunner(FunctionRunner& Runner);
+
+ // Runners
+
+ DeferredDirectoryDeleter m_DeferredDeleter;
+ WorkerThreadPool& m_LocalSubmitPool;
+ WorkerThreadPool& m_RemoteSubmitPool;
+ RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup;
+ RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup;
+
+ void ShutdownRunners();
+
+ // Recording
+
+ void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+ void StopRecording();
+
+ std::unique_ptr<ActionRecorder> m_Recorder;
+
+ // History tracking
+
+ RwLock m_ActionHistoryLock;
+ std::deque<ComputeServiceSession::ActionHistoryEntry> m_ActionHistory;
+ size_t m_HistoryLimit = 1000;
+
+ // Queue tracking
+
+ using QueueState = ComputeServiceSession::QueueState;
+
+ struct QueueEntry : RefCounted
+ {
+ int QueueId;
+ bool Implicit{false};
+ std::atomic<QueueState> State{QueueState::Active};
+ std::atomic<int> ActiveCount{0}; // pending + running
+ std::atomic<int> CompletedCount{0}; // successfully completed
+ std::atomic<int> FailedCount{0}; // failed
+ std::atomic<int> AbandonedCount{0}; // abandoned
+ std::atomic<int> CancelledCount{0}; // cancelled
+ std::atomic<uint64_t> IdleSince{0}; // hifreq tick when queue became idle; 0 = has active work
+
+ RwLock m_Lock;
+ std::unordered_set<int> ActiveLsns; // for cancellation lookup
+ std::unordered_set<int> FinishedLsns; // completed/failed/cancelled LSNs
+
+ std::string Tag;
+ CbObject Metadata;
+ CbObject Config;
+ };
+
+ int m_ImplicitQueueId{0};
+ std::atomic<int> m_QueueCounter{0};
+ RwLock m_QueueLock;
+ std::unordered_map<int, Ref<QueueEntry>> m_Queues;
+
+ Ref<QueueEntry> FindQueue(int QueueId)
+ {
+ Ref<QueueEntry> Queue;
+ m_QueueLock.WithSharedLock([&] {
+ if (auto It = m_Queues.find(QueueId); It != m_Queues.end())
+ {
+ Queue = It->second;
+ }
+ });
+ return Queue;
+ }
+
+ ComputeServiceSession::CreateQueueResult CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config);
+ std::vector<int> GetQueueIds();
+ ComputeServiceSession::QueueStatus GetQueueStatus(int QueueId);
+ CbObject GetQueueMetadata(int QueueId);
+ CbObject GetQueueConfig(int QueueId);
+ void CancelQueue(int QueueId);
+ void DeleteQueue(int QueueId);
+ void DrainQueue(int QueueId);
+ ComputeServiceSession::EnqueueResult EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority);
+ ComputeServiceSession::EnqueueResult EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority);
+ void GetQueueCompleted(int QueueId, CbWriter& Cbo);
+ void NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState);
+ void ExpireCompletedQueues();
+
+ Stopwatch m_QueueExpiryTimer;
+
+ std::vector<ComputeServiceSession::RunningActionInfo> GetRunningActions();
+ std::vector<ComputeServiceSession::ActionHistoryEntry> GetActionHistory(int Limit);
+ std::vector<ComputeServiceSession::ActionHistoryEntry> GetQueueHistory(int QueueId, int Limit);
+
+ // Action submission
+
+ [[nodiscard]] size_t QueryCapacity();
+
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action);
+ [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] size_t GetSubmittedActionCount();
+
+ // Updates
+
+ RwLock m_UpdatedActionsLock;
+ std::vector<Ref<RunnerAction>> m_UpdatedActions;
+
+ void HandleActionUpdates();
+ void PostUpdate(RunnerAction* Action);
+
+ static constexpr int kDefaultMaxRetries = 3;
+ int GetMaxRetriesForQueue(int QueueId);
+
+ ComputeServiceSession::RescheduleResult RescheduleAction(int ActionLsn);
+
+ ActionCounts GetActionCounts()
+ {
+ ActionCounts Counts;
+ Counts.Pending = (int)m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ Counts.Running = (int)m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+ Counts.Completed = (int)m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }) + (int)m_RetiredCount.load();
+ Counts.ActiveQueues = (int)m_QueueLock.WithSharedLock([&] {
+ size_t Count = 0;
+ for (const auto& [Id, Queue] : m_Queues)
+ {
+ if (!Queue->Implicit)
+ {
+ ++Count;
+ }
+ }
+ return Count;
+ });
+ return Counts;
+ }
+
+ void EmitStats(CbObjectWriter& Cbo)
+ {
+ Cbo << "session_state"sv << ToString(m_SessionState.load(std::memory_order_relaxed));
+ m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
+ m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
+ m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+ Cbo << "actions_submitted"sv << GetSubmittedActionCount();
+ EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo);
+ EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
+ }
+};
+
+bool
+ComputeServiceSession::Impl::IsHealthy()
+{
+ return m_SessionState.load(std::memory_order_relaxed) < SessionState::Abandoned;
+}
+
+bool
+ComputeServiceSession::Impl::RequestStateTransition(SessionState NewState)
+{
+ SessionState Current = m_SessionState.load(std::memory_order_relaxed);
+
+ for (;;)
+ {
+ if (Current == NewState)
+ {
+ return true;
+ }
+
+ // Validate the transition
+ bool Valid = false;
+
+ switch (Current)
+ {
+ case SessionState::Created:
+ Valid = (NewState == SessionState::Ready);
+ break;
+ case SessionState::Ready:
+ Valid = (NewState == SessionState::Draining);
+ break;
+ case SessionState::Draining:
+ Valid = (NewState == SessionState::Ready || NewState == SessionState::Paused);
+ break;
+ case SessionState::Paused:
+ Valid = (NewState == SessionState::Ready || NewState == SessionState::Sunset);
+ break;
+ case SessionState::Abandoned:
+ Valid = (NewState == SessionState::Sunset);
+ break;
+ case SessionState::Sunset:
+ Valid = false;
+ break;
+ }
+
+ // Allow jumping directly to Abandoned or Sunset from any non-terminal state
+ if (NewState == SessionState::Abandoned && Current < SessionState::Abandoned)
+ {
+ Valid = true;
+ }
+ if (NewState == SessionState::Sunset && Current != SessionState::Sunset)
+ {
+ Valid = true;
+ }
+
+ if (!Valid)
+ {
+ ZEN_WARN("invalid session state transition {} -> {}", ToString(Current), ToString(NewState));
+ return false;
+ }
+
+ if (m_SessionState.compare_exchange_strong(Current, NewState, std::memory_order_acq_rel))
+ {
+ ZEN_INFO("session state: {} -> {}", ToString(Current), ToString(NewState));
+
+ if (NewState == SessionState::Abandoned)
+ {
+ AbandonAllActions();
+ }
+
+ return true;
+ }
+
+ // CAS failed, Current was updated — retry with the new value
+ }
+}
+
+void
+ComputeServiceSession::Impl::AbandonAllActions()
+{
+ // Collect all pending actions and mark them as Abandoned
+ std::vector<Ref<RunnerAction>> PendingToAbandon;
+
+ m_PendingLock.WithSharedLock([&] {
+ PendingToAbandon.reserve(m_PendingActions.size());
+ for (auto& [Lsn, Action] : m_PendingActions)
+ {
+ PendingToAbandon.push_back(Action);
+ }
+ });
+
+ for (auto& Action : PendingToAbandon)
+ {
+ Action->SetActionState(RunnerAction::State::Abandoned);
+ }
+
+ // Collect all running actions and mark them as Abandoned, then
+ // best-effort cancel via the local runner group
+ std::vector<Ref<RunnerAction>> RunningToAbandon;
+
+ m_RunningLock.WithSharedLock([&] {
+ RunningToAbandon.reserve(m_RunningMap.size());
+ for (auto& [Lsn, Action] : m_RunningMap)
+ {
+ RunningToAbandon.push_back(Action);
+ }
+ });
+
+ for (auto& Action : RunningToAbandon)
+ {
+ Action->SetActionState(RunnerAction::State::Abandoned);
+ m_LocalRunnerGroup.CancelAction(Action->ActionLsn);
+ }
+
+ ZEN_INFO("abandoned all actions: {} pending, {} running", PendingToAbandon.size(), RunningToAbandon.size());
+}
+
+void
+ComputeServiceSession::Impl::SetOrchestratorEndpoint(std::string_view Endpoint)
+{
+ m_OrchestratorEndpoint = Endpoint;
+}
+
+void
+ComputeServiceSession::Impl::SetOrchestratorBasePath(std::filesystem::path BasePath)
+{
+ m_OrchestratorBasePath = std::move(BasePath);
+}
+
+void
+ComputeServiceSession::Impl::UpdateCoordinatorState()
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::UpdateCoordinatorState");
+ if (m_OrchestratorEndpoint.empty())
+ {
+ return;
+ }
+
+ // Poll faster when we have no discovered workers yet so remote runners come online quickly
+ const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 500 : 5000;
+ if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs)
+ {
+ return;
+ }
+
+ m_OrchestratorQueryTimer.Reset();
+
+ try
+ {
+ HttpClient Client(m_OrchestratorEndpoint);
+
+ HttpClient::Response Response = Client.Get("/orch/agents");
+
+ if (!Response.IsSuccess())
+ {
+ ZEN_WARN("orchestrator query failed with status {}", static_cast<int>(Response.StatusCode));
+ return;
+ }
+
+ CbObject WorkerList = Response.AsObject();
+
+ std::unordered_set<std::string> ValidWorkerUris;
+
+ for (auto& Item : WorkerList["workers"sv])
+ {
+ CbObjectView Worker = Item.AsObjectView();
+
+ uint64_t Dt = Worker["dt"sv].AsUInt64();
+ bool Reachable = Worker["reachable"sv].AsBool();
+ std::string_view Uri = Worker["uri"sv].AsString();
+
+ // Skip stale workers (not seen in over 30 seconds)
+ if (Dt > 30000)
+ {
+ continue;
+ }
+
+ // Skip workers that are not confirmed reachable
+ if (!Reachable)
+ {
+ continue;
+ }
+
+ std::string UriStr{Uri};
+ ValidWorkerUris.insert(UriStr);
+
+ // Skip workers we already know about
+ if (m_KnownWorkerUris.contains(UriStr))
+ {
+ continue;
+ }
+
+ ZEN_INFO("discovered new worker at {}", UriStr);
+
+ m_KnownWorkerUris.insert(UriStr);
+
+ auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool);
+ SyncWorkersToRunner(*NewRunner);
+ m_RemoteRunnerGroup.AddRunner(NewRunner);
+ }
+
+ // Remove workers that are no longer valid (stale or unreachable)
+ for (auto It = m_KnownWorkerUris.begin(); It != m_KnownWorkerUris.end();)
+ {
+ if (!ValidWorkerUris.contains(*It))
+ {
+ const std::string& ExpiredUri = *It;
+ ZEN_INFO("removing expired worker at {}", ExpiredUri);
+
+ m_RemoteRunnerGroup.RemoveRunnerIf([&](const RemoteHttpRunner& Runner) { return Runner.GetHostName() == ExpiredUri; });
+
+ It = m_KnownWorkerUris.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ }
+ catch (const HttpClientError& Ex)
+ {
+ ZEN_WARN("orchestrator query error: {}", Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("orchestrator query unexpected error: {}", Ex.what());
+ }
+}
+
+void
+ComputeServiceSession::Impl::WaitUntilReady()
+{
+ if (m_RemoteRunnerGroup.GetRunnerCount() || !m_OrchestratorEndpoint.empty())
+ {
+ ZEN_INFO("waiting for remote runners...");
+
+ constexpr int MaxWaitSeconds = 120;
+
+ for (int Elapsed = 0; Elapsed < MaxWaitSeconds; Elapsed++)
+ {
+ if (!m_SchedulingThreadEnabled.load(std::memory_order_relaxed))
+ {
+ ZEN_WARN("shutdown requested while waiting for remote runners");
+ return;
+ }
+
+ const size_t Capacity = m_RemoteRunnerGroup.QueryCapacity();
+
+ if (Capacity > 0)
+ {
+ ZEN_INFO("found {} remote runners (capacity: {})", m_RemoteRunnerGroup.GetRunnerCount(), Capacity);
+ break;
+ }
+
+ zen::Sleep(1000);
+ }
+ }
+ else
+ {
+ ZEN_ASSERT(m_LocalRunnerGroup.GetRunnerCount(), "no runners available");
+ }
+
+ RequestStateTransition(SessionState::Ready);
+}
+
+void
+ComputeServiceSession::Impl::Shutdown()
+{
+ RequestStateTransition(SessionState::Sunset);
+
+ m_SchedulingThreadEnabled = false;
+ m_SchedulingThreadEvent.Set();
+ if (m_SchedulingThread.joinable())
+ {
+ m_SchedulingThread.join();
+ }
+
+ ShutdownRunners();
+
+ m_DeferredDeleter.Shutdown();
+}
+
+void
+ComputeServiceSession::Impl::ShutdownRunners()
+{
+ m_LocalRunnerGroup.Shutdown();
+ m_RemoteRunnerGroup.Shutdown();
+}
+
+void
+ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
+{
+ ZEN_INFO("starting recording to '{}'", RecordingPath);
+
+ m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
+
+ ZEN_INFO("started recording to '{}'", RecordingPath);
+}
+
+void
+ComputeServiceSession::Impl::StopRecording()
+{
+ ZEN_INFO("stopping recording");
+
+ m_Recorder = nullptr;
+
+ ZEN_INFO("stopped recording");
+}
+
+std::vector<ComputeServiceSession::RunningActionInfo>
+ComputeServiceSession::Impl::GetRunningActions()
+{
+ std::vector<ComputeServiceSession::RunningActionInfo> Result;
+ m_RunningLock.WithSharedLock([&] {
+ Result.reserve(m_RunningMap.size());
+ for (const auto& [Lsn, Action] : m_RunningMap)
+ {
+ Result.push_back({.Lsn = Lsn,
+ .QueueId = Action->QueueId,
+ .ActionId = Action->ActionId,
+ .CpuUsagePercent = Action->CpuUsagePercent.load(std::memory_order_relaxed),
+ .CpuSeconds = Action->CpuSeconds.load(std::memory_order_relaxed)});
+ }
+ });
+ return Result;
+}
+
+std::vector<ComputeServiceSession::ActionHistoryEntry>
+ComputeServiceSession::Impl::GetActionHistory(int Limit)
+{
+ RwLock::SharedLockScope _(m_ActionHistoryLock);
+
+ if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size())
+ {
+ return std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end());
+ }
+
+ return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end());
+}
+
+std::vector<ComputeServiceSession::ActionHistoryEntry>
+ComputeServiceSession::Impl::GetQueueHistory(int QueueId, int Limit)
+{
+ // Resolve the queue and snapshot its finished LSN set
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ return {};
+ }
+
+ std::unordered_set<int> FinishedLsns;
+
+ Queue->m_Lock.WithSharedLock([&] { FinishedLsns = Queue->FinishedLsns; });
+
+ // Filter the global history to entries belonging to this queue.
+ // m_ActionHistory is ordered oldest-first, so the filtered result keeps the same ordering.
+ std::vector<ActionHistoryEntry> Result;
+
+ m_ActionHistoryLock.WithSharedLock([&] {
+ for (const auto& Entry : m_ActionHistory)
+ {
+ if (FinishedLsns.contains(Entry.Lsn))
+ {
+ Result.push_back(Entry);
+ }
+ }
+ });
+
+ if (Limit > 0 && static_cast<size_t>(Limit) < Result.size())
+ {
+ Result.erase(Result.begin(), Result.end() - Limit);
+ }
+
+ return Result;
+}
+
+void
+ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::RegisterWorker");
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ const IoHash& WorkerId = Worker.GetObject().GetHash();
+
+ if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second)
+ {
+ // Note that since the convention currently is that WorkerId is equal to the hash
+ // of the worker descriptor there is no chance that we get a second write with a
+ // different descriptor. Thus we only need to call this the first time, when the
+ // worker is added
+
+ m_LocalRunnerGroup.RegisterWorker(Worker);
+ m_RemoteRunnerGroup.RegisterWorker(Worker);
+
+ if (m_Recorder)
+ {
+ m_Recorder->RegisterWorker(Worker);
+ }
+
+ CbObject WorkerObj = Worker.GetObject();
+
+ // Populate worker database
+
+ const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid();
+
+ for (auto& Item : WorkerObj["functions"sv])
+ {
+ CbObjectView Function = Item.AsObjectView();
+
+ std::string_view FunctionName = Function["name"sv].AsString();
+ const Guid FunctionVersion = Function["version"sv].AsUuid();
+
+ m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
+ .FunctionVersion = FunctionVersion,
+ .BuildSystemVersion = WorkerBuildSystemVersion,
+ .WorkerId = WorkerId});
+ }
+ }
+}
+
+void
+ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner)
+{
+ ZEN_TRACE_CPU("SyncWorkersToRunner");
+
+ std::vector<CbPackage> Workers;
+
+ {
+ RwLock::SharedLockScope _(m_WorkerLock);
+ Workers.reserve(m_WorkerMap.size());
+ for (const auto& [Id, Pkg] : m_WorkerMap)
+ {
+ Workers.push_back(Pkg);
+ }
+ }
+
+ for (const CbPackage& Worker : Workers)
+ {
+ Runner.RegisterWorker(Worker);
+ }
+}
+
+WorkerDesc
+ComputeServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+ {
+ const CbPackage& Desc = It->second;
+ return {Desc, WorkerId};
+ }
+
+ return {};
+}
+
+std::vector<IoHash>
+ComputeServiceSession::Impl::GetKnownWorkerIds()
+{
+ std::vector<IoHash> WorkerIds;
+
+ m_WorkerLock.WithSharedLock([&] {
+ WorkerIds.reserve(m_WorkerMap.size());
+ for (const auto& [WorkerId, _] : m_WorkerMap)
+ {
+ WorkerIds.push_back(WorkerId);
+ }
+ });
+
+ return WorkerIds;
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::Impl::EnqueueAction(int QueueId, CbObject ActionObject, int Priority)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::EnqueueAction");
+
+ // Resolve function to worker
+
+ IoHash WorkerId{IoHash::Zero};
+ CbPackage WorkerPackage;
+
+ std::string_view FunctionName = ActionObject["Function"sv].AsString();
+ const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
+ const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+ m_WorkerLock.WithSharedLock([&] {
+ for (const FunctionDefinition& FuncDef : m_FunctionList)
+ {
+ if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
+ FuncDef.BuildSystemVersion == BuildSystemVersion)
+ {
+ WorkerId = FuncDef.WorkerId;
+
+ break;
+ }
+ }
+
+ if (WorkerId != IoHash::Zero)
+ {
+ if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+ {
+ WorkerPackage = It->second;
+ }
+ }
+ });
+
+ if (WorkerId == IoHash::Zero)
+ {
+ CbObjectWriter Writer;
+
+ Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
+ Writer << "error"
+ << "no worker matches the action specification";
+
+ return {0, Writer.Save()};
+ }
+
+ if (WorkerPackage)
+ {
+ return EnqueueResolvedAction(QueueId, WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
+ }
+
+ CbObjectWriter Writer;
+
+ Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
+ Writer << "error"
+ << "no worker found despite match";
+
+ return {0, Writer.Save()};
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::EnqueueResolvedAction");
+
+ if (m_SessionState.load(std::memory_order_relaxed) != SessionState::Ready)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv << fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load()));
+ return {0, Writer.Save()};
+ }
+
+ const int ActionLsn = ++m_ActionsCounter;
+
+ m_ArrivalRate.Mark();
+
+ Ref<RunnerAction> Pending{new RunnerAction(m_ComputeServiceSession)};
+
+ Pending->ActionLsn = ActionLsn;
+ Pending->QueueId = QueueId;
+ Pending->Worker = Worker;
+ Pending->ActionId = ActionObj.GetHash();
+ Pending->ActionObj = ActionObj;
+ Pending->Priority = RequestPriority;
+
+ // For now simply put action into pending state, so we can do batch scheduling
+
+ ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
+
+ Pending->SetActionState(RunnerAction::State::Pending);
+
+ if (m_Recorder)
+ {
+ m_Recorder->RecordAction(Pending);
+ }
+
+ CbObjectWriter Writer;
+ Writer << "lsn" << Pending->ActionLsn;
+ Writer << "worker" << Pending->Worker.WorkerId;
+ Writer << "action" << Pending->ActionId;
+
+ return {Pending->ActionLsn, Writer.Save()};
+}
+
+SubmitResult
+ComputeServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action)
+{
+ // Loosely round-robin scheduling of actions across runners.
+ //
+ // It's not entirely clear what this means given that submits
+ // can come in across multiple threads, but it's probably better
+ // than always starting with the first runner.
+ //
+ // Longer term we should track the state of the individual
+ // runners and make decisions accordingly.
+
+ SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action);
+ if (Result.IsAccepted)
+ {
+ return Result;
+ }
+
+ return m_RemoteRunnerGroup.SubmitAction(Action);
+}
+
+size_t
+ComputeServiceSession::Impl::GetSubmittedActionCount()
+{
+ return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount();
+}
+
+HttpResponseCode
+ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
+ if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
+ // always be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+HttpResponseCode
+ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
+ for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
+ {
+ if (It->second->ActionId == ActionId)
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ for (const auto& [K, Pending] : m_PendingActions)
+ {
+ if (Pending->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
+ // always be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ for (const auto& [K, v] : m_RunningMap)
+ {
+ if (v->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+void
+ComputeServiceSession::Impl::RetireActionResult(int ActionLsn)
+{
+ m_DeferredDeleter.MarkReady(ActionLsn);
+}
+
+void
+ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo)
+{
+ Cbo.BeginArray("completed");
+
+ m_ResultsLock.WithSharedLock([&] {
+ for (auto& [Lsn, Action] : m_ResultsMap)
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Lsn;
+ Cbo << "state"sv << RunnerAction::ToString(Action->ActionState());
+ Cbo.EndObject();
+ }
+ });
+
+ Cbo.EndArray();
+}
+
+//////////////////////////////////////////////////////////////////////////
+// Queue management
+
+ComputeServiceSession::CreateQueueResult
+ComputeServiceSession::Impl::CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config)
+{
+ const int QueueId = ++m_QueueCounter;
+
+ Ref<QueueEntry> Queue{new QueueEntry()};
+ Queue->QueueId = QueueId;
+ Queue->Tag = Tag;
+ Queue->Metadata = std::move(Metadata);
+ Queue->Config = std::move(Config);
+ Queue->IdleSince = GetHifreqTimerValue();
+
+ m_QueueLock.WithExclusiveLock([&] { m_Queues[QueueId] = Queue; });
+
+ ZEN_DEBUG("created queue {}", QueueId);
+
+ return {.QueueId = QueueId};
+}
+
+std::vector<int>
+ComputeServiceSession::Impl::GetQueueIds()
+{
+ std::vector<int> Ids;
+
+ m_QueueLock.WithSharedLock([&] {
+ Ids.reserve(m_Queues.size());
+ for (const auto& [Id, Queue] : m_Queues)
+ {
+ if (!Queue->Implicit)
+ {
+ Ids.push_back(Id);
+ }
+ }
+ });
+
+ return Ids;
+}
+
+ComputeServiceSession::QueueStatus
+ComputeServiceSession::Impl::GetQueueStatus(int QueueId)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ return {};
+ }
+
+ const int Active = Queue->ActiveCount.load(std::memory_order_relaxed);
+ const int Completed = Queue->CompletedCount.load(std::memory_order_relaxed);
+ const int Failed = Queue->FailedCount.load(std::memory_order_relaxed);
+ const int AbandonedN = Queue->AbandonedCount.load(std::memory_order_relaxed);
+ const int CancelledN = Queue->CancelledCount.load(std::memory_order_relaxed);
+ const QueueState QState = Queue->State.load();
+
+ return {
+ .IsValid = true,
+ .QueueId = QueueId,
+ .ActiveCount = Active,
+ .CompletedCount = Completed,
+ .FailedCount = Failed,
+ .AbandonedCount = AbandonedN,
+ .CancelledCount = CancelledN,
+ .State = QState,
+ .IsComplete = (Active == 0),
+ };
+}
+
+CbObject
+ComputeServiceSession::Impl::GetQueueMetadata(int QueueId)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ return {};
+ }
+
+ return Queue->Metadata;
+}
+
+CbObject
+ComputeServiceSession::Impl::GetQueueConfig(int QueueId)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ return {};
+ }
+
+ return Queue->Config;
+}
+
+void
+ComputeServiceSession::Impl::CancelQueue(int QueueId)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue || Queue->Implicit)
+ {
+ return;
+ }
+
+ Queue->State.store(QueueState::Cancelled);
+
+ // Collect active LSNs snapshot for cancellation
+ std::vector<int> LsnsToCancel;
+
+ Queue->m_Lock.WithSharedLock([&] { LsnsToCancel.assign(Queue->ActiveLsns.begin(), Queue->ActiveLsns.end()); });
+
+ // Identify which LSNs are still pending (not yet dispatched to a runner)
+ std::vector<Ref<RunnerAction>> PendingActionsToCancel;
+ std::vector<int> RunningLsnsToCancel;
+
+ m_PendingLock.WithSharedLock([&] {
+ for (int Lsn : LsnsToCancel)
+ {
+ if (auto It = m_PendingActions.find(Lsn); It != m_PendingActions.end())
+ {
+ PendingActionsToCancel.push_back(It->second);
+ }
+ }
+ });
+
+ m_RunningLock.WithSharedLock([&] {
+ for (int Lsn : LsnsToCancel)
+ {
+ if (m_RunningMap.find(Lsn) != m_RunningMap.end())
+ {
+ RunningLsnsToCancel.push_back(Lsn);
+ }
+ }
+ });
+
+ // Cancel pending actions by marking them as Cancelled; they will flow through
+ // HandleActionUpdates and eventually be removed from the pending map.
+ for (auto& Action : PendingActionsToCancel)
+ {
+ Action->SetActionState(RunnerAction::State::Cancelled);
+ }
+
+ // Best-effort cancellation of running actions via the local runner group.
+ // Also set the action state to Cancelled directly so a subsequent Failed
+ // transition from the runner is blocked (Cancelled > Failed in the enum).
+ for (int Lsn : RunningLsnsToCancel)
+ {
+ m_RunningLock.WithSharedLock([&] {
+ if (auto It = m_RunningMap.find(Lsn); It != m_RunningMap.end())
+ {
+ It->second->SetActionState(RunnerAction::State::Cancelled);
+ }
+ });
+ m_LocalRunnerGroup.CancelAction(Lsn);
+ }
+
+ m_RemoteRunnerGroup.CancelRemoteQueue(QueueId);
+
+ ZEN_INFO("cancelled queue {}: {} pending, {} running actions cancelled",
+ QueueId,
+ PendingActionsToCancel.size(),
+ RunningLsnsToCancel.size());
+
+ // Wake up the scheduler to process the cancelled actions
+ m_SchedulingThreadEvent.Set();
+}
+
+void
+ComputeServiceSession::Impl::DeleteQueue(int QueueId)
+{
+ // Never delete the implicit queue
+ {
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+ if (Queue && Queue->Implicit)
+ {
+ return;
+ }
+ }
+
+ // Cancel any active work first
+ CancelQueue(QueueId);
+
+ m_QueueLock.WithExclusiveLock([&] {
+ if (auto It = m_Queues.find(QueueId); It != m_Queues.end())
+ {
+ m_Queues.erase(It);
+ }
+ });
+}
+
+void
+ComputeServiceSession::Impl::DrainQueue(int QueueId)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue || Queue->Implicit)
+ {
+ return;
+ }
+
+ QueueState Expected = QueueState::Active;
+ Queue->State.compare_exchange_strong(Expected, QueueState::Draining);
+ ZEN_INFO("draining queue {}", QueueId);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue not found"sv;
+ return {0, Writer.Save()};
+ }
+
+ QueueState QState = Queue->State.load();
+ if (QState == QueueState::Cancelled)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue is cancelled"sv;
+ return {0, Writer.Save()};
+ }
+
+ if (QState == QueueState::Draining)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue is draining"sv;
+ return {0, Writer.Save()};
+ }
+
+ EnqueueResult Result = EnqueueAction(QueueId, ActionObject, Priority);
+
+ if (Result.Lsn != 0)
+ {
+ Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); });
+ Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
+ Queue->IdleSince.store(0, std::memory_order_relaxed);
+ }
+
+ return Result;
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::Impl::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue not found"sv;
+ return {0, Writer.Save()};
+ }
+
+ QueueState QState = Queue->State.load();
+ if (QState == QueueState::Cancelled)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue is cancelled"sv;
+ return {0, Writer.Save()};
+ }
+
+ if (QState == QueueState::Draining)
+ {
+ CbObjectWriter Writer;
+ Writer << "error"sv
+ << "queue is draining"sv;
+ return {0, Writer.Save()};
+ }
+
+ EnqueueResult Result = EnqueueResolvedAction(QueueId, Worker, ActionObj, Priority);
+
+ if (Result.Lsn != 0)
+ {
+ Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); });
+ Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
+ Queue->IdleSince.store(0, std::memory_order_relaxed);
+ }
+
+ return Result;
+}
+
+void
+ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo)
+{
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ Cbo.BeginArray("completed");
+
+ if (Queue)
+ {
+ Queue->m_Lock.WithSharedLock([&] {
+ m_ResultsLock.WithSharedLock([&] {
+ for (int Lsn : Queue->FinishedLsns)
+ {
+ if (m_ResultsMap.contains(Lsn))
+ {
+ Cbo << Lsn;
+ }
+ }
+ });
+ });
+ }
+
+ Cbo.EndArray();
+}
+
+void
+ComputeServiceSession::Impl::NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState)
+{
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ Ref<QueueEntry> Queue = FindQueue(QueueId);
+
+ if (!Queue)
+ {
+ return;
+ }
+
+ Queue->m_Lock.WithExclusiveLock([&] {
+ Queue->ActiveLsns.erase(Lsn);
+ Queue->FinishedLsns.insert(Lsn);
+ });
+
+ const int PreviousActive = Queue->ActiveCount.fetch_sub(1, std::memory_order_relaxed);
+ if (PreviousActive == 1)
+ {
+ Queue->IdleSince.store(GetHifreqTimerValue(), std::memory_order_relaxed);
+ }
+
+ switch (ActionState)
+ {
+ case RunnerAction::State::Completed:
+ Queue->CompletedCount.fetch_add(1, std::memory_order_relaxed);
+ break;
+ case RunnerAction::State::Abandoned:
+ Queue->AbandonedCount.fetch_add(1, std::memory_order_relaxed);
+ break;
+ case RunnerAction::State::Cancelled:
+ Queue->CancelledCount.fetch_add(1, std::memory_order_relaxed);
+ break;
+ default:
+ Queue->FailedCount.fetch_add(1, std::memory_order_relaxed);
+ break;
+ }
+}
+
+void
+ComputeServiceSession::Impl::ExpireCompletedQueues()
+{
+ static constexpr uint64_t ExpiryTimeMs = 15 * 60 * 1000;
+
+ std::vector<int> ExpiredQueueIds;
+
+ m_QueueLock.WithSharedLock([&] {
+ for (const auto& [Id, Queue] : m_Queues)
+ {
+ if (Queue->Implicit)
+ {
+ continue;
+ }
+ const uint64_t Idle = Queue->IdleSince.load(std::memory_order_relaxed);
+ if (Idle != 0 && Queue->ActiveCount.load(std::memory_order_relaxed) == 0)
+ {
+ const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(GetHifreqTimerValue() - Idle);
+ if (ElapsedMs >= ExpiryTimeMs)
+ {
+ ExpiredQueueIds.push_back(Id);
+ }
+ }
+ }
+ });
+
+ for (int QueueId : ExpiredQueueIds)
+ {
+ ZEN_INFO("expiring idle queue {}", QueueId);
+ DeleteQueue(QueueId);
+ }
+}
+
+void
+ComputeServiceSession::Impl::SchedulePendingActions()
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::SchedulePendingActions");
+ int ScheduledCount = 0;
+ size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+ size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+
+ static Stopwatch DumpRunningTimer;
+
+ auto _ = MakeGuard([&] {
+ ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+ ScheduledCount,
+ RunningCount,
+ m_RetiredCount.load(),
+ PendingCount,
+ ResultCount);
+
+ if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
+ {
+ DumpRunningTimer.Reset();
+
+ std::set<int> RunningList;
+ m_RunningLock.WithSharedLock([&] {
+ for (auto& [K, V] : m_RunningMap)
+ {
+ RunningList.insert(K);
+ }
+ });
+
+ ExtendableStringBuilder<1024> RunningString;
+ for (int i : RunningList)
+ {
+ if (RunningString.Size())
+ {
+ RunningString << ", ";
+ }
+
+ RunningString.Append(IntNum(i));
+ }
+
+ ZEN_INFO("running: {}", RunningString);
+ }
+ });
+
+ size_t Capacity = QueryCapacity();
+
+ if (!Capacity)
+ {
+ _.Dismiss();
+
+ return;
+ }
+
+ std::vector<Ref<RunnerAction>> ActionsToSchedule;
+
+ // Pull actions to schedule from the pending queue, we will
+ // try to submit these to the runner outside of the lock. Note
+ // that because of how the state transitions work it's not
+ // actually the case that all of these actions will still be
+ // pending by the time we try to submit them, but that's fine.
+ //
+ // Also note that the m_PendingActions list is not maintained
+ // here, that's done periodically in SchedulePendingActions()
+
+ m_PendingLock.WithExclusiveLock([&] {
+ if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused)
+ {
+ return;
+ }
+
+ if (m_PendingActions.empty())
+ {
+ return;
+ }
+
+ for (auto& [Lsn, Pending] : m_PendingActions)
+ {
+ switch (Pending->ActionState())
+ {
+ case RunnerAction::State::Pending:
+ ActionsToSchedule.push_back(Pending);
+ break;
+
+ case RunnerAction::State::Submitting:
+ break; // already claimed by async submission
+
+ case RunnerAction::State::Running:
+ case RunnerAction::State::Completed:
+ case RunnerAction::State::Failed:
+ case RunnerAction::State::Abandoned:
+ case RunnerAction::State::Cancelled:
+ break;
+
+ default:
+ case RunnerAction::State::New:
+ ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn);
+ break;
+ }
+ }
+
+ // Sort by priority descending, then by LSN ascending (FIFO within same priority)
+ std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) {
+ if (A->Priority != B->Priority)
+ {
+ return A->Priority > B->Priority;
+ }
+ return A->ActionLsn < B->ActionLsn;
+ });
+
+ if (ActionsToSchedule.size() > Capacity)
+ {
+ ActionsToSchedule.resize(Capacity);
+ }
+
+ PendingCount = m_PendingActions.size();
+ });
+
+ if (ActionsToSchedule.empty())
+ {
+ _.Dismiss();
+ return;
+ }
+
+ ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+
+ Stopwatch SubmitTimer;
+ std::vector<SubmitResult> SubmitResults = SubmitActions(ActionsToSchedule);
+
+ int NotAcceptedCount = 0;
+ int ScheduledActionCount = 0;
+
+ for (const SubmitResult& SubResult : SubmitResults)
+ {
+ if (SubResult.IsAccepted)
+ {
+ ++ScheduledActionCount;
+ }
+ else
+ {
+ ++NotAcceptedCount;
+ }
+ }
+
+ ZEN_INFO("scheduled {} pending actions in {} ({} rejected)",
+ ScheduledActionCount,
+ NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
+ NotAcceptedCount);
+
+ ScheduledCount += ScheduledActionCount;
+ PendingCount -= ScheduledActionCount;
+}
+
+void
+ComputeServiceSession::Impl::SchedulerThreadFunction()
+{
+ SetCurrentThreadName("Function_Scheduler");
+
+ auto _ = MakeGuard([&] { ZEN_INFO("scheduler thread exiting"); });
+
+ do
+ {
+ int TimeoutMs = 500;
+
+ auto PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+
+ if (PendingCount)
+ {
+ TimeoutMs = 100;
+ }
+
+ const bool WasSignaled = m_SchedulingThreadEvent.Wait(TimeoutMs);
+
+ if (m_SchedulingThreadEnabled == false)
+ {
+ return;
+ }
+
+ if (WasSignaled)
+ {
+ m_SchedulingThreadEvent.Reset();
+ }
+
+ ZEN_DEBUG("compute scheduler TICK (Pending: {} was {}, Running: {}, Results: {}) timeout: {}",
+ m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }),
+ PendingCount,
+ m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }),
+ m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }),
+ TimeoutMs);
+
+ HandleActionUpdates();
+
+ // Auto-transition Draining → Paused when all work is done
+ if (m_SessionState.load(std::memory_order_relaxed) == SessionState::Draining)
+ {
+ size_t Pending = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ size_t Running = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+
+ if (Pending == 0 && Running == 0)
+ {
+ SessionState Expected = SessionState::Draining;
+ if (m_SessionState.compare_exchange_strong(Expected, SessionState::Paused, std::memory_order_acq_rel))
+ {
+ ZEN_INFO("session state: Draining -> Paused (all work completed)");
+ }
+ }
+ }
+
+ UpdateCoordinatorState();
+ SchedulePendingActions();
+
+ static constexpr uint64_t QueueExpirySweepIntervalMs = 30000;
+ if (m_QueueExpiryTimer.GetElapsedTimeMs() >= QueueExpirySweepIntervalMs)
+ {
+ m_QueueExpiryTimer.Reset();
+ ExpireCompletedQueues();
+ }
+ } while (m_SchedulingThreadEnabled);
+}
+
+void
+ComputeServiceSession::Impl::PostUpdate(RunnerAction* Action)
+{
+ m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); });
+ m_SchedulingThreadEvent.Set();
+}
+
+int
+ComputeServiceSession::Impl::GetMaxRetriesForQueue(int QueueId)
+{
+ if (QueueId == 0)
+ {
+ return kDefaultMaxRetries;
+ }
+
+ CbObject Config = GetQueueConfig(QueueId);
+
+ if (Config)
+ {
+ int Value = Config["max_retries"].AsInt32(0);
+
+ if (Value > 0)
+ {
+ return Value;
+ }
+ }
+
+ return kDefaultMaxRetries;
+}
+
+ComputeServiceSession::RescheduleResult
+ComputeServiceSession::Impl::RescheduleAction(int ActionLsn)
+{
+ Ref<RunnerAction> Action;
+ RunnerAction::State State;
+ RescheduleResult ValidationError;
+ bool Removed = false;
+
+ // Find, validate, and remove atomically under a single lock scope to prevent
+ // concurrent RescheduleAction calls from double-removing the same action.
+ m_ResultsLock.WithExclusiveLock([&] {
+ auto It = m_ResultsMap.find(ActionLsn);
+ if (It == m_ResultsMap.end())
+ {
+ ValidationError = {.Success = false, .Error = "Action not found in results"};
+ return;
+ }
+
+ Action = It->second;
+ State = Action->ActionState();
+
+ if (State != RunnerAction::State::Failed && State != RunnerAction::State::Abandoned)
+ {
+ ValidationError = {.Success = false, .Error = "Action is not in a failed or abandoned state"};
+ return;
+ }
+
+ int MaxRetries = GetMaxRetriesForQueue(Action->QueueId);
+ if (Action->RetryCount.load(std::memory_order_relaxed) >= MaxRetries)
+ {
+ ValidationError = {.Success = false, .Error = "Retry limit reached"};
+ return;
+ }
+
+ m_ResultsMap.erase(It);
+ Removed = true;
+ });
+
+ if (!Removed)
+ {
+ return ValidationError;
+ }
+
+ if (Action->QueueId != 0)
+ {
+ Ref<QueueEntry> Queue = FindQueue(Action->QueueId);
+
+ if (Queue)
+ {
+ Queue->m_Lock.WithExclusiveLock([&] {
+ Queue->FinishedLsns.erase(ActionLsn);
+ Queue->ActiveLsns.insert(ActionLsn);
+ });
+
+ Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed);
+ Queue->IdleSince.store(0, std::memory_order_relaxed);
+
+ if (State == RunnerAction::State::Failed)
+ {
+ Queue->FailedCount.fetch_sub(1, std::memory_order_relaxed);
+ }
+ else
+ {
+ Queue->AbandonedCount.fetch_sub(1, std::memory_order_relaxed);
+ }
+ }
+ }
+
+ // Reset action state — this calls PostUpdate() internally
+ Action->ResetActionStateToPending();
+
+ int NewRetryCount = Action->RetryCount.load(std::memory_order_relaxed);
+ ZEN_INFO("action {} ({}) manually rescheduled (retry {})", Action->ActionId, ActionLsn, NewRetryCount);
+
+ return {.Success = true, .RetryCount = NewRetryCount};
+}
+
+void
+ComputeServiceSession::Impl::HandleActionUpdates()
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::HandleActionUpdates");
+
+ // Drain the update queue atomically
+ std::vector<Ref<RunnerAction>> UpdatedActions;
+ m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });
+
+ std::unordered_set<int> SeenLsn;
+
+ // Process each action's latest state, deduplicating by LSN.
+ //
+ // This is safe because state transitions are monotonically increasing by enum
+ // rank (Pending < Submitting < Running < Completed/Failed/Cancelled), so
+ // SetActionState rejects any transition to a lower-ranked state. By the time
+ // we read ActionState() here, it reflects the highest state reached — making
+ // the first occurrence per LSN authoritative and duplicates redundant.
+ for (Ref<RunnerAction>& Action : UpdatedActions)
+ {
+ const int ActionLsn = Action->ActionLsn;
+
+ if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
+ {
+ switch (Action->ActionState())
+ {
+ // Newly enqueued — add to pending map for scheduling
+ case RunnerAction::State::Pending:
+ m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ break;
+
+ // Async submission in progress — remains in pending map
+ case RunnerAction::State::Submitting:
+ break;
+
+ // Dispatched to a runner — move from pending to running
+ case RunnerAction::State::Running:
+ m_RunningLock.WithExclusiveLock([&] {
+ m_PendingLock.WithExclusiveLock([&] {
+ m_RunningMap.insert({ActionLsn, Action});
+ m_PendingActions.erase(ActionLsn);
+ });
+ });
+ ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
+ break;
+
+ // Terminal states — move to results, record history, notify queue
+ case RunnerAction::State::Completed:
+ case RunnerAction::State::Failed:
+ case RunnerAction::State::Abandoned:
+ case RunnerAction::State::Cancelled:
+ {
+ auto TerminalState = Action->ActionState();
+
+ // Automatic retry for Failed/Abandoned actions with retries remaining.
+ // Skip retries when the session itself is abandoned — those actions
+ // were intentionally abandoned and should not be rescheduled.
+ if ((TerminalState == RunnerAction::State::Failed || TerminalState == RunnerAction::State::Abandoned) &&
+ m_SessionState.load(std::memory_order_relaxed) < SessionState::Abandoned)
+ {
+ int MaxRetries = GetMaxRetriesForQueue(Action->QueueId);
+
+ if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries)
+ {
+ // Remove from whichever active map the action is in before resetting
+ m_RunningLock.WithExclusiveLock([&] {
+ m_PendingLock.WithExclusiveLock([&] {
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
+ });
+ });
+
+ // Reset triggers PostUpdate() which re-enters the action as Pending
+ Action->ResetActionStateToPending();
+ int NewRetryCount = Action->RetryCount.load(std::memory_order_relaxed);
+
+ ZEN_INFO("action {} ({}) auto-rescheduled (retry {}/{})",
+ Action->ActionId,
+ ActionLsn,
+ NewRetryCount,
+ MaxRetries);
+ break;
+ }
+ }
+
+ // Remove from whichever active map the action is in
+ m_RunningLock.WithExclusiveLock([&] {
+ m_PendingLock.WithExclusiveLock([&] {
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
+ });
+ });
+
+ m_ResultsLock.WithExclusiveLock([&] {
+ m_ResultsMap[ActionLsn] = Action;
+
+ // Append to bounded action history ring
+ m_ActionHistoryLock.WithExclusiveLock([&] {
+ ActionHistoryEntry Entry{.Lsn = ActionLsn,
+ .QueueId = Action->QueueId,
+ .ActionId = Action->ActionId,
+ .WorkerId = Action->Worker.WorkerId,
+ .ActionDescriptor = Action->ActionObj,
+ .ExecutionLocation = std::move(Action->ExecutionLocation),
+ .Succeeded = TerminalState == RunnerAction::State::Completed,
+ .CpuSeconds = Action->CpuSeconds.load(std::memory_order_relaxed),
+ .RetryCount = Action->RetryCount.load(std::memory_order_relaxed)};
+
+ std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
+
+ m_ActionHistory.push_back(std::move(Entry));
+
+ if (m_ActionHistory.size() > m_HistoryLimit)
+ {
+ m_ActionHistory.pop_front();
+ }
+ });
+ });
+ m_RetiredCount.fetch_add(1);
+ m_ResultRate.Mark(1);
+ ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
+ Action->ActionId,
+ ActionLsn,
+ TerminalState == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
+ NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState);
+ break;
+ }
+ }
+ }
+ }
+}
+
+size_t
+ComputeServiceSession::Impl::QueryCapacity()
+{
+ return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity();
+}
+
+std::vector<SubmitResult>
+ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions");
+ std::vector<SubmitResult> Results(Actions.size());
+
+ // First try submitting the batch to local runners in parallel
+
+ std::vector<SubmitResult> LocalResults = m_LocalRunnerGroup.SubmitActions(Actions);
+ std::vector<size_t> RemoteIndices;
+ std::vector<Ref<RunnerAction>> RemoteActions;
+
+ for (size_t i = 0; i < Actions.size(); ++i)
+ {
+ if (LocalResults[i].IsAccepted)
+ {
+ Results[i] = std::move(LocalResults[i]);
+ }
+ else
+ {
+ RemoteIndices.push_back(i);
+ RemoteActions.push_back(Actions[i]);
+ }
+ }
+
+ // Submit remaining actions to remote runners in parallel
+ if (!RemoteActions.empty())
+ {
+ std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
+
+ for (size_t j = 0; j < RemoteIndices.size(); ++j)
+ {
+ Results[RemoteIndices[j]] = std::move(RemoteResults[j]);
+ }
+ }
+
+ return Results;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ComputeServiceSession::ComputeServiceSession(ChunkResolver& InChunkResolver)
+{
+ m_Impl = std::make_unique<Impl>(this, InChunkResolver);
+}
+
+ComputeServiceSession::~ComputeServiceSession()
+{
+ Shutdown();
+}
+
+bool
+ComputeServiceSession::IsHealthy()
+{
+ return m_Impl->IsHealthy();
+}
+
+void
+ComputeServiceSession::WaitUntilReady()
+{
+ m_Impl->WaitUntilReady();
+}
+
+void
+ComputeServiceSession::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+
+ComputeServiceSession::SessionState
+ComputeServiceSession::GetSessionState() const
+{
+ return m_Impl->m_SessionState.load(std::memory_order_relaxed);
+}
+
+bool
+ComputeServiceSession::RequestStateTransition(SessionState NewState)
+{
+ return m_Impl->RequestStateTransition(NewState);
+}
+
+void
+ComputeServiceSession::SetOrchestratorEndpoint(std::string_view Endpoint)
+{
+ m_Impl->SetOrchestratorEndpoint(Endpoint);
+}
+
+void
+ComputeServiceSession::SetOrchestratorBasePath(std::filesystem::path BasePath)
+{
+ m_Impl->SetOrchestratorBasePath(std::move(BasePath));
+}
+
+void
+ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
+{
+ m_Impl->StartRecording(InResolver, RecordingPath);
+}
+
+void
+ComputeServiceSession::StopRecording()
+{
+ m_Impl->StopRecording();
+}
+
+ComputeServiceSession::ActionCounts
+ComputeServiceSession::GetActionCounts()
+{
+ return m_Impl->GetActionCounts();
+}
+
+void
+ComputeServiceSession::EmitStats(CbObjectWriter& Cbo)
+{
+ m_Impl->EmitStats(Cbo);
+}
+
+std::vector<IoHash>
+ComputeServiceSession::GetKnownWorkerIds()
+{
+ return m_Impl->GetKnownWorkerIds();
+}
+
+WorkerDesc
+ComputeServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ return m_Impl->GetWorkerDescriptor(WorkerId);
+}
+
+void
+ComputeServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::AddLocalRunner");
+
+# if ZEN_PLATFORM_LINUX
+ auto* NewRunner = new LinuxProcessRunner(InChunkResolver,
+ BasePath,
+ m_Impl->m_DeferredDeleter,
+ m_Impl->m_LocalSubmitPool,
+ false,
+ MaxConcurrentActions);
+# elif ZEN_PLATFORM_WINDOWS
+ auto* NewRunner = new WindowsProcessRunner(InChunkResolver,
+ BasePath,
+ m_Impl->m_DeferredDeleter,
+ m_Impl->m_LocalSubmitPool,
+ false,
+ MaxConcurrentActions);
+# elif ZEN_PLATFORM_MAC
+ auto* NewRunner =
+ new MacProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, false, MaxConcurrentActions);
+# endif
+
+ m_Impl->SyncWorkersToRunner(*NewRunner);
+ m_Impl->m_LocalRunnerGroup.AddRunner(NewRunner);
+}
+
+void
+ComputeServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
+{
+ ZEN_TRACE_CPU("ComputeServiceSession::AddRemoteRunner");
+
+ auto* NewRunner = new RemoteHttpRunner(InChunkResolver, BasePath, HostName, m_Impl->m_RemoteSubmitPool);
+ m_Impl->SyncWorkersToRunner(*NewRunner);
+ m_Impl->m_RemoteRunnerGroup.AddRunner(NewRunner);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
+{
+ return m_Impl->EnqueueActionToQueue(m_Impl->m_ImplicitQueueId, ActionObject, Priority);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ return m_Impl->EnqueueResolvedActionToQueue(m_Impl->m_ImplicitQueueId, Worker, ActionObj, RequestPriority);
+}
+ComputeServiceSession::CreateQueueResult
+ComputeServiceSession::CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config)
+{
+ return m_Impl->CreateQueue(Tag, std::move(Metadata), std::move(Config));
+}
+
+CbObject
+ComputeServiceSession::GetQueueMetadata(int QueueId)
+{
+ return m_Impl->GetQueueMetadata(QueueId);
+}
+
+CbObject
+ComputeServiceSession::GetQueueConfig(int QueueId)
+{
+ return m_Impl->GetQueueConfig(QueueId);
+}
+
+std::vector<int>
+ComputeServiceSession::GetQueueIds()
+{
+ return m_Impl->GetQueueIds();
+}
+
+ComputeServiceSession::QueueStatus
+ComputeServiceSession::GetQueueStatus(int QueueId)
+{
+ return m_Impl->GetQueueStatus(QueueId);
+}
+
+void
+ComputeServiceSession::CancelQueue(int QueueId)
+{
+ m_Impl->CancelQueue(QueueId);
+}
+
+void
+ComputeServiceSession::DrainQueue(int QueueId)
+{
+ m_Impl->DrainQueue(QueueId);
+}
+
+void
+ComputeServiceSession::DeleteQueue(int QueueId)
+{
+ m_Impl->DeleteQueue(QueueId);
+}
+
+void
+ComputeServiceSession::GetQueueCompleted(int QueueId, CbWriter& Cbo)
+{
+ m_Impl->GetQueueCompleted(QueueId, Cbo);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority)
+{
+ return m_Impl->EnqueueActionToQueue(QueueId, ActionObject, Priority);
+}
+
+ComputeServiceSession::EnqueueResult
+ComputeServiceSession::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ return m_Impl->EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority);
+}
+
+void
+ComputeServiceSession::RegisterWorker(CbPackage Worker)
+{
+ m_Impl->RegisterWorker(Worker);
+}
+
+HttpResponseCode
+ComputeServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ return m_Impl->GetActionResult(ActionLsn, OutResultPackage);
+}
+
+HttpResponseCode
+ComputeServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ return m_Impl->FindActionResult(ActionId, OutResultPackage);
+}
+
+void
+ComputeServiceSession::RetireActionResult(int ActionLsn)
+{
+ m_Impl->RetireActionResult(ActionLsn);
+}
+
+ComputeServiceSession::RescheduleResult
+ComputeServiceSession::RescheduleAction(int ActionLsn)
+{
+ return m_Impl->RescheduleAction(ActionLsn);
+}
+
+std::vector<ComputeServiceSession::RunningActionInfo>
+ComputeServiceSession::GetRunningActions()
+{
+ return m_Impl->GetRunningActions();
+}
+
+std::vector<ComputeServiceSession::ActionHistoryEntry>
+ComputeServiceSession::GetActionHistory(int Limit)
+{
+ return m_Impl->GetActionHistory(Limit);
+}
+
+std::vector<ComputeServiceSession::ActionHistoryEntry>
+ComputeServiceSession::GetQueueHistory(int QueueId, int Limit)
+{
+ return m_Impl->GetQueueHistory(QueueId, Limit);
+}
+
+void
+ComputeServiceSession::GetCompleted(CbWriter& Cbo)
+{
+ m_Impl->GetCompleted(Cbo);
+}
+
+void
+ComputeServiceSession::PostUpdate(RunnerAction* Action)
+{
+ m_Impl->PostUpdate(Action);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+computeservice_forcelink()
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES