// Copyright Epic Games, Inc. All Rights Reserved. #include "zencompute/computeservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include "runners/functionrunner.h" # include "recording/actionrecorder.h" # include "runners/localrunner.h" # include "runners/remotehttprunner.h" # if ZEN_PLATFORM_LINUX # include "runners/linuxrunner.h" # elif ZEN_PLATFORM_WINDOWS # include "runners/windowsrunner.h" # elif ZEN_PLATFORM_MAC # include "runners/macrunner.h" # endif # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END using namespace std::literals; namespace zen { const char* ToString(compute::ComputeServiceSession::SessionState State) { using enum compute::ComputeServiceSession::SessionState; switch (State) { case Created: return "Created"; case Ready: return "Ready"; case Draining: return "Draining"; case Paused: return "Paused"; case Abandoned: return "Abandoned"; case Sunset: return "Sunset"; } return "Unknown"; } const char* ToString(compute::ComputeServiceSession::QueueState State) { using enum compute::ComputeServiceSession::QueueState; switch (State) { case Active: return "active"; case Draining: return "draining"; case Cancelled: return "cancelled"; } return "unknown"; } } // namespace zen namespace zen::compute { using SessionState = ComputeServiceSession::SessionState; static_assert(ZEN_ARRAY_COUNT(ComputeServiceSession::ActionHistoryEntry::Timestamps) == static_cast(RunnerAction::State::_Count)); ////////////////////////////////////////////////////////////////////////// struct ComputeServiceSession::Impl { ComputeServiceSession* m_ComputeServiceSession; ChunkResolver& m_ChunkResolver; LoggerRef m_Log{logging::Get("compute")}; Impl(ComputeServiceSession* InComputeServiceSession, ChunkResolver& InChunkResolver) : 
m_ComputeServiceSession(InComputeServiceSession)
	, m_ChunkResolver(InChunkResolver)
	, m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
	, m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst))
	{
		// Create a non-expiring, non-deletable implicit queue for legacy endpoints
		auto Result = CreateQueue("implicit"sv, {}, {});
		m_ImplicitQueueId = Result.QueueId;
		m_QueueLock.WithSharedLock([&] { m_Queues[m_ImplicitQueueId]->Implicit = true; });

		// The scheduler thread is started last, after the implicit queue has been created and flagged
		m_SchedulingThread = std::thread{&Impl::SchedulerThreadFunction, this};
	}

	void WaitUntilReady();
	void Shutdown();
	bool IsHealthy();

	bool RequestStateTransition(SessionState NewState);
	void AbandonAllActions();

	LoggerRef Log() { return m_Log; }

	// Orchestration

	void SetOrchestratorEndpoint(std::string_view Endpoint);
	void SetOrchestratorBasePath(std::filesystem::path BasePath);

	std::string m_OrchestratorEndpoint;
	std::filesystem::path m_OrchestratorBasePath;
	Stopwatch m_OrchestratorQueryTimer;
	// URIs of remote workers discovered through the orchestrator (maintained by UpdateCoordinatorState)
	std::unordered_set m_KnownWorkerUris;

	void UpdateCoordinatorState();

	// Worker registration and discovery

	// One entry per function advertised by a registered worker; EnqueueAction matches
	// on name + function version + build system version to pick the worker
	struct FunctionDefinition
	{
		std::string FunctionName;
		Guid FunctionVersion;
		Guid BuildSystemVersion;
		IoHash WorkerId;
	};

	void RegisterWorker(CbPackage Worker);
	WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);

	// Action scheduling and tracking

	std::atomic m_SessionState{SessionState::Created};
	std::atomic m_ActionsCounter = 0;  // sequence number
	metrics::Meter m_ArrivalRate;

	// Pending, running and finished actions are tracked in separate LSN-keyed maps,
	// each guarded by its own lock
	RwLock m_PendingLock;
	std::map> m_PendingActions;

	RwLock m_RunningLock;
	std::unordered_map> m_RunningMap;

	RwLock m_ResultsLock;
	std::unordered_map> m_ResultsMap;
	metrics::Meter m_ResultRate;
	std::atomic m_RetiredCount{0};

	EnqueueResult EnqueueAction(int QueueId, CbObject ActionObject, int Priority);
	EnqueueResult EnqueueResolvedAction(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority);

	void GetCompleted(CbWriter& Cbo);

	HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
	HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
	void RetireActionResult(int ActionLsn);

	// Scheduler thread: started by the constructor, stopped and joined by Shutdown()
	std::thread m_SchedulingThread;
	std::atomic m_SchedulingThreadEnabled{true};
	Event m_SchedulingThreadEvent;

	void SchedulerThreadFunction();
	void SchedulePendingActions();

	// Workers

	RwLock m_WorkerLock;
	std::unordered_map m_WorkerMap;
	std::vector m_FunctionList;
	std::vector GetKnownWorkerIds();
	void SyncWorkersToRunner(FunctionRunner& Runner);

	// Runners

	DeferredDirectoryDeleter m_DeferredDeleter;
	WorkerThreadPool& m_LocalSubmitPool;
	WorkerThreadPool& m_RemoteSubmitPool;
	RunnerGroup m_LocalRunnerGroup;
	RunnerGroup m_RemoteRunnerGroup;

	void ShutdownRunners();

	// Recording

	void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
	void StopRecording();

	std::unique_ptr m_Recorder;

	// History tracking

	RwLock m_ActionHistoryLock;
	std::deque m_ActionHistory;
	size_t m_HistoryLimit = 1000;

	// Queue tracking

	using QueueState = ComputeServiceSession::QueueState;

	// Per-queue bookkeeping. Counters are atomics; the LSN sets are guarded by m_Lock.
	struct QueueEntry : RefCounted
	{
		int QueueId;
		bool Implicit{false};
		std::atomic State{QueueState::Active};
		std::atomic ActiveCount{0};  // pending + running
		std::atomic CompletedCount{0};  // successfully completed
		std::atomic FailedCount{0};  // failed
		std::atomic AbandonedCount{0};  // abandoned
		std::atomic CancelledCount{0};  // cancelled
		std::atomic IdleSince{0};  // hifreq tick when queue became idle; 0 = has active work

		RwLock m_Lock;
		std::unordered_set ActiveLsns;  // for cancellation lookup
		std::unordered_set FinishedLsns;  // completed/failed/cancelled LSNs

		std::string Tag;
		CbObject Metadata;
		CbObject Config;
	};

	int m_ImplicitQueueId{0};
	std::atomic m_QueueCounter{0};
	RwLock m_QueueLock;
	std::unordered_map> m_Queues;

	// Looks up a queue by id under the shared queue lock. Returns a default-constructed
	// Ref when the id is unknown; callers test the result with operator!.
	Ref FindQueue(int QueueId)
	{
		Ref Queue;
		m_QueueLock.WithSharedLock([&] {
			if (auto It = m_Queues.find(QueueId); It != m_Queues.end())
			{
				Queue = It->second;
			}
		});
		return Queue;
	}

	ComputeServiceSession::CreateQueueResult CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config);
	std::vector GetQueueIds();
	ComputeServiceSession::QueueStatus GetQueueStatus(int QueueId);
	CbObject GetQueueMetadata(int QueueId);
	CbObject GetQueueConfig(int QueueId);
	void CancelQueue(int QueueId);
	void DeleteQueue(int QueueId);
	void DrainQueue(int QueueId);
	ComputeServiceSession::EnqueueResult EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority);
	ComputeServiceSession::EnqueueResult EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority);
	void GetQueueCompleted(int QueueId, CbWriter& Cbo);
	void NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState);
	void ExpireCompletedQueues();

	Stopwatch m_QueueExpiryTimer;

	std::vector GetRunningActions();
	std::vector GetActionHistory(int Limit);
	std::vector GetQueueHistory(int QueueId, int Limit);

	// Action submission

	[[nodiscard]] size_t QueryCapacity();

	[[nodiscard]] SubmitResult SubmitAction(Ref Action);
	[[nodiscard]] std::vector SubmitActions(const std::vector>& Actions);
	[[nodiscard]] size_t GetSubmittedActionCount();

	// Updates

	RwLock m_UpdatedActionsLock;
	std::vector> m_UpdatedActions;

	void HandleActionUpdates();
	void PostUpdate(RunnerAction* Action);

	static constexpr int kDefaultMaxRetries = 3;
	int GetMaxRetriesForQueue(int QueueId);

	ComputeServiceSession::RescheduleResult RescheduleAction(int ActionLsn);

	// Returns current pending/running/completed totals plus the number of non-implicit
	// queues. Each value is read under its own lock, so the four numbers are not a
	// single atomic snapshot. Completed includes results that were already retired.
	ActionCounts GetActionCounts()
	{
		ActionCounts Counts;
		Counts.Pending = (int)m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
		Counts.Running = (int)m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
		Counts.Completed = (int)m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }) + (int)m_RetiredCount.load();
		Counts.ActiveQueues = (int)m_QueueLock.WithSharedLock([&] {
			size_t Count = 0;
			for (const auto& [Id, Queue] : m_Queues)
			{
				if (!Queue->Implicit)
				{
					++Count;
				}
			}
			return Count;
		});
		return Counts;
	}

	// Writes session state, worker/result/pending counts, the submitted-action count
	// and the arrival/result meters into Cbo.
	void EmitStats(CbObjectWriter& Cbo)
	{
		Cbo << "session_state"sv << ToString(m_SessionState.load(std::memory_order_relaxed));
		m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
		m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
		m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
		Cbo << "actions_submitted"sv << GetSubmittedActionCount();
		EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo);
		EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
	}
};

// The session counts as healthy while its state orders before SessionState::Abandoned.
bool
ComputeServiceSession::Impl::IsHealthy()
{
	return m_SessionState.load(std::memory_order_relaxed) < SessionState::Abandoned;
}

// Attempts to move the session to NewState.
//
// Returns true when the session is already in NewState or the transition was applied,
// false (with a warning) when the transition is not allowed from the current state.
// The update is a compare-exchange loop, so a concurrent change re-validates against
// the freshly observed state. Entering Abandoned also abandons all tracked actions.
bool
ComputeServiceSession::Impl::RequestStateTransition(SessionState NewState)
{
	SessionState Current = m_SessionState.load(std::memory_order_relaxed);

	for (;;)
	{
		if (Current == NewState)
		{
			return true;
		}

		// Validate the transition
		bool Valid = false;

		switch (Current)
		{
			case SessionState::Created:
				Valid = (NewState == SessionState::Ready);
				break;
			case SessionState::Ready:
				Valid = (NewState == SessionState::Draining);
				break;
			case SessionState::Draining:
				Valid = (NewState == SessionState::Ready || NewState == SessionState::Paused);
				break;
			case SessionState::Paused:
				Valid = (NewState == SessionState::Ready || NewState == SessionState::Sunset);
				break;
			case SessionState::Abandoned:
				Valid = (NewState == SessionState::Sunset);
				break;
			case SessionState::Sunset:
				Valid = false;
				break;
		}

		// Allow jumping directly to Abandoned or Sunset from any non-terminal state
		if (NewState == SessionState::Abandoned && Current < SessionState::Abandoned)
		{
			Valid = true;
		}
		if (NewState == SessionState::Sunset && Current != SessionState::Sunset)
		{
			Valid = true;
		}

		if (!Valid)
		{
			ZEN_WARN("invalid session state transition {} -> {}", ToString(Current), ToString(NewState));
			return false;
		}

		if (m_SessionState.compare_exchange_strong(Current, NewState, std::memory_order_acq_rel))
		{
			ZEN_INFO("session state: {} -> {}", ToString(Current), ToString(NewState));

			if (NewState ==
SessionState::Abandoned) { AbandonAllActions(); } return true; } // CAS failed, Current was updated — retry with the new value } } void ComputeServiceSession::Impl::AbandonAllActions() { // Collect all pending actions and mark them as Abandoned std::vector> PendingToAbandon; m_PendingLock.WithSharedLock([&] { PendingToAbandon.reserve(m_PendingActions.size()); for (auto& [Lsn, Action] : m_PendingActions) { PendingToAbandon.push_back(Action); } }); for (auto& Action : PendingToAbandon) { Action->SetActionState(RunnerAction::State::Abandoned); } // Collect all running actions and mark them as Abandoned, then // best-effort cancel via the local runner group std::vector> RunningToAbandon; m_RunningLock.WithSharedLock([&] { RunningToAbandon.reserve(m_RunningMap.size()); for (auto& [Lsn, Action] : m_RunningMap) { RunningToAbandon.push_back(Action); } }); for (auto& Action : RunningToAbandon) { Action->SetActionState(RunnerAction::State::Abandoned); m_LocalRunnerGroup.CancelAction(Action->ActionLsn); } ZEN_INFO("abandoned all actions: {} pending, {} running", PendingToAbandon.size(), RunningToAbandon.size()); } void ComputeServiceSession::Impl::SetOrchestratorEndpoint(std::string_view Endpoint) { m_OrchestratorEndpoint = Endpoint; } void ComputeServiceSession::Impl::SetOrchestratorBasePath(std::filesystem::path BasePath) { m_OrchestratorBasePath = std::move(BasePath); } void ComputeServiceSession::Impl::UpdateCoordinatorState() { ZEN_TRACE_CPU("ComputeServiceSession::UpdateCoordinatorState"); if (m_OrchestratorEndpoint.empty()) { return; } // Poll faster when we have no discovered workers yet so remote runners come online quickly const uint64_t PollIntervalMs = m_KnownWorkerUris.empty() ? 
500 : 5000; if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < PollIntervalMs) { return; } m_OrchestratorQueryTimer.Reset(); try { HttpClient Client(m_OrchestratorEndpoint); HttpClient::Response Response = Client.Get("/orch/agents"); if (!Response.IsSuccess()) { ZEN_WARN("orchestrator query failed with status {}", static_cast(Response.StatusCode)); return; } CbObject WorkerList = Response.AsObject(); std::unordered_set ValidWorkerUris; for (auto& Item : WorkerList["workers"sv]) { CbObjectView Worker = Item.AsObjectView(); uint64_t Dt = Worker["dt"sv].AsUInt64(); bool Reachable = Worker["reachable"sv].AsBool(); std::string_view Uri = Worker["uri"sv].AsString(); // Skip stale workers (not seen in over 30 seconds) if (Dt > 30000) { continue; } // Skip workers that are not confirmed reachable if (!Reachable) { continue; } std::string UriStr{Uri}; ValidWorkerUris.insert(UriStr); // Skip workers we already know about if (m_KnownWorkerUris.contains(UriStr)) { continue; } ZEN_INFO("discovered new worker at {}", UriStr); m_KnownWorkerUris.insert(UriStr); auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool); SyncWorkersToRunner(*NewRunner); m_RemoteRunnerGroup.AddRunner(NewRunner); } // Remove workers that are no longer valid (stale or unreachable) for (auto It = m_KnownWorkerUris.begin(); It != m_KnownWorkerUris.end();) { if (!ValidWorkerUris.contains(*It)) { const std::string& ExpiredUri = *It; ZEN_INFO("removing expired worker at {}", ExpiredUri); m_RemoteRunnerGroup.RemoveRunnerIf([&](const RemoteHttpRunner& Runner) { return Runner.GetHostName() == ExpiredUri; }); It = m_KnownWorkerUris.erase(It); } else { ++It; } } } catch (const HttpClientError& Ex) { ZEN_WARN("orchestrator query error: {}", Ex.what()); } catch (const std::exception& Ex) { ZEN_WARN("orchestrator query unexpected error: {}", Ex.what()); } } void ComputeServiceSession::Impl::WaitUntilReady() { if (m_RemoteRunnerGroup.GetRunnerCount() || 
!m_OrchestratorEndpoint.empty()) { ZEN_INFO("waiting for remote runners..."); constexpr int MaxWaitSeconds = 120; for (int Elapsed = 0; Elapsed < MaxWaitSeconds; Elapsed++) { if (!m_SchedulingThreadEnabled.load(std::memory_order_relaxed)) { ZEN_WARN("shutdown requested while waiting for remote runners"); return; } const size_t Capacity = m_RemoteRunnerGroup.QueryCapacity(); if (Capacity > 0) { ZEN_INFO("found {} remote runners (capacity: {})", m_RemoteRunnerGroup.GetRunnerCount(), Capacity); break; } zen::Sleep(1000); } } else { ZEN_ASSERT(m_LocalRunnerGroup.GetRunnerCount(), "no runners available"); } RequestStateTransition(SessionState::Ready); } void ComputeServiceSession::Impl::Shutdown() { RequestStateTransition(SessionState::Sunset); m_SchedulingThreadEnabled = false; m_SchedulingThreadEvent.Set(); if (m_SchedulingThread.joinable()) { m_SchedulingThread.join(); } ShutdownRunners(); m_DeferredDeleter.Shutdown(); } void ComputeServiceSession::Impl::ShutdownRunners() { m_LocalRunnerGroup.Shutdown(); m_RemoteRunnerGroup.Shutdown(); } void ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) { ZEN_INFO("starting recording to '{}'", RecordingPath); m_Recorder = std::make_unique(InCidStore, RecordingPath); ZEN_INFO("started recording to '{}'", RecordingPath); } void ComputeServiceSession::Impl::StopRecording() { ZEN_INFO("stopping recording"); m_Recorder = nullptr; ZEN_INFO("stopped recording"); } std::vector ComputeServiceSession::Impl::GetRunningActions() { std::vector Result; m_RunningLock.WithSharedLock([&] { Result.reserve(m_RunningMap.size()); for (const auto& [Lsn, Action] : m_RunningMap) { Result.push_back({.Lsn = Lsn, .QueueId = Action->QueueId, .ActionId = Action->ActionId, .CpuUsagePercent = Action->CpuUsagePercent.load(std::memory_order_relaxed), .CpuSeconds = Action->CpuSeconds.load(std::memory_order_relaxed)}); } }); return Result; } std::vector 
ComputeServiceSession::Impl::GetActionHistory(int Limit) { RwLock::SharedLockScope _(m_ActionHistoryLock); if (Limit > 0 && static_cast(Limit) < m_ActionHistory.size()) { return std::vector(m_ActionHistory.end() - Limit, m_ActionHistory.end()); } return std::vector(m_ActionHistory.begin(), m_ActionHistory.end()); } std::vector ComputeServiceSession::Impl::GetQueueHistory(int QueueId, int Limit) { // Resolve the queue and snapshot its finished LSN set Ref Queue = FindQueue(QueueId); if (!Queue) { return {}; } std::unordered_set FinishedLsns; Queue->m_Lock.WithSharedLock([&] { FinishedLsns = Queue->FinishedLsns; }); // Filter the global history to entries belonging to this queue. // m_ActionHistory is ordered oldest-first, so the filtered result keeps the same ordering. std::vector Result; m_ActionHistoryLock.WithSharedLock([&] { for (const auto& Entry : m_ActionHistory) { if (FinishedLsns.contains(Entry.Lsn)) { Result.push_back(Entry); } } }); if (Limit > 0 && static_cast(Limit) < Result.size()) { Result.erase(Result.begin(), Result.end() - Limit); } return Result; } void ComputeServiceSession::Impl::RegisterWorker(CbPackage Worker) { ZEN_TRACE_CPU("ComputeServiceSession::RegisterWorker"); RwLock::ExclusiveLockScope _(m_WorkerLock); const IoHash& WorkerId = Worker.GetObject().GetHash(); if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) { // Note that since the convention currently is that WorkerId is equal to the hash // of the worker descriptor there is no chance that we get a second write with a // different descriptor. 
Thus we only need to call this the first time, when the // worker is added m_LocalRunnerGroup.RegisterWorker(Worker); m_RemoteRunnerGroup.RegisterWorker(Worker); if (m_Recorder) { m_Recorder->RegisterWorker(Worker); } CbObject WorkerObj = Worker.GetObject(); // Populate worker database const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); for (auto& Item : WorkerObj["functions"sv]) { CbObjectView Function = Item.AsObjectView(); std::string_view FunctionName = Function["name"sv].AsString(); const Guid FunctionVersion = Function["version"sv].AsUuid(); m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, .FunctionVersion = FunctionVersion, .BuildSystemVersion = WorkerBuildSystemVersion, .WorkerId = WorkerId}); } } } void ComputeServiceSession::Impl::SyncWorkersToRunner(FunctionRunner& Runner) { ZEN_TRACE_CPU("SyncWorkersToRunner"); std::vector Workers; { RwLock::SharedLockScope _(m_WorkerLock); Workers.reserve(m_WorkerMap.size()); for (const auto& [Id, Pkg] : m_WorkerMap) { Workers.push_back(Pkg); } } for (const CbPackage& Worker : Workers) { Runner.RegisterWorker(Worker); } } WorkerDesc ComputeServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) { const CbPackage& Desc = It->second; return {Desc, WorkerId}; } return {}; } std::vector ComputeServiceSession::Impl::GetKnownWorkerIds() { std::vector WorkerIds; m_WorkerLock.WithSharedLock([&] { WorkerIds.reserve(m_WorkerMap.size()); for (const auto& [WorkerId, _] : m_WorkerMap) { WorkerIds.push_back(WorkerId); } }); return WorkerIds; } ComputeServiceSession::EnqueueResult ComputeServiceSession::Impl::EnqueueAction(int QueueId, CbObject ActionObject, int Priority) { ZEN_TRACE_CPU("ComputeServiceSession::EnqueueAction"); // Resolve function to worker IoHash WorkerId{IoHash::Zero}; CbPackage WorkerPackage; std::string_view FunctionName = 
ActionObject["Function"sv].AsString(); const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); m_WorkerLock.WithSharedLock([&] { for (const FunctionDefinition& FuncDef : m_FunctionList) { if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && FuncDef.BuildSystemVersion == BuildSystemVersion) { WorkerId = FuncDef.WorkerId; break; } } if (WorkerId != IoHash::Zero) { if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) { WorkerPackage = It->second; } } }); if (WorkerId == IoHash::Zero) { CbObjectWriter Writer; Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; Writer << "error" << "no worker matches the action specification"; return {0, Writer.Save()}; } if (WorkerPackage) { return EnqueueResolvedAction(QueueId, WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); } CbObjectWriter Writer; Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; Writer << "error" << "no worker found despite match"; return {0, Writer.Save()}; } ComputeServiceSession::EnqueueResult ComputeServiceSession::Impl::EnqueueResolvedAction(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority) { ZEN_TRACE_CPU("ComputeServiceSession::EnqueueResolvedAction"); if (m_SessionState.load(std::memory_order_relaxed) != SessionState::Ready) { CbObjectWriter Writer; Writer << "error"sv << fmt::format("session is not accepting actions (state: {})", ToString(m_SessionState.load())); return {0, Writer.Save()}; } const int ActionLsn = ++m_ActionsCounter; m_ArrivalRate.Mark(); Ref Pending{new RunnerAction(m_ComputeServiceSession)}; Pending->ActionLsn = ActionLsn; Pending->QueueId = QueueId; Pending->Worker = Worker; Pending->ActionId = ActionObj.GetHash(); Pending->ActionObj = ActionObj; 
Pending->Priority = RequestPriority; // For now simply put action into pending state, so we can do batch scheduling ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); Pending->SetActionState(RunnerAction::State::Pending); if (m_Recorder) { m_Recorder->RecordAction(Pending); } CbObjectWriter Writer; Writer << "lsn" << Pending->ActionLsn; Writer << "worker" << Pending->Worker.WorkerId; Writer << "action" << Pending->ActionId; return {Pending->ActionLsn, Writer.Save()}; } SubmitResult ComputeServiceSession::Impl::SubmitAction(Ref Action) { // Loosely round-robin scheduling of actions across runners. // // It's not entirely clear what this means given that submits // can come in across multiple threads, but it's probably better // than always starting with the first runner. // // Longer term we should track the state of the individual // runners and make decisions accordingly. SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); if (Result.IsAccepted) { return Result; } return m_RemoteRunnerGroup.SubmitAction(Action); } size_t ComputeServiceSession::Impl::GetSubmittedActionCount() { return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); } HttpResponseCode ComputeServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) { // This lock is held for the duration of the function since we need to // be sure that the action doesn't change state while we are checking the // different data structures RwLock::ExclusiveLockScope _(m_ResultsLock); if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) { OutResultPackage = std::move(It->second->GetResult()); m_ResultsMap.erase(It); return HttpResponseCode::OK; } { RwLock::SharedLockScope __(m_PendingLock); if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) { return HttpResponseCode::Accepted; } } // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must // 
always be taken after m_ResultsLock if both are needed { RwLock::SharedLockScope __(m_RunningLock); if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) { return HttpResponseCode::Accepted; } } return HttpResponseCode::NotFound; } HttpResponseCode ComputeServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) { // This lock is held for the duration of the function since we need to // be sure that the action doesn't change state while we are checking the // different data structures RwLock::ExclusiveLockScope _(m_ResultsLock); for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) { if (It->second->ActionId == ActionId) { OutResultPackage = std::move(It->second->GetResult()); m_ResultsMap.erase(It); return HttpResponseCode::OK; } } { RwLock::SharedLockScope __(m_PendingLock); for (const auto& [K, Pending] : m_PendingActions) { if (Pending->ActionId == ActionId) { return HttpResponseCode::Accepted; } } } // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must // always be taken after m_ResultsLock if both are needed { RwLock::SharedLockScope __(m_RunningLock); for (const auto& [K, v] : m_RunningMap) { if (v->ActionId == ActionId) { return HttpResponseCode::Accepted; } } } return HttpResponseCode::NotFound; } void ComputeServiceSession::Impl::RetireActionResult(int ActionLsn) { m_DeferredDeleter.MarkReady(ActionLsn); } void ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo) { Cbo.BeginArray("completed"); m_ResultsLock.WithSharedLock([&] { for (auto& [Lsn, Action] : m_ResultsMap) { Cbo.BeginObject(); Cbo << "lsn"sv << Lsn; Cbo << "state"sv << RunnerAction::ToString(Action->ActionState()); Cbo.EndObject(); } }); Cbo.EndArray(); } ////////////////////////////////////////////////////////////////////////// // Queue management ComputeServiceSession::CreateQueueResult ComputeServiceSession::Impl::CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config) { const int QueueId = 
++m_QueueCounter; Ref Queue{new QueueEntry()}; Queue->QueueId = QueueId; Queue->Tag = Tag; Queue->Metadata = std::move(Metadata); Queue->Config = std::move(Config); Queue->IdleSince = GetHifreqTimerValue(); m_QueueLock.WithExclusiveLock([&] { m_Queues[QueueId] = Queue; }); ZEN_DEBUG("created queue {}", QueueId); return {.QueueId = QueueId}; } std::vector ComputeServiceSession::Impl::GetQueueIds() { std::vector Ids; m_QueueLock.WithSharedLock([&] { Ids.reserve(m_Queues.size()); for (const auto& [Id, Queue] : m_Queues) { if (!Queue->Implicit) { Ids.push_back(Id); } } }); return Ids; } ComputeServiceSession::QueueStatus ComputeServiceSession::Impl::GetQueueStatus(int QueueId) { Ref Queue = FindQueue(QueueId); if (!Queue) { return {}; } const int Active = Queue->ActiveCount.load(std::memory_order_relaxed); const int Completed = Queue->CompletedCount.load(std::memory_order_relaxed); const int Failed = Queue->FailedCount.load(std::memory_order_relaxed); const int AbandonedN = Queue->AbandonedCount.load(std::memory_order_relaxed); const int CancelledN = Queue->CancelledCount.load(std::memory_order_relaxed); const QueueState QState = Queue->State.load(); return { .IsValid = true, .QueueId = QueueId, .ActiveCount = Active, .CompletedCount = Completed, .FailedCount = Failed, .AbandonedCount = AbandonedN, .CancelledCount = CancelledN, .State = QState, .IsComplete = (Active == 0), }; } CbObject ComputeServiceSession::Impl::GetQueueMetadata(int QueueId) { Ref Queue = FindQueue(QueueId); if (!Queue) { return {}; } return Queue->Metadata; } CbObject ComputeServiceSession::Impl::GetQueueConfig(int QueueId) { Ref Queue = FindQueue(QueueId); if (!Queue) { return {}; } return Queue->Config; } void ComputeServiceSession::Impl::CancelQueue(int QueueId) { Ref Queue = FindQueue(QueueId); if (!Queue || Queue->Implicit) { return; } Queue->State.store(QueueState::Cancelled); // Collect active LSNs snapshot for cancellation std::vector LsnsToCancel; Queue->m_Lock.WithSharedLock([&] { 
LsnsToCancel.assign(Queue->ActiveLsns.begin(), Queue->ActiveLsns.end()); }); // Identify which LSNs are still pending (not yet dispatched to a runner) std::vector> PendingActionsToCancel; std::vector RunningLsnsToCancel; m_PendingLock.WithSharedLock([&] { for (int Lsn : LsnsToCancel) { if (auto It = m_PendingActions.find(Lsn); It != m_PendingActions.end()) { PendingActionsToCancel.push_back(It->second); } } }); m_RunningLock.WithSharedLock([&] { for (int Lsn : LsnsToCancel) { if (m_RunningMap.find(Lsn) != m_RunningMap.end()) { RunningLsnsToCancel.push_back(Lsn); } } }); // Cancel pending actions by marking them as Cancelled; they will flow through // HandleActionUpdates and eventually be removed from the pending map. for (auto& Action : PendingActionsToCancel) { Action->SetActionState(RunnerAction::State::Cancelled); } // Best-effort cancellation of running actions via the local runner group. // Also set the action state to Cancelled directly so a subsequent Failed // transition from the runner is blocked (Cancelled > Failed in the enum). 
for (int Lsn : RunningLsnsToCancel) { m_RunningLock.WithSharedLock([&] { if (auto It = m_RunningMap.find(Lsn); It != m_RunningMap.end()) { It->second->SetActionState(RunnerAction::State::Cancelled); } }); m_LocalRunnerGroup.CancelAction(Lsn); } m_RemoteRunnerGroup.CancelRemoteQueue(QueueId); ZEN_INFO("cancelled queue {}: {} pending, {} running actions cancelled", QueueId, PendingActionsToCancel.size(), RunningLsnsToCancel.size()); // Wake up the scheduler to process the cancelled actions m_SchedulingThreadEvent.Set(); } void ComputeServiceSession::Impl::DeleteQueue(int QueueId) { // Never delete the implicit queue { Ref Queue = FindQueue(QueueId); if (Queue && Queue->Implicit) { return; } } // Cancel any active work first CancelQueue(QueueId); m_QueueLock.WithExclusiveLock([&] { if (auto It = m_Queues.find(QueueId); It != m_Queues.end()) { m_Queues.erase(It); } }); } void ComputeServiceSession::Impl::DrainQueue(int QueueId) { Ref Queue = FindQueue(QueueId); if (!Queue || Queue->Implicit) { return; } QueueState Expected = QueueState::Active; Queue->State.compare_exchange_strong(Expected, QueueState::Draining); ZEN_INFO("draining queue {}", QueueId); } ComputeServiceSession::EnqueueResult ComputeServiceSession::Impl::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority) { Ref Queue = FindQueue(QueueId); if (!Queue) { CbObjectWriter Writer; Writer << "error"sv << "queue not found"sv; return {0, Writer.Save()}; } QueueState QState = Queue->State.load(); if (QState == QueueState::Cancelled) { CbObjectWriter Writer; Writer << "error"sv << "queue is cancelled"sv; return {0, Writer.Save()}; } if (QState == QueueState::Draining) { CbObjectWriter Writer; Writer << "error"sv << "queue is draining"sv; return {0, Writer.Save()}; } EnqueueResult Result = EnqueueAction(QueueId, ActionObject, Priority); if (Result.Lsn != 0) { Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); }); Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); 
Queue->IdleSince.store(0, std::memory_order_relaxed); } return Result; } ComputeServiceSession::EnqueueResult ComputeServiceSession::Impl::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int Priority) { Ref Queue = FindQueue(QueueId); if (!Queue) { CbObjectWriter Writer; Writer << "error"sv << "queue not found"sv; return {0, Writer.Save()}; } QueueState QState = Queue->State.load(); if (QState == QueueState::Cancelled) { CbObjectWriter Writer; Writer << "error"sv << "queue is cancelled"sv; return {0, Writer.Save()}; } if (QState == QueueState::Draining) { CbObjectWriter Writer; Writer << "error"sv << "queue is draining"sv; return {0, Writer.Save()}; } EnqueueResult Result = EnqueueResolvedAction(QueueId, Worker, ActionObj, Priority); if (Result.Lsn != 0) { Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.insert(Result.Lsn); }); Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); Queue->IdleSince.store(0, std::memory_order_relaxed); } return Result; } void ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo) { Ref Queue = FindQueue(QueueId); Cbo.BeginArray("completed"); if (Queue) { Queue->m_Lock.WithSharedLock([&] { m_ResultsLock.WithSharedLock([&] { for (int Lsn : Queue->FinishedLsns) { if (m_ResultsMap.contains(Lsn)) { Cbo << Lsn; } } }); }); } Cbo.EndArray(); } void ComputeServiceSession::Impl::NotifyQueueActionComplete(int QueueId, int Lsn, RunnerAction::State ActionState) { if (QueueId == 0) { return; } Ref Queue = FindQueue(QueueId); if (!Queue) { return; } Queue->m_Lock.WithExclusiveLock([&] { Queue->ActiveLsns.erase(Lsn); Queue->FinishedLsns.insert(Lsn); }); const int PreviousActive = Queue->ActiveCount.fetch_sub(1, std::memory_order_relaxed); if (PreviousActive == 1) { Queue->IdleSince.store(GetHifreqTimerValue(), std::memory_order_relaxed); } switch (ActionState) { case RunnerAction::State::Completed: Queue->CompletedCount.fetch_add(1, std::memory_order_relaxed); break; case 
RunnerAction::State::Abandoned: Queue->AbandonedCount.fetch_add(1, std::memory_order_relaxed); break; case RunnerAction::State::Cancelled: Queue->CancelledCount.fetch_add(1, std::memory_order_relaxed); break; default: Queue->FailedCount.fetch_add(1, std::memory_order_relaxed); break; } } void ComputeServiceSession::Impl::ExpireCompletedQueues() { static constexpr uint64_t ExpiryTimeMs = 15 * 60 * 1000; std::vector ExpiredQueueIds; m_QueueLock.WithSharedLock([&] { for (const auto& [Id, Queue] : m_Queues) { if (Queue->Implicit) { continue; } const uint64_t Idle = Queue->IdleSince.load(std::memory_order_relaxed); if (Idle != 0 && Queue->ActiveCount.load(std::memory_order_relaxed) == 0) { const uint64_t ElapsedMs = Stopwatch::GetElapsedTimeMs(GetHifreqTimerValue() - Idle); if (ElapsedMs >= ExpiryTimeMs) { ExpiredQueueIds.push_back(Id); } } } }); for (int QueueId : ExpiredQueueIds) { ZEN_INFO("expiring idle queue {}", QueueId); DeleteQueue(QueueId); } } void ComputeServiceSession::Impl::SchedulePendingActions() { ZEN_TRACE_CPU("ComputeServiceSession::SchedulePendingActions"); int ScheduledCount = 0; size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); static Stopwatch DumpRunningTimer; auto _ = MakeGuard([&] { ZEN_INFO("scheduled {} pending actions. 
{} running ({} retired), {} still pending, {} results", ScheduledCount, RunningCount, m_RetiredCount.load(), PendingCount, ResultCount); if (DumpRunningTimer.GetElapsedTimeMs() > 30000) { DumpRunningTimer.Reset(); std::set RunningList; m_RunningLock.WithSharedLock([&] { for (auto& [K, V] : m_RunningMap) { RunningList.insert(K); } }); ExtendableStringBuilder<1024> RunningString; for (int i : RunningList) { if (RunningString.Size()) { RunningString << ", "; } RunningString.Append(IntNum(i)); } ZEN_INFO("running: {}", RunningString); } }); size_t Capacity = QueryCapacity(); if (!Capacity) { _.Dismiss(); return; } std::vector> ActionsToSchedule; // Pull actions to schedule from the pending queue, we will // try to submit these to the runner outside of the lock. Note // that because of how the state transitions work it's not // actually the case that all of these actions will still be // pending by the time we try to submit them, but that's fine. // // Also note that the m_PendingActions list is not maintained // here, that's done periodically in SchedulePendingActions() m_PendingLock.WithExclusiveLock([&] { if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) { return; } if (m_PendingActions.empty()) { return; } for (auto& [Lsn, Pending] : m_PendingActions) { switch (Pending->ActionState()) { case RunnerAction::State::Pending: ActionsToSchedule.push_back(Pending); break; case RunnerAction::State::Submitting: break; // already claimed by async submission case RunnerAction::State::Running: case RunnerAction::State::Completed: case RunnerAction::State::Failed: case RunnerAction::State::Abandoned: case RunnerAction::State::Cancelled: break; default: case RunnerAction::State::New: ZEN_WARN("unexpected state {} for pending action {}", static_cast(Pending->ActionState()), Pending->ActionLsn); break; } } // Sort by priority descending, then by LSN ascending (FIFO within same priority) std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const 
Ref& A, const Ref& B) { if (A->Priority != B->Priority) { return A->Priority > B->Priority; } return A->ActionLsn < B->ActionLsn; }); if (ActionsToSchedule.size() > Capacity) { ActionsToSchedule.resize(Capacity); } PendingCount = m_PendingActions.size(); }); if (ActionsToSchedule.empty()) { _.Dismiss(); return; } ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); Stopwatch SubmitTimer; std::vector SubmitResults = SubmitActions(ActionsToSchedule); int NotAcceptedCount = 0; int ScheduledActionCount = 0; for (const SubmitResult& SubResult : SubmitResults) { if (SubResult.IsAccepted) { ++ScheduledActionCount; } else { ++NotAcceptedCount; } } ZEN_INFO("scheduled {} pending actions in {} ({} rejected)", ScheduledActionCount, NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), NotAcceptedCount); ScheduledCount += ScheduledActionCount; PendingCount -= ScheduledActionCount; } void ComputeServiceSession::Impl::SchedulerThreadFunction() { SetCurrentThreadName("Function_Scheduler"); auto _ = MakeGuard([&] { ZEN_INFO("scheduler thread exiting"); }); do { int TimeoutMs = 500; auto PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); if (PendingCount) { TimeoutMs = 100; } const bool WasSignaled = m_SchedulingThreadEvent.Wait(TimeoutMs); if (m_SchedulingThreadEnabled == false) { return; } if (WasSignaled) { m_SchedulingThreadEvent.Reset(); } ZEN_DEBUG("compute scheduler TICK (Pending: {} was {}, Running: {}, Results: {}) timeout: {}", m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }), PendingCount, m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }), m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }), TimeoutMs); HandleActionUpdates(); // Auto-transition Draining → Paused when all work is done if (m_SessionState.load(std::memory_order_relaxed) == SessionState::Draining) { size_t Pending = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); size_t Running = 
m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); if (Pending == 0 && Running == 0) { SessionState Expected = SessionState::Draining; if (m_SessionState.compare_exchange_strong(Expected, SessionState::Paused, std::memory_order_acq_rel)) { ZEN_INFO("session state: Draining -> Paused (all work completed)"); } } } UpdateCoordinatorState(); SchedulePendingActions(); static constexpr uint64_t QueueExpirySweepIntervalMs = 30000; if (m_QueueExpiryTimer.GetElapsedTimeMs() >= QueueExpirySweepIntervalMs) { m_QueueExpiryTimer.Reset(); ExpireCompletedQueues(); } } while (m_SchedulingThreadEnabled); } void ComputeServiceSession::Impl::PostUpdate(RunnerAction* Action) { m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); m_SchedulingThreadEvent.Set(); } int ComputeServiceSession::Impl::GetMaxRetriesForQueue(int QueueId) { if (QueueId == 0) { return kDefaultMaxRetries; } CbObject Config = GetQueueConfig(QueueId); if (Config) { int Value = Config["max_retries"].AsInt32(0); if (Value > 0) { return Value; } } return kDefaultMaxRetries; } ComputeServiceSession::RescheduleResult ComputeServiceSession::Impl::RescheduleAction(int ActionLsn) { Ref Action; RunnerAction::State State; RescheduleResult ValidationError; bool Removed = false; // Find, validate, and remove atomically under a single lock scope to prevent // concurrent RescheduleAction calls from double-removing the same action. 
m_ResultsLock.WithExclusiveLock([&] { auto It = m_ResultsMap.find(ActionLsn); if (It == m_ResultsMap.end()) { ValidationError = {.Success = false, .Error = "Action not found in results"}; return; } Action = It->second; State = Action->ActionState(); if (State != RunnerAction::State::Failed && State != RunnerAction::State::Abandoned) { ValidationError = {.Success = false, .Error = "Action is not in a failed or abandoned state"}; return; } int MaxRetries = GetMaxRetriesForQueue(Action->QueueId); if (Action->RetryCount.load(std::memory_order_relaxed) >= MaxRetries) { ValidationError = {.Success = false, .Error = "Retry limit reached"}; return; } m_ResultsMap.erase(It); Removed = true; }); if (!Removed) { return ValidationError; } if (Action->QueueId != 0) { Ref Queue = FindQueue(Action->QueueId); if (Queue) { Queue->m_Lock.WithExclusiveLock([&] { Queue->FinishedLsns.erase(ActionLsn); Queue->ActiveLsns.insert(ActionLsn); }); Queue->ActiveCount.fetch_add(1, std::memory_order_relaxed); Queue->IdleSince.store(0, std::memory_order_relaxed); if (State == RunnerAction::State::Failed) { Queue->FailedCount.fetch_sub(1, std::memory_order_relaxed); } else { Queue->AbandonedCount.fetch_sub(1, std::memory_order_relaxed); } } } // Reset action state — this calls PostUpdate() internally Action->ResetActionStateToPending(); int NewRetryCount = Action->RetryCount.load(std::memory_order_relaxed); ZEN_INFO("action {} ({}) manually rescheduled (retry {})", Action->ActionId, ActionLsn, NewRetryCount); return {.Success = true, .RetryCount = NewRetryCount}; } void ComputeServiceSession::Impl::HandleActionUpdates() { ZEN_TRACE_CPU("ComputeServiceSession::HandleActionUpdates"); // Drain the update queue atomically std::vector> UpdatedActions; m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); std::unordered_set SeenLsn; // Process each action's latest state, deduplicating by LSN. 
// // This is safe because state transitions are monotonically increasing by enum // rank (Pending < Submitting < Running < Completed/Failed/Cancelled), so // SetActionState rejects any transition to a lower-ranked state. By the time // we read ActionState() here, it reflects the highest state reached — making // the first occurrence per LSN authoritative and duplicates redundant. for (Ref& Action : UpdatedActions) { const int ActionLsn = Action->ActionLsn; if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) { switch (Action->ActionState()) { // Newly enqueued — add to pending map for scheduling case RunnerAction::State::Pending: m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); break; // Async submission in progress — remains in pending map case RunnerAction::State::Submitting: break; // Dispatched to a runner — move from pending to running case RunnerAction::State::Running: m_RunningLock.WithExclusiveLock([&] { m_PendingLock.WithExclusiveLock([&] { m_RunningMap.insert({ActionLsn, Action}); m_PendingActions.erase(ActionLsn); }); }); ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); break; // Terminal states — move to results, record history, notify queue case RunnerAction::State::Completed: case RunnerAction::State::Failed: case RunnerAction::State::Abandoned: case RunnerAction::State::Cancelled: { auto TerminalState = Action->ActionState(); // Automatic retry for Failed/Abandoned actions with retries remaining. // Skip retries when the session itself is abandoned — those actions // were intentionally abandoned and should not be rescheduled. 
if ((TerminalState == RunnerAction::State::Failed || TerminalState == RunnerAction::State::Abandoned) && m_SessionState.load(std::memory_order_relaxed) < SessionState::Abandoned) { int MaxRetries = GetMaxRetriesForQueue(Action->QueueId); if (Action->RetryCount.load(std::memory_order_relaxed) < MaxRetries) { // Remove from whichever active map the action is in before resetting m_RunningLock.WithExclusiveLock([&] { m_PendingLock.WithExclusiveLock([&] { if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) { m_PendingActions.erase(ActionLsn); } else { m_RunningMap.erase(FindIt); } }); }); // Reset triggers PostUpdate() which re-enters the action as Pending Action->ResetActionStateToPending(); int NewRetryCount = Action->RetryCount.load(std::memory_order_relaxed); ZEN_INFO("action {} ({}) auto-rescheduled (retry {}/{})", Action->ActionId, ActionLsn, NewRetryCount, MaxRetries); break; } } // Remove from whichever active map the action is in m_RunningLock.WithExclusiveLock([&] { m_PendingLock.WithExclusiveLock([&] { if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) { m_PendingActions.erase(ActionLsn); } else { m_RunningMap.erase(FindIt); } }); }); m_ResultsLock.WithExclusiveLock([&] { m_ResultsMap[ActionLsn] = Action; // Append to bounded action history ring m_ActionHistoryLock.WithExclusiveLock([&] { ActionHistoryEntry Entry{.Lsn = ActionLsn, .QueueId = Action->QueueId, .ActionId = Action->ActionId, .WorkerId = Action->Worker.WorkerId, .ActionDescriptor = Action->ActionObj, .ExecutionLocation = std::move(Action->ExecutionLocation), .Succeeded = TerminalState == RunnerAction::State::Completed, .CpuSeconds = Action->CpuSeconds.load(std::memory_order_relaxed), .RetryCount = Action->RetryCount.load(std::memory_order_relaxed)}; std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); m_ActionHistory.push_back(std::move(Entry)); if (m_ActionHistory.size() > m_HistoryLimit) { 
m_ActionHistory.pop_front(); } }); }); m_RetiredCount.fetch_add(1); m_ResultRate.Mark(1); ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", Action->ActionId, ActionLsn, TerminalState == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); NotifyQueueActionComplete(Action->QueueId, ActionLsn, TerminalState); break; } } } } } size_t ComputeServiceSession::Impl::QueryCapacity() { return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); } std::vector ComputeServiceSession::Impl::SubmitActions(const std::vector>& Actions) { ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions"); std::vector Results(Actions.size()); // First try submitting the batch to local runners in parallel std::vector LocalResults = m_LocalRunnerGroup.SubmitActions(Actions); std::vector RemoteIndices; std::vector> RemoteActions; for (size_t i = 0; i < Actions.size(); ++i) { if (LocalResults[i].IsAccepted) { Results[i] = std::move(LocalResults[i]); } else { RemoteIndices.push_back(i); RemoteActions.push_back(Actions[i]); } } // Submit remaining actions to remote runners in parallel if (!RemoteActions.empty()) { std::vector RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions); for (size_t j = 0; j < RemoteIndices.size(); ++j) { Results[RemoteIndices[j]] = std::move(RemoteResults[j]); } } return Results; } ////////////////////////////////////////////////////////////////////////// ComputeServiceSession::ComputeServiceSession(ChunkResolver& InChunkResolver) { m_Impl = std::make_unique(this, InChunkResolver); } ComputeServiceSession::~ComputeServiceSession() { Shutdown(); } bool ComputeServiceSession::IsHealthy() { return m_Impl->IsHealthy(); } void ComputeServiceSession::WaitUntilReady() { m_Impl->WaitUntilReady(); } void ComputeServiceSession::Shutdown() { m_Impl->Shutdown(); } ComputeServiceSession::SessionState ComputeServiceSession::GetSessionState() const { return m_Impl->m_SessionState.load(std::memory_order_relaxed); } bool 
ComputeServiceSession::RequestStateTransition(SessionState NewState) { return m_Impl->RequestStateTransition(NewState); } void ComputeServiceSession::SetOrchestratorEndpoint(std::string_view Endpoint) { m_Impl->SetOrchestratorEndpoint(Endpoint); } void ComputeServiceSession::SetOrchestratorBasePath(std::filesystem::path BasePath) { m_Impl->SetOrchestratorBasePath(std::move(BasePath)); } void ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) { m_Impl->StartRecording(InResolver, RecordingPath); } void ComputeServiceSession::StopRecording() { m_Impl->StopRecording(); } ComputeServiceSession::ActionCounts ComputeServiceSession::GetActionCounts() { return m_Impl->GetActionCounts(); } void ComputeServiceSession::EmitStats(CbObjectWriter& Cbo) { m_Impl->EmitStats(Cbo); } std::vector ComputeServiceSession::GetKnownWorkerIds() { return m_Impl->GetKnownWorkerIds(); } WorkerDesc ComputeServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) { return m_Impl->GetWorkerDescriptor(WorkerId); } void ComputeServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, int32_t MaxConcurrentActions) { ZEN_TRACE_CPU("ComputeServiceSession::AddLocalRunner"); # if ZEN_PLATFORM_LINUX auto* NewRunner = new LinuxProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, false, MaxConcurrentActions); # elif ZEN_PLATFORM_WINDOWS auto* NewRunner = new WindowsProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, false, MaxConcurrentActions); # elif ZEN_PLATFORM_MAC auto* NewRunner = new MacProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool, false, MaxConcurrentActions); # endif m_Impl->SyncWorkersToRunner(*NewRunner); m_Impl->m_LocalRunnerGroup.AddRunner(NewRunner); } void ComputeServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, 
std::string_view HostName) { ZEN_TRACE_CPU("ComputeServiceSession::AddRemoteRunner"); auto* NewRunner = new RemoteHttpRunner(InChunkResolver, BasePath, HostName, m_Impl->m_RemoteSubmitPool); m_Impl->SyncWorkersToRunner(*NewRunner); m_Impl->m_RemoteRunnerGroup.AddRunner(NewRunner); } ComputeServiceSession::EnqueueResult ComputeServiceSession::EnqueueAction(CbObject ActionObject, int Priority) { return m_Impl->EnqueueActionToQueue(m_Impl->m_ImplicitQueueId, ActionObject, Priority); } ComputeServiceSession::EnqueueResult ComputeServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) { return m_Impl->EnqueueResolvedActionToQueue(m_Impl->m_ImplicitQueueId, Worker, ActionObj, RequestPriority); } ComputeServiceSession::CreateQueueResult ComputeServiceSession::CreateQueue(std::string_view Tag, CbObject Metadata, CbObject Config) { return m_Impl->CreateQueue(Tag, std::move(Metadata), std::move(Config)); } CbObject ComputeServiceSession::GetQueueMetadata(int QueueId) { return m_Impl->GetQueueMetadata(QueueId); } CbObject ComputeServiceSession::GetQueueConfig(int QueueId) { return m_Impl->GetQueueConfig(QueueId); } std::vector ComputeServiceSession::GetQueueIds() { return m_Impl->GetQueueIds(); } ComputeServiceSession::QueueStatus ComputeServiceSession::GetQueueStatus(int QueueId) { return m_Impl->GetQueueStatus(QueueId); } void ComputeServiceSession::CancelQueue(int QueueId) { m_Impl->CancelQueue(QueueId); } void ComputeServiceSession::DrainQueue(int QueueId) { m_Impl->DrainQueue(QueueId); } void ComputeServiceSession::DeleteQueue(int QueueId) { m_Impl->DeleteQueue(QueueId); } void ComputeServiceSession::GetQueueCompleted(int QueueId, CbWriter& Cbo) { m_Impl->GetQueueCompleted(QueueId, Cbo); } ComputeServiceSession::EnqueueResult ComputeServiceSession::EnqueueActionToQueue(int QueueId, CbObject ActionObject, int Priority) { return m_Impl->EnqueueActionToQueue(QueueId, ActionObject, Priority); } ComputeServiceSession::EnqueueResult 
ComputeServiceSession::EnqueueResolvedActionToQueue(int QueueId, WorkerDesc Worker, CbObject ActionObj, int RequestPriority) { return m_Impl->EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority); } void ComputeServiceSession::RegisterWorker(CbPackage Worker) { m_Impl->RegisterWorker(Worker); } HttpResponseCode ComputeServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) { return m_Impl->GetActionResult(ActionLsn, OutResultPackage); } HttpResponseCode ComputeServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) { return m_Impl->FindActionResult(ActionId, OutResultPackage); } void ComputeServiceSession::RetireActionResult(int ActionLsn) { m_Impl->RetireActionResult(ActionLsn); } ComputeServiceSession::RescheduleResult ComputeServiceSession::RescheduleAction(int ActionLsn) { return m_Impl->RescheduleAction(ActionLsn); } std::vector ComputeServiceSession::GetRunningActions() { return m_Impl->GetRunningActions(); } std::vector ComputeServiceSession::GetActionHistory(int Limit) { return m_Impl->GetActionHistory(Limit); } std::vector ComputeServiceSession::GetQueueHistory(int QueueId, int Limit) { return m_Impl->GetQueueHistory(QueueId, Limit); } void ComputeServiceSession::GetCompleted(CbWriter& Cbo) { m_Impl->GetCompleted(Cbo); } void ComputeServiceSession::PostUpdate(RunnerAction* Action) { m_Impl->PostUpdate(Action); } ////////////////////////////////////////////////////////////////////////// void computeservice_forcelink() { } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES