// Copyright Epic Games, Inc. All Rights Reserved. #include "zencompute/functionservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include "functionrunner.h" # include "actionrecorder.h" # include "localrunner.h" # include "remotehttprunner.h" # if ZEN_PLATFORM_LINUX # include "winerunner.h" # elif ZEN_PLATFORM_WINDOWS # include "windowsrunner.h" # elif ZEN_PLATFORM_MAC # include "macrunner.h" # endif # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END using namespace std::literals; namespace zen::compute { static_assert(ZEN_ARRAY_COUNT(FunctionServiceSession::ActionHistoryEntry::Timestamps) == static_cast(RunnerAction::State::_Count)); ////////////////////////////////////////////////////////////////////////// struct FunctionServiceSession::Impl { FunctionServiceSession* m_FunctionServiceSession; ChunkResolver& m_ChunkResolver; LoggerRef m_Log{logging::Get("compute")}; Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) : m_FunctionServiceSession(InFunctionServiceSession) , m_ChunkResolver(InChunkResolver) , m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) , m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) { m_SchedulingThread = std::thread{&Impl::SchedulerThreadFunction, this}; } void WaitUntilReady(); void Shutdown(); bool IsHealthy(); LoggerRef Log() { return m_Log; } // Orchestration void SetOrchestratorEndpoint(std::string_view Endpoint); void SetOrchestratorBasePath(std::filesystem::path BasePath); std::string m_OrchestratorEndpoint; std::filesystem::path m_OrchestratorBasePath; Stopwatch m_OrchestratorQueryTimer; std::unordered_set m_KnownWorkerUris; void UpdateCoordinatorState(); // Worker registration and discovery struct FunctionDefinition { std::string 
        FunctionName;
        Guid   FunctionVersion;
        Guid   BuildSystemVersion;
        IoHash WorkerId;
    };

    void       RegisterWorker(CbPackage Worker);
    WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);

    // Action scheduling and tracking

    // NOTE(review): template arguments in the declarations below were stripped
    // by extraction (e.g. "std::map>" / "std::atomic") — presumably
    // std::map<int, Ref<RunnerAction>>, std::atomic<int>, etc.; restore from
    // upstream.

    std::atomic_bool m_AcceptActions = true;
    std::atomic      m_ActionsCounter = 0;  // sequence number
    metrics::Meter   m_ArrivalRate;         // rate of incoming actions

    // Observed lock order (see GetActionResult / HandleActionUpdates):
    // m_ResultsLock before m_PendingLock before m_RunningLock.
    RwLock             m_PendingLock;
    std::map>          m_PendingActions;    // keyed by action LSN; ordered map gives oldest-first scheduling
    RwLock             m_RunningLock;
    std::unordered_map> m_RunningMap;       // actions currently executing, by LSN
    RwLock             m_ResultsLock;
    std::unordered_map> m_ResultsMap;       // completed/failed actions awaiting pickup, by LSN
    metrics::Meter     m_ResultRate;        // rate of retired (finished) actions
    std::atomic        m_RetiredCount{0};

    EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
    EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority);

    void             GetCompleted(CbWriter& Cbo);
    HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
    HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
    void             RetireActionResult(int ActionLsn);

    std::atomic m_ShutdownRequested{false};

    // Scheduler thread state
    std::thread m_SchedulingThread;
    std::atomic m_SchedulingThreadEnabled{true};
    Event       m_SchedulingThreadEvent;    // kicks the scheduler loop awake
    void        SchedulerThreadFunction();
    void        SchedulePendingActions();

    // Workers
    RwLock             m_WorkerLock;        // guards m_WorkerMap and m_FunctionList
    std::unordered_map m_WorkerMap;         // WorkerId -> worker descriptor package
    std::vector        m_FunctionList;      // flat index of functions advertised by registered workers
    std::vector        GetKnownWorkerIds();

    // Runners
    DeferredDirectoryDeleter m_DeferredDeleter;
    WorkerThreadPool&        m_LocalSubmitPool;
    WorkerThreadPool&        m_RemoteSubmitPool;
    RunnerGroup              m_LocalRunnerGroup;
    RunnerGroup              m_RemoteRunnerGroup;
    void                     ShutdownRunners();

    // Recording
    void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
    void StopRecording();
    std::unique_ptr m_Recorder;             // non-null while a recording session is active

    // History tracking
    RwLock      m_ActionHistoryLock;
    std::deque  m_ActionHistory;
    size_t      m_HistoryLimit = 1000;      // oldest entries are dropped beyond this
    std::vector GetActionHistory(int Limit);

    // Capacity query and (batch) submission to the runner groups
    [[nodiscard]] size_t       QueryCapacity();
    [[nodiscard]] SubmitResult SubmitAction(Ref Action);
    [[nodiscard]] std::vector  SubmitActions(const std::vector>& Actions);
    [[nodiscard]] size_t
                         GetSubmittedActionCount();

    // Updates

    // Actions whose state changed since the last scheduler pass; drained by
    // HandleActionUpdates() on the scheduler thread.
    RwLock        m_UpdatedActionsLock;
    std::vector>  m_UpdatedActions;
    void          HandleActionUpdates();
    void          PostUpdate(RunnerAction* Action);

    // Appends service-level counters and rate snapshots to the stats object.
    // Takes each map's shared lock only for the duration of its size() read.
    void EmitStats(CbObjectWriter& Cbo)
    {
        m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
        m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
        m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
        Cbo << "actions_submitted"sv << GetSubmittedActionCount();
        EmitSnapshot("actions_arrival"sv, m_ArrivalRate, Cbo);
        EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
    }
};

// Currently unconditionally healthy; no failure conditions are tracked yet.
bool
FunctionServiceSession::Impl::IsHealthy()
{
    return true;
}

void
FunctionServiceSession::Impl::SetOrchestratorEndpoint(std::string_view Endpoint)
{
    m_OrchestratorEndpoint = Endpoint;
}

void
FunctionServiceSession::Impl::SetOrchestratorBasePath(std::filesystem::path BasePath)
{
    m_OrchestratorBasePath = std::move(BasePath);
}

// Polls the orchestrator ("/orch/agents") for the current set of reachable
// workers and reconciles m_RemoteRunnerGroup with it: newly seen workers get
// a RemoteHttpRunner, stale or unreachable ones are removed. No-op unless an
// orchestrator endpoint is configured; throttled to one query per 5 seconds.
// Runs on the scheduler thread (see SchedulerThreadFunction).
void
FunctionServiceSession::Impl::UpdateCoordinatorState()
{
    ZEN_TRACE_CPU("FunctionServiceSession::UpdateCoordinatorState");

    if (m_OrchestratorEndpoint.empty())
    {
        return;
    }

    // Throttle queries to every 5 seconds
    if (m_OrchestratorQueryTimer.GetElapsedTimeMs() < 5000)
    {
        return;
    }
    m_OrchestratorQueryTimer.Reset();

    try
    {
        HttpClient           Client(m_OrchestratorEndpoint);
        HttpClient::Response Response = Client.Get("/orch/agents");
        if (!Response.IsSuccess())
        {
            // NOTE(review): static_cast lost its template argument (extraction damage)
            ZEN_WARN("orchestrator query failed with status {}", static_cast(Response.StatusCode));
            return;
        }

        CbObject WorkerList = Response.AsObject();

        // NOTE(review): element type stripped — presumably std::unordered_set<std::string>
        std::unordered_set ValidWorkerUris;

        for (auto& Item : WorkerList["workers"sv])
        {
            CbObjectView     Worker    = Item.AsObjectView();
            uint64_t         Dt        = Worker["dt"sv].AsUInt64();   // ms since the orchestrator last saw this worker
            bool             Reachable = Worker["reachable"sv].AsBool();
            std::string_view Uri       = Worker["uri"sv].AsString();

            // Skip stale workers (not seen in over 30 seconds)
            if (Dt > 30000)
            {
                continue;
            }

            // Skip workers that are not confirmed reachable
            if (!Reachable)
            {
                continue;
            }
            std::string UriStr{Uri};
            ValidWorkerUris.insert(UriStr);

            // Skip workers we already know about
            if (m_KnownWorkerUris.contains(UriStr))
            {
                continue;
            }

            ZEN_INFO("discovered new worker at {}", UriStr);
            m_KnownWorkerUris.insert(UriStr);
            // Runner group takes ownership of the new runner.
            m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool));
        }

        // Remove workers that are no longer valid (stale or unreachable)
        for (auto It = m_KnownWorkerUris.begin(); It != m_KnownWorkerUris.end();)
        {
            if (!ValidWorkerUris.contains(*It))
            {
                const std::string& ExpiredUri = *It;
                ZEN_INFO("removing expired worker at {}", ExpiredUri);
                m_RemoteRunnerGroup.RemoveRunnerIf([&](const RemoteHttpRunner& Runner) { return Runner.GetHostName() == ExpiredUri; });
                It = m_KnownWorkerUris.erase(It);
            }
            else
            {
                ++It;
            }
        }
    }
    catch (const HttpClientError& Ex)
    {
        ZEN_WARN("orchestrator query error: {}", Ex.what());
    }
    catch (const std::exception& Ex)
    {
        ZEN_WARN("orchestrator query unexpected error: {}", Ex.what());
    }
}

// Blocks until the service can run actions. With remote runners configured
// this polls the remote group's capacity once a second until it is non-zero;
// otherwise it only asserts that at least one local runner exists.
void
FunctionServiceSession::Impl::WaitUntilReady()
{
    if (m_RemoteRunnerGroup.GetRunnerCount())
    {
        for (;;)
        {
            const size_t Capacity = m_RemoteRunnerGroup.QueryCapacity();
            if (Capacity > 0)
            {
                return;
            }
            zen::Sleep(1000);
        }
    }
    ZEN_ASSERT(m_LocalRunnerGroup.GetRunnerCount());
}

// Stops accepting new actions, wakes and joins the scheduler thread, then
// shuts down all runners and the deferred directory deleter. The joinable()
// check makes the join safe if the thread was already joined.
void
FunctionServiceSession::Impl::Shutdown()
{
    m_AcceptActions = false;
    m_ShutdownRequested = true;
    m_SchedulingThreadEnabled = false;
    m_SchedulingThreadEvent.Set();
    if (m_SchedulingThread.joinable())
    {
        m_SchedulingThread.join();
    }
    ShutdownRunners();
    m_DeferredDeleter.Shutdown();
}

void
FunctionServiceSession::Impl::ShutdownRunners()
{
    m_LocalRunnerGroup.Shutdown();
    m_RemoteRunnerGroup.Shutdown();
}

// Creates the recorder; workers and actions registered from here on are
// captured into RecordingPath.
void
FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
{
    ZEN_INFO("starting recording to '{}'", RecordingPath);
    // NOTE(review): make_unique lost its template argument (extraction damage);
    // presumably std::make_unique<ActionRecorder>(...).
    m_Recorder = std::make_unique(InCidStore, RecordingPath);
    ZEN_INFO("started recording to '{}'", RecordingPath);
}

void
FunctionServiceSession::Impl::StopRecording()
{
    ZEN_INFO("stopping recording");
    m_Recorder = nullptr;   // destroying the recorder ends the recording session
    ZEN_INFO("stopped recording");
}

// Returns the most recent Limit history entries (all entries when Limit <= 0
// or when fewer are stored). Copies under a shared lock.
std::vector
FunctionServiceSession::Impl::GetActionHistory(int Limit)
{
    RwLock::SharedLockScope _(m_ActionHistoryLock);
    if (Limit > 0 && static_cast(Limit) < m_ActionHistory.size())
    {
        return std::vector(m_ActionHistory.end() - Limit, m_ActionHistory.end());
    }
    return std::vector(m_ActionHistory.begin(), m_ActionHistory.end());
}

// Registers a worker descriptor package under the hash of its descriptor
// object. First-time registrations also propagate the worker to the runner
// groups, the recorder (if active) and the function index.
void
FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker)
{
    ZEN_TRACE_CPU("FunctionServiceSession::RegisterWorker");

    RwLock::ExclusiveLockScope _(m_WorkerLock);

    const IoHash& WorkerId = Worker.GetObject().GetHash();

    if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second)
    {
        // Note that since the convention currently is that WorkerId is equal to the hash
        // of the worker descriptor there is no chance that we get a second write with a
        // different descriptor. Thus we only need to call this the first time, when the
        // worker is added

        m_LocalRunnerGroup.RegisterWorker(Worker);
        m_RemoteRunnerGroup.RegisterWorker(Worker);

        if (m_Recorder)
        {
            m_Recorder->RegisterWorker(Worker);
        }

        CbObject WorkerObj = Worker.GetObject();

        // Populate worker database
        const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid();

        for (auto& Item : WorkerObj["functions"sv])
        {
            CbObjectView     Function     = Item.AsObjectView();
            std::string_view FunctionName = Function["name"sv].AsString();
            const Guid       FunctionVersion = Function["version"sv].AsUuid();

            m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
                                                           .FunctionVersion = FunctionVersion,
                                                           .BuildSystemVersion = WorkerBuildSystemVersion,
                                                           .WorkerId = WorkerId});
        }
    }
}

// Looks up a registered worker by id; returns a default-constructed (empty)
// descriptor when the worker is unknown.
WorkerDesc
FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
{
    RwLock::SharedLockScope _(m_WorkerLock);
    if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
    {
        const CbPackage& Desc = It->second;
        return {Desc, WorkerId};
    }
    return {};
}

// Snapshot of all registered worker ids.
// NOTE(review): m_WorkerMap.size() is read for the reserve() hint before the
// shared lock is taken — harmless as a hint but formally racy; consider
// moving the reserve inside the locked scope.
std::vector
FunctionServiceSession::Impl::GetKnownWorkerIds()
{
    std::vector WorkerIds;
    WorkerIds.reserve(m_WorkerMap.size());
    m_WorkerLock.WithSharedLock([&] {
        for (const auto& [WorkerId, _] : m_WorkerMap)
        {
            WorkerIds.push_back(WorkerId);
        }
    });
    return WorkerIds;
}

// Resolves the action's (Function, FunctionVersion, BuildSystemVersion)
// triple to a registered worker via linear scan of the function index, then
// enqueues it. On failure returns LSN 0 plus an error object echoing the
// unmatched specification.
// NOTE(review): m_FunctionList and m_WorkerMap are read here without holding
// m_WorkerLock, unlike the other accessors in this file — verify against
// upstream whether this is intentional.
FunctionServiceSession::EnqueueResult
FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority)
{
    ZEN_TRACE_CPU("FunctionServiceSession::EnqueueAction");

    // Resolve function to worker
    IoHash WorkerId{IoHash::Zero};

    std::string_view FunctionName       = ActionObject["Function"sv].AsString();
    const Guid       FunctionVersion    = ActionObject["FunctionVersion"sv].AsUuid();
    const Guid       BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();

    for (const FunctionDefinition& FuncDef : m_FunctionList)
    {
        if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
            FuncDef.BuildSystemVersion == BuildSystemVersion)
        {
            WorkerId = FuncDef.WorkerId;
            break;
        }
    }

    if (WorkerId == IoHash::Zero)
    {
        CbObjectWriter Writer;
        Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion"
               << BuildSystemVersion;
        Writer << "error"
               << "no worker matches the action specification";
        return {0, Writer.Save()};
    }

    if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
    {
        CbPackage WorkerPackage = It->second;
        return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
    }

    // Function index pointed at a worker that is no longer in the map.
    CbObjectWriter Writer;
    Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion"
           << BuildSystemVersion;
    Writer << "error"
           << "no worker found despite match";
    return {0, Writer.Save()};
}

// Assigns the next LSN to the action and parks it in Pending state; the
// actual submission to runners happens in batches on the scheduler thread
// (SchedulePendingActions). Returns the LSN plus an object echoing the
// lsn/worker/action ids.
FunctionServiceSession::EnqueueResult
FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
{
    ZEN_TRACE_CPU("FunctionServiceSession::EnqueueResolvedAction");

    const int ActionLsn = ++m_ActionsCounter;
    m_ArrivalRate.Mark();

    // NOTE(review): template argument stripped — presumably Ref<RunnerAction>.
    Ref Pending{new
                RunnerAction(m_FunctionServiceSession)};
    Pending->ActionLsn = ActionLsn;
    Pending->Worker    = Worker;
    Pending->ActionId  = ActionObj.GetHash();
    Pending->ActionObj = ActionObj;
    Pending->Priority  = RequestPriority;

    // For now simply put action into pending state, so we can do batch scheduling
# if 0
    SubmitResult SubResult = SubmitAction(Pending);
    if (SubResult.IsAccepted)
    {
        // Great, the job is being taken care of by the runner
        ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn);
    }
    else
# endif
    {
        ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
        // Presumably this state change is what lands the action in
        // m_PendingActions via the update queue (HandleActionUpdates inserts
        // Pending-state actions) — confirm in RunnerAction::SetActionState.
        Pending->SetActionState(RunnerAction::State::Pending);
    }

    if (m_Recorder)
    {
        m_Recorder->RecordAction(Pending);
    }

    CbObjectWriter Writer;
    Writer << "lsn" << Pending->ActionLsn;
    Writer << "worker" << Pending->Worker.WorkerId;
    Writer << "action" << Pending->ActionId;

    return {Pending->ActionLsn, Writer.Save()};
}

// Submits a single action, preferring the local runner group and falling back
// to the remote group when no local runner accepts it.
SubmitResult
FunctionServiceSession::Impl::SubmitAction(Ref Action)
{
    // Loosely round-robin scheduling of actions across runners.
    //
    // It's not entirely clear what this means given that submits
    // can come in across multiple threads, but it's probably better
    // than always starting with the first runner.
    //
    // Longer term we should track the state of the individual
    // runners and make decisions accordingly.
    SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action);
    if (Result.IsAccepted)
    {
        return Result;
    }
    return m_RemoteRunnerGroup.SubmitAction(Action);
}

// Total actions submitted across both runner groups.
size_t
FunctionServiceSession::Impl::GetSubmittedActionCount()
{
    return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount();
}

// Fetches (and consumes) the result for the given LSN. Returns OK when the
// result was ready — the entry is removed from the results map, so a second
// call for the same LSN returns NotFound. Returns Accepted while the action
// is still pending or running, NotFound when the LSN is unknown.
HttpResponseCode
FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
{
    // This lock is held for the duration of the function since we need to
    // be sure that the action doesn't change state while we are checking the
    // different data structures

    RwLock::ExclusiveLockScope _(m_ResultsLock);

    if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
    {
        OutResultPackage = std::move(It->second->GetResult());
        m_ResultsMap.erase(It);
        return HttpResponseCode::OK;
    }

    {
        RwLock::SharedLockScope __(m_PendingLock);
        if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
        {
            return HttpResponseCode::Accepted;
        }
    }

    // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
    // always be taken after m_ResultsLock if both are needed
    {
        RwLock::SharedLockScope __(m_RunningLock);
        if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
        {
            return HttpResponseCode::Accepted;
        }
    }

    return HttpResponseCode::NotFound;
}

// Same contract as GetActionResult but keyed by action id, which requires a
// linear scan of each map instead of a keyed lookup.
HttpResponseCode
FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
{
    // This lock is held for the duration of the function since we need to
    // be sure that the action doesn't change state while we are checking the
    // different data structures

    RwLock::ExclusiveLockScope _(m_ResultsLock);

    for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
    {
        if (It->second->ActionId == ActionId)
        {
            OutResultPackage = std::move(It->second->GetResult());
            m_ResultsMap.erase(It);
            return HttpResponseCode::OK;
        }
    }

    {
        RwLock::SharedLockScope __(m_PendingLock);
        for (const auto& [K, Pending] : m_PendingActions)
        {
            if
               (Pending->ActionId == ActionId)
            {
                return HttpResponseCode::Accepted;
            }
        }
    }

    // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
    // always be taken after m_ResultsLock if both are needed
    {
        RwLock::SharedLockScope __(m_RunningLock);
        for (const auto& [K, v] : m_RunningMap)
        {
            if (v->ActionId == ActionId)
            {
                return HttpResponseCode::Accepted;
            }
        }
    }

    return HttpResponseCode::NotFound;
}

// Marks the action's retained state as ready for deferred cleanup.
void
FunctionServiceSession::Impl::RetireActionResult(int ActionLsn)
{
    m_DeferredDeleter.MarkReady(ActionLsn);
}

// Writes the LSNs of all completed-but-unclaimed actions into a "completed" array.
void
FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo)
{
    Cbo.BeginArray("completed");
    m_ResultsLock.WithSharedLock([&] {
        for (auto& Kv : m_ResultsMap)
        {
            Cbo << Kv.first;
        }
    });
    Cbo.EndArray();
}

// One scheduler pass: pulls up to QueryCapacity() actions that are still in
// Pending state from m_PendingActions and submits them as a batch (outside
// the pending lock). Runs on the scheduler thread only. Also emits a summary
// log line and, every 30 seconds, a dump of the currently running LSNs.
void
FunctionServiceSession::Impl::SchedulePendingActions()
{
    ZEN_TRACE_CPU("FunctionServiceSession::SchedulePendingActions");

    int ScheduledCount = 0;

    size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
    size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
    size_t ResultCount  = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });

    static Stopwatch DumpRunningTimer;

    // Summary emitted on scope exit unless Dismiss()ed on the early-out paths.
    auto _ = MakeGuard([&] {
        ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
                 ScheduledCount,
                 RunningCount,
                 m_RetiredCount.load(),
                 PendingCount,
                 ResultCount);

        if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
        {
            DumpRunningTimer.Reset();

            // NOTE(review): element type stripped — presumably std::set<int>
            // (ordered so the dump is sorted by LSN).
            std::set RunningList;
            m_RunningLock.WithSharedLock([&] {
                for (auto& [K, V] : m_RunningMap)
                {
                    RunningList.insert(K);
                }
            });

            ExtendableStringBuilder<1024> RunningString;
            for (int i : RunningList)
            {
                if (RunningString.Size())
                {
                    RunningString << ", ";
                }
                RunningString.Append(IntNum(i));
            }
            ZEN_INFO("running: {}", RunningString);
        }
    });

    size_t Capacity = QueryCapacity();
    if (!Capacity)
    {
        _.Dismiss();
        return;
    }

    // NOTE(review): template argument stripped — presumably std::vector<Ref<RunnerAction>>.
    std::vector> ActionsToSchedule;

    // Pull actions to schedule from the pending queue, we will
    // try to submit these to the runner outside of the lock. Note
    // that because of how the state transitions work it's not
    // actually the case that all of these actions will still be
    // pending by the time we try to submit them, but that's fine.
    //
    // Also note that the m_PendingActions list is not maintained
    // here, that's done periodically in SchedulePendingActions()

    m_PendingLock.WithExclusiveLock([&] {
        if (m_ShutdownRequested)
        {
            return;
        }
        if (m_PendingActions.empty())
        {
            return;
        }

        size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size());

        auto       PendingIt  = m_PendingActions.begin();
        const auto PendingEnd = m_PendingActions.end();

        while (NumActionsToSchedule && PendingIt != PendingEnd)
        {
            const Ref& Pending = PendingIt->second;

            switch (Pending->ActionState())
            {
                case RunnerAction::State::Pending:
                    ActionsToSchedule.push_back(Pending);
                    break;
                case RunnerAction::State::Submitting:
                    break;  // already claimed by async submission
                case RunnerAction::State::Running:
                case RunnerAction::State::Completed:
                case RunnerAction::State::Failed:
                    break;
                default:
                case RunnerAction::State::New:
                    ZEN_WARN("unexpected state {} for pending action {}",
                             static_cast(Pending->ActionState()),
                             Pending->ActionLsn);
                    break;
            }

            ++PendingIt;
            --NumActionsToSchedule;
        }

        PendingCount =
                       m_PendingActions.size();
    });

    if (ActionsToSchedule.empty())
    {
        _.Dismiss();
        return;
    }

    ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());

    Stopwatch SubmitTimer;

    std::vector SubmitResults = SubmitActions(ActionsToSchedule);

    int NotAcceptedCount     = 0;
    int ScheduledActionCount = 0;

    for (const SubmitResult& SubResult : SubmitResults)
    {
        if (SubResult.IsAccepted)
        {
            ++ScheduledActionCount;
        }
        else
        {
            ++NotAcceptedCount;
        }
    }

    ZEN_INFO("scheduled {} pending actions in {} ({} rejected)",
             ScheduledActionCount,
             NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
             NotAcceptedCount);

    ScheduledCount += ScheduledActionCount;
    PendingCount -= ScheduledActionCount;
}

// Scheduler thread main loop: wakes on the event or on a timeout (100 ms
// while work is pending, 1 s otherwise), then drains state updates, refreshes
// the orchestrator view and schedules pending actions. Exits when
// m_SchedulingThreadEnabled is cleared (Shutdown sets the event to wake it).
void
FunctionServiceSession::Impl::SchedulerThreadFunction()
{
    SetCurrentThreadName("Function_Scheduler");

    auto _ = MakeGuard([&] { ZEN_INFO("scheduler thread exiting"); });

    do
    {
        int TimeoutMs = 1000;
        if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }))
        {
            TimeoutMs = 100;
        }
        const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs);
        if (m_SchedulingThreadEnabled == false)
        {
            return;
        }

        HandleActionUpdates();
        UpdateCoordinatorState();
        SchedulePendingActions();

        if (!Timedout)
        {
            m_SchedulingThreadEvent.Reset();
        }
    } while (m_SchedulingThreadEnabled);
}

// Queues a state-change notification for the scheduler thread to process in
// HandleActionUpdates(). May be called from arbitrary threads.
void
FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action)
{
    m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); });
}

// Drains the update queue and moves each action between the pending/running/
// results maps according to its current state; completed/failed actions are
// also appended to the bounded history deque.
void
FunctionServiceSession::Impl::HandleActionUpdates()
{
    ZEN_TRACE_CPU("FunctionServiceSession::HandleActionUpdates");

    // Swap the queue out under the lock so updates posted while we process
    // this batch land in a fresh vector.
    std::vector> UpdatedActions;
    m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });

    // Only the first queued update per LSN in this batch is applied.
    std::unordered_set SeenLsn;
    // NOTE(review): RunningLsn appears to be unused in the visible code.
    std::unordered_set RunningLsn;

    for (Ref& Action : UpdatedActions)
    {
        const int ActionLsn = Action->ActionLsn;

        if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
        {
            switch (Action->ActionState())
            {
                case RunnerAction::State::Pending:
                    m_PendingLock.WithExclusiveLock([&] {
                        m_PendingActions.insert({ActionLsn, Action});
                    });
                    break;

                case RunnerAction::State::Submitting:
                    break;  // still in pending map, async submission in progress

                case RunnerAction::State::Running:
                    // Promote from pending to running; locks nested in the
                    // order used elsewhere (pending before running).
                    m_PendingLock.WithExclusiveLock([&] {
                        m_RunningLock.WithExclusiveLock([&] {
                            m_RunningMap.insert({ActionLsn, Action});
                            m_PendingActions.erase(ActionLsn);
                        });
                    });
                    ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
                    break;

                case RunnerAction::State::Completed:
                case RunnerAction::State::Failed:
                    // Move into the results map, remove from whichever map
                    // still holds the action, and record a history entry.
                    m_ResultsLock.WithExclusiveLock([&] {
                        m_ResultsMap[ActionLsn] = Action;
                        m_PendingLock.WithExclusiveLock([&] {
                            m_RunningLock.WithExclusiveLock([&] {
                                if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
                                {
                                    // Never reached Running (e.g. finished/failed straight from pending)
                                    m_PendingActions.erase(ActionLsn);
                                }
                                else
                                {
                                    m_RunningMap.erase(FindIt);
                                }
                            });
                        });
                        m_ActionHistoryLock.WithExclusiveLock([&] {
                            ActionHistoryEntry Entry{.Lsn = ActionLsn,
                                                     .ActionId = Action->ActionId,
                                                     .WorkerId = Action->Worker.WorkerId,
                                                     .ActionDescriptor = Action->ActionObj,
                                                     .Succeeded = Action->ActionState() == RunnerAction::State::Completed};
                            std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
                            m_ActionHistory.push_back(std::move(Entry));
                            // Bounded history: drop the oldest entry past the limit.
                            if (m_ActionHistory.size() > m_HistoryLimit)
                            {
                                m_ActionHistory.pop_front();
                            }
                        });
                    });
                    m_RetiredCount.fetch_add(1);
                    m_ResultRate.Mark(1);
                    ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
                              Action->ActionId,
                              ActionLsn,
                              Action->ActionState() == RunnerAction::State::Completed ?
                                  "SUCCESS" : "FAILURE");
                    break;
                // (no default: updates for actions in other states are ignored)
            }
        }
    }
}

// Aggregate capacity across the local and remote runner groups.
size_t
FunctionServiceSession::Impl::QueryCapacity()
{
    return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity();
}

// Batch submit: offers the whole batch to the local runner group first, then
// forwards whatever was not accepted to the remote group. The returned
// results vector matches the order of the input Actions.
std::vector
FunctionServiceSession::Impl::SubmitActions(const std::vector>& Actions)
{
    ZEN_TRACE_CPU("FunctionServiceSession::SubmitActions");

    std::vector Results(Actions.size());

    // First try submitting the batch to local runners in parallel
    std::vector LocalResults = m_LocalRunnerGroup.SubmitActions(Actions);

    // Indices of actions the local group did not accept, plus the actions
    // themselves, so remote results can be scattered back into Results.
    std::vector RemoteIndices;
    std::vector> RemoteActions;

    for (size_t i = 0; i < Actions.size(); ++i)
    {
        if (LocalResults[i].IsAccepted)
        {
            Results[i] = std::move(LocalResults[i]);
        }
        else
        {
            RemoteIndices.push_back(i);
            RemoteActions.push_back(Actions[i]);
        }
    }

    // Submit remaining actions to remote runners in parallel
    if (!RemoteActions.empty())
    {
        std::vector RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions);
        for (size_t j = 0; j < RemoteIndices.size(); ++j)
        {
            Results[RemoteIndices[j]] = std::move(RemoteResults[j]);
        }
    }

    return Results;
}

//////////////////////////////////////////////////////////////////////////

// Public API: thin forwarding wrappers over the pimpl.

FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver)
{
    // NOTE(review): make_unique lost its template argument (extraction damage);
    // presumably std::make_unique<Impl>(this, InChunkResolver).
    m_Impl = std::make_unique(this, InChunkResolver);
}

FunctionServiceSession::~FunctionServiceSession()
{
    Shutdown();
}

bool
FunctionServiceSession::IsHealthy()
{
    return m_Impl->IsHealthy();
}

void
FunctionServiceSession::WaitUntilReady()
{
    m_Impl->WaitUntilReady();
}

void
FunctionServiceSession::Shutdown()
{
    m_Impl->Shutdown();
}

void
FunctionServiceSession::SetOrchestratorEndpoint(std::string_view Endpoint)
{
    m_Impl->SetOrchestratorEndpoint(Endpoint);
}

void
FunctionServiceSession::SetOrchestratorBasePath(std::filesystem::path BasePath)
{
    m_Impl->SetOrchestratorBasePath(std::move(BasePath));
}

void
FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
{
    m_Impl->StartRecording(InResolver, RecordingPath);
}

void
FunctionServiceSession::StopRecording()
{
    m_Impl->StopRecording();
}

void
FunctionServiceSession::EmitStats(CbObjectWriter& Cbo)
{
    m_Impl->EmitStats(Cbo);
}

std::vector
FunctionServiceSession::GetKnownWorkerIds()
{
    return m_Impl->GetKnownWorkerIds();
}

WorkerDesc
FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
{
    return m_Impl->GetWorkerDescriptor(WorkerId);
}

// Adds the platform-appropriate local process runner (Wine-hosted on Linux,
// native on Windows and Mac) to the local runner group.
void
FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath)
{
    ZEN_TRACE_CPU("FunctionServiceSession::AddLocalRunner");
# if ZEN_PLATFORM_LINUX
    m_Impl->m_LocalRunnerGroup.AddRunner(
        new WineProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool));
# elif ZEN_PLATFORM_WINDOWS
    m_Impl->m_LocalRunnerGroup.AddRunner(
        new WindowsProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool));
# elif ZEN_PLATFORM_MAC
    m_Impl->m_LocalRunnerGroup.AddRunner(
        new MacProcessRunner(InChunkResolver, BasePath, m_Impl->m_DeferredDeleter, m_Impl->m_LocalSubmitPool));
# endif
}

// Adds an HTTP runner that targets a fixed remote worker host (as opposed to
// hosts discovered dynamically via the orchestrator).
void
FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
{
    ZEN_TRACE_CPU("FunctionServiceSession::AddRemoteRunner");
    m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName, m_Impl->m_RemoteSubmitPool));
}

FunctionServiceSession::EnqueueResult
FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
{
    return m_Impl->EnqueueAction(ActionObject, Priority);
}

FunctionServiceSession::EnqueueResult
FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
{
    return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority);
}

void
FunctionServiceSession::RegisterWorker(CbPackage Worker)
{
    m_Impl->RegisterWorker(Worker);
}

HttpResponseCode
FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
{
    return
           m_Impl->GetActionResult(ActionLsn, OutResultPackage);
}

HttpResponseCode
FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
{
    return m_Impl->FindActionResult(ActionId, OutResultPackage);
}

void
FunctionServiceSession::RetireActionResult(int ActionLsn)
{
    m_Impl->RetireActionResult(ActionLsn);
}

std::vector
FunctionServiceSession::GetActionHistory(int Limit)
{
    return m_Impl->GetActionHistory(Limit);
}

void
FunctionServiceSession::GetCompleted(CbWriter& Cbo)
{
    m_Impl->GetCompleted(Cbo);
}

void
FunctionServiceSession::PostUpdate(RunnerAction* Action)
{
    m_Impl->PostUpdate(Action);
}

//////////////////////////////////////////////////////////////////////////

// Anchor symbol so the linker keeps this translation unit.
void
function_forcelink()
{
}

}  // namespace zen::compute

#endif  // ZEN_WITH_COMPUTE_SERVICES