// Copyright Epic Games, Inc. All Rights Reserved. #include "zencompute/functionservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include "functionrunner.h" # include "actionrecorder.h" # include "localrunner.h" # include "remotehttprunner.h" # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include # include ZEN_THIRD_PARTY_INCLUDES_START # include ZEN_THIRD_PARTY_INCLUDES_END using namespace std::literals; namespace zen::compute { ////////////////////////////////////////////////////////////////////////// struct FunctionServiceSession::Impl { FunctionServiceSession* m_FunctionServiceSession; ChunkResolver& m_ChunkResolver; LoggerRef m_Log{logging::Get("apply")}; Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) : m_FunctionServiceSession(InFunctionServiceSession) , m_ChunkResolver(InChunkResolver) { m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this}; } void Shutdown(); bool IsHealthy(); LoggerRef Log() { return m_Log; } std::atomic_bool m_AcceptActions = true; struct FunctionDefinition { std::string FunctionName; Guid FunctionVersion; Guid BuildSystemVersion; IoHash WorkerId; }; void EmitStats(CbObjectWriter& Cbo) { m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); Cbo << "actions_submitted"sv << GetSubmittedActionCount(); EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); } void RegisterWorker(CbPackage Worker); WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); std::atomic m_ActionsCounter = 0; // sequence number RwLock m_PendingLock; std::map> m_PendingActions; RwLock m_RunningLock; std::unordered_map> m_RunningMap; RwLock m_ResultsLock; std::unordered_map> m_ResultsMap; metrics::Meter m_ResultRate; std::atomic m_RetiredCount{0}; HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); std::atomic m_ShutdownRequested{false}; std::thread m_SchedulingThread; std::atomic m_SchedulingThreadEnabled{true}; Event m_SchedulingThreadEvent; void MonitorThreadFunction(); void SchedulePendingActions(); // Workers RwLock m_WorkerLock; std::unordered_map m_WorkerMap; std::vector m_FunctionList; std::vector GetKnownWorkerIds(); // Runners RunnerGroup m_LocalRunnerGroup; RunnerGroup m_RemoteRunnerGroup; EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority); void GetCompleted(CbWriter& Cbo); // Recording void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); void StopRecording(); std::unique_ptr m_Recorder; // History tracking RwLock m_ActionHistoryLock; std::deque m_ActionHistory; size_t m_HistoryLimit = 1000; std::vector GetActionHistory(int Limit); // [[nodiscard]] size_t QueryCapacity(); [[nodiscard]] SubmitResult SubmitAction(Ref Action); [[nodiscard]] std::vector SubmitActions(const std::vector>& Actions); [[nodiscard]] size_t GetSubmittedActionCount(); // Updates RwLock m_UpdatedActionsLock; std::vector> m_UpdatedActions; void HandleActionUpdates(); void PostUpdate(RunnerAction* Action); void ShutdownRunners(); }; bool FunctionServiceSession::Impl::IsHealthy() { return true; } void FunctionServiceSession::Impl::Shutdown() { m_AcceptActions = false; m_ShutdownRequested = true; m_SchedulingThreadEnabled = false; m_SchedulingThreadEvent.Set(); if (m_SchedulingThread.joinable()) { m_SchedulingThread.join(); } ShutdownRunners(); } void FunctionServiceSession::Impl::ShutdownRunners() { m_LocalRunnerGroup.Shutdown(); m_RemoteRunnerGroup.Shutdown(); } void FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) { ZEN_INFO("starting recording to '{}'", RecordingPath); m_Recorder = std::make_unique(InCidStore, RecordingPath); ZEN_INFO("started recording to '{}'", RecordingPath); } void FunctionServiceSession::Impl::StopRecording() { ZEN_INFO("stopping recording"); m_Recorder = nullptr; ZEN_INFO("stopped recording"); } std::vector FunctionServiceSession::Impl::GetActionHistory(int Limit) { RwLock::SharedLockScope _(m_ActionHistoryLock); if (Limit > 0 && static_cast(Limit) < m_ActionHistory.size()) { return std::vector(m_ActionHistory.end() - Limit, m_ActionHistory.end()); } return std::vector(m_ActionHistory.begin(), m_ActionHistory.end()); } void FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker) { RwLock::ExclusiveLockScope _(m_WorkerLock); const IoHash& WorkerId = Worker.GetObject().GetHash(); if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) { // Note that since the convention currently is that WorkerId is equal to the hash // of the worker descriptor there is no chance that we get a second write with a // different descriptor. Thus we only need to call this the first time, when the // worker is added m_LocalRunnerGroup.RegisterWorker(Worker); m_RemoteRunnerGroup.RegisterWorker(Worker); if (m_Recorder) { m_Recorder->RegisterWorker(Worker); } CbObject WorkerObj = Worker.GetObject(); // Populate worker database const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); for (auto& Item : WorkerObj["functions"sv]) { CbObjectView Function = Item.AsObjectView(); std::string_view FunctionName = Function["name"sv].AsString(); const Guid FunctionVersion = Function["version"sv].AsUuid(); m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, .FunctionVersion = FunctionVersion, .BuildSystemVersion = WorkerBuildSystemVersion, .WorkerId = WorkerId}); } } } WorkerDesc FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) { RwLock::SharedLockScope _(m_WorkerLock); if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) { const CbPackage& Desc = It->second; return {Desc, WorkerId}; } return {}; } std::vector FunctionServiceSession::Impl::GetKnownWorkerIds() { std::vector WorkerIds; WorkerIds.reserve(m_WorkerMap.size()); m_WorkerLock.WithSharedLock([&] { for (const auto& [WorkerId, _] : m_WorkerMap) { WorkerIds.push_back(WorkerId); } }); return WorkerIds; } FunctionServiceSession::EnqueueResult FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority) { // Resolve function to worker IoHash WorkerId{IoHash::Zero}; std::string_view FunctionName = ActionObject["Function"sv].AsString(); const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); for (const FunctionDefinition& FuncDef : m_FunctionList) { if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && FuncDef.BuildSystemVersion == BuildSystemVersion) { WorkerId = FuncDef.WorkerId; break; } } if (WorkerId == IoHash::Zero) { CbObjectWriter Writer; Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; Writer << "error" << "no worker matches the action specification"; return {0, Writer.Save()}; } if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) { CbPackage WorkerPackage = It->second; return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); } CbObjectWriter Writer; Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; Writer << "error" << "no worker found despite match"; return {0, Writer.Save()}; } FunctionServiceSession::EnqueueResult FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) { const int ActionLsn = ++m_ActionsCounter; Ref Pending{new RunnerAction(m_FunctionServiceSession)}; Pending->ActionLsn = ActionLsn; Pending->Worker = Worker; Pending->ActionId = ActionObj.GetHash(); Pending->ActionObj = ActionObj; Pending->Priority = RequestPriority; SubmitResult SubResult = SubmitAction(Pending); if (SubResult.IsAccepted) { // Great, the job is being taken care of by the runner ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn); } else { ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); Pending->SetActionState(RunnerAction::State::Pending); } if (m_Recorder) { m_Recorder->RecordAction(Pending); } CbObjectWriter Writer; Writer << "lsn" << Pending->ActionLsn; Writer << "worker" << Pending->Worker.WorkerId; Writer << "action" << Pending->ActionId; return {Pending->ActionLsn, Writer.Save()}; } SubmitResult FunctionServiceSession::Impl::SubmitAction(Ref Action) { // Loosely round-robin scheduling of actions across runners. // // It's not entirely clear what this means given that submits // can come in across multiple threads, but it's probably better // than always starting with the first runner. // // Longer term we should track the state of the individual // runners and make decisions accordingly. SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); if (Result.IsAccepted) { return Result; } return m_RemoteRunnerGroup.SubmitAction(Action); } size_t FunctionServiceSession::Impl::GetSubmittedActionCount() { return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); } HttpResponseCode FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) { // This lock is held for the duration of the function since we need to // be sure that the action doesn't change state while we are checking the // different data structures RwLock::ExclusiveLockScope _(m_ResultsLock); if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) { OutResultPackage = std::move(It->second->GetResult()); m_ResultsMap.erase(It); return HttpResponseCode::OK; } { RwLock::SharedLockScope __(m_PendingLock); if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) { return HttpResponseCode::Accepted; } } // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must // always be taken after m_ResultsLock if both are needed { RwLock::SharedLockScope __(m_RunningLock); if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) { return HttpResponseCode::Accepted; } } return HttpResponseCode::NotFound; } HttpResponseCode FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) { // This lock is held for the duration of the function since we need to // be sure that the action doesn't change state while we are checking the // different data structures RwLock::ExclusiveLockScope _(m_ResultsLock); for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) { if (It->second->ActionId == ActionId) { OutResultPackage = std::move(It->second->GetResult()); m_ResultsMap.erase(It); return HttpResponseCode::OK; } } { RwLock::SharedLockScope __(m_PendingLock); for (const auto& [K, Pending] : m_PendingActions) { if (Pending->ActionId == ActionId) { return HttpResponseCode::Accepted; } } } // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must // always be taken after m_ResultsLock if both are needed { RwLock::SharedLockScope __(m_RunningLock); for (const auto& [K, v] : m_RunningMap) { if (v->ActionId == ActionId) { return HttpResponseCode::Accepted; } } } return HttpResponseCode::NotFound; } void FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo) { Cbo.BeginArray("completed"); m_ResultsLock.WithSharedLock([&] { for (auto& Kv : m_ResultsMap) { Cbo << Kv.first; } }); Cbo.EndArray(); } # define ZEN_BATCH_SCHEDULER 1 void FunctionServiceSession::Impl::SchedulePendingActions() { int ScheduledCount = 0; size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); static Stopwatch DumpRunningTimer; auto _ = MakeGuard([&] { ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", ScheduledCount, RunningCount, m_RetiredCount.load(), PendingCount, ResultCount); if (DumpRunningTimer.GetElapsedTimeMs() > 30000) { DumpRunningTimer.Reset(); std::set RunningList; m_RunningLock.WithSharedLock([&] { for (auto& [K, V] : m_RunningMap) { RunningList.insert(K); } }); ExtendableStringBuilder<1024> RunningString; for (int i : RunningList) { if (RunningString.Size()) { RunningString << ", "; } RunningString.Append(IntNum(i)); } ZEN_INFO("running: {}", RunningString); } }); # if ZEN_BATCH_SCHEDULER size_t Capacity = QueryCapacity(); if (!Capacity) { _.Dismiss(); return; } std::vector> ActionsToSchedule; // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock m_PendingLock.WithExclusiveLock([&] { if (m_ShutdownRequested) { return; } if (m_PendingActions.empty()) { return; } size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size()); auto PendingIt = m_PendingActions.begin(); const auto PendingEnd = m_PendingActions.end(); while (NumActionsToSchedule && PendingIt != PendingEnd) { const Ref& Pending = PendingIt->second; switch (Pending->ActionState()) { case RunnerAction::State::Pending: ActionsToSchedule.push_back(Pending); break; case RunnerAction::State::Running: case RunnerAction::State::Completed: case RunnerAction::State::Failed: break; default: case RunnerAction::State::New: ZEN_WARN("unexpected state {} for pending action {}", static_cast(Pending->ActionState()), Pending->ActionLsn); break; } ++PendingIt; --NumActionsToSchedule; } PendingCount = m_PendingActions.size(); }); if (ActionsToSchedule.empty()) { _.Dismiss(); return; } ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); auto SubmitResults = SubmitActions(ActionsToSchedule); // Move successfully scheduled actions to the running map and remove // from pending queue. It's actually possible that by the time we get // to this stage some of the actions may have already completed, so // they should not always be added to the running map eastl::hash_set ScheduledActions; for (size_t i = 0; i < ActionsToSchedule.size(); ++i) { const Ref& Pending = ActionsToSchedule[i]; const SubmitResult& SubResult = SubmitResults[i]; if (SubResult.IsAccepted) { ScheduledActions.insert(Pending->ActionLsn); } } ScheduledCount += (int)ActionsToSchedule.size(); # else m_PendingLock.WithExclusiveLock([&] { while (!m_PendingActions.empty()) { if (m_ShutdownRequested) { return; } // Here it would be good if we could decide to pop immediately to avoid // holding the lock while creating processes etc const Ref& Pending = m_PendingActions.begin()->second; FunctionRunner::SubmitResult SubResult = SubmitAction(Pending); if (SubResult.IsAccepted) { // Great, the job is being taken care of by the runner ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn); m_RunningLock.WithExclusiveLock([&] { m_RunningMap.insert({Pending->ActionLsn, Pending}); RunningCount = m_RunningMap.size(); }); m_PendingActions.pop_front(); PendingCount = m_PendingActions.size(); ++ScheduledCount; } else { // Runner could not accept the job, leave it on the pending queue return; } } }); # endif } void FunctionServiceSession::Impl::MonitorThreadFunction() { SetCurrentThreadName("FunctionServiceSession_Monitor"); auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); do { int TimeoutMs = 1000; if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); })) { TimeoutMs = 100; } const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs); if (m_SchedulingThreadEnabled == false) { return; } HandleActionUpdates(); // Schedule pending actions SchedulePendingActions(); if (!Timedout) { m_SchedulingThreadEvent.Reset(); } } while (m_SchedulingThreadEnabled); } void FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action) { m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); } void FunctionServiceSession::Impl::HandleActionUpdates() { std::vector> UpdatedActions; m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); std::unordered_set SeenLsn; std::unordered_set RunningLsn; for (Ref& Action : UpdatedActions) { const int ActionLsn = Action->ActionLsn; if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) { switch (Action->ActionState()) { case RunnerAction::State::Pending: m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); break; case RunnerAction::State::Running: m_PendingLock.WithExclusiveLock([&] { m_RunningLock.WithExclusiveLock([&] { m_RunningMap.insert({ActionLsn, Action}); m_PendingActions.erase(ActionLsn); }); }); ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); break; case RunnerAction::State::Completed: case RunnerAction::State::Failed: m_ResultsLock.WithExclusiveLock([&] { m_ResultsMap[ActionLsn] = Action; m_PendingLock.WithExclusiveLock([&] { m_RunningLock.WithExclusiveLock([&] { if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) { m_PendingActions.erase(ActionLsn); } else { m_RunningMap.erase(FindIt); } }); }); m_ActionHistoryLock.WithExclusiveLock([&] { ActionHistoryEntry Entry{.Lsn = ActionLsn, .ActionId = Action->ActionId, .WorkerId = Action->Worker.WorkerId, .ActionDescriptor = Action->ActionObj, .Succeeded = Action->ActionState() == RunnerAction::State::Completed}; std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); m_ActionHistory.push_back(std::move(Entry)); if (m_ActionHistory.size() > m_HistoryLimit) { m_ActionHistory.pop_front(); } }); }); m_RetiredCount.fetch_add(1); m_ResultRate.Mark(1); ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", Action->ActionId, ActionLsn, Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); break; } } } } size_t FunctionServiceSession::Impl::QueryCapacity() { return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); } std::vector FunctionServiceSession::Impl::SubmitActions(const std::vector>& Actions) { std::vector Results; for (const Ref& Action : Actions) { Results.push_back(SubmitAction(Action)); } return Results; } ////////////////////////////////////////////////////////////////////////// FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver) { m_Impl = std::make_unique(this, InChunkResolver); } FunctionServiceSession::~FunctionServiceSession() { Shutdown(); } bool FunctionServiceSession::IsHealthy() { return m_Impl->IsHealthy(); } void FunctionServiceSession::Shutdown() { m_Impl->Shutdown(); } void FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) { m_Impl->StartRecording(InResolver, RecordingPath); } void FunctionServiceSession::StopRecording() { m_Impl->StopRecording(); } void FunctionServiceSession::EmitStats(CbObjectWriter& Cbo) { m_Impl->EmitStats(Cbo); } std::vector FunctionServiceSession::GetKnownWorkerIds() { return m_Impl->GetKnownWorkerIds(); } WorkerDesc FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) { return m_Impl->GetWorkerDescriptor(WorkerId); } void FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath) { m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath)); } void FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) { m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName)); } FunctionServiceSession::EnqueueResult FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority) { return m_Impl->EnqueueAction(ActionObject, Priority); } FunctionServiceSession::EnqueueResult FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) { return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority); } void FunctionServiceSession::RegisterWorker(CbPackage Worker) { m_Impl->RegisterWorker(Worker); } HttpResponseCode FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) { return m_Impl->GetActionResult(ActionLsn, OutResultPackage); } HttpResponseCode FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) { return m_Impl->FindActionResult(ActionId, OutResultPackage); } std::vector FunctionServiceSession::GetActionHistory(int Limit) { return m_Impl->GetActionHistory(Limit); } void FunctionServiceSession::GetCompleted(CbWriter& Cbo) { m_Impl->GetCompleted(Cbo); } void FunctionServiceSession::PostUpdate(RunnerAction* Action) { m_Impl->PostUpdate(Action); } ////////////////////////////////////////////////////////////////////////// void function_forcelink() { } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES