diff options
Diffstat (limited to 'src/zencompute/functionservice.cpp')
| -rw-r--r-- | src/zencompute/functionservice.cpp | 957 |
1 files changed, 0 insertions, 957 deletions
diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp deleted file mode 100644 index 0698449e9..000000000 --- a/src/zencompute/functionservice.cpp +++ /dev/null @@ -1,957 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zencompute/functionservice.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include "functionrunner.h" -# include "actionrecorder.h" -# include "localrunner.h" -# include "remotehttprunner.h" - -# include <zencompute/recordingreader.h> -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/logging.h> -# include <zencore/scopeguard.h> -# include <zentelemetry/stats.h> - -# include <set> -# include <deque> -# include <map> -# include <thread> -# include <unordered_map> - -ZEN_THIRD_PARTY_INCLUDES_START -# include <EASTL/hash_set.h> -ZEN_THIRD_PARTY_INCLUDES_END - -using namespace std::literals; - -namespace zen::compute { - -////////////////////////////////////////////////////////////////////////// - -struct FunctionServiceSession::Impl -{ - FunctionServiceSession* m_FunctionServiceSession; - ChunkResolver& m_ChunkResolver; - LoggerRef m_Log{logging::Get("apply")}; - - Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver) - : m_FunctionServiceSession(InFunctionServiceSession) - , m_ChunkResolver(InChunkResolver) - { - m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this}; - } - - void Shutdown(); - bool IsHealthy(); - - LoggerRef Log() { return m_Log; } - - std::atomic_bool m_AcceptActions = true; - - struct FunctionDefinition - { - std::string FunctionName; - Guid FunctionVersion; - Guid BuildSystemVersion; - IoHash WorkerId; - }; - - void EmitStats(CbObjectWriter& Cbo) - { - m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); }); - m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); }); - m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); }); - Cbo << "actions_submitted"sv << GetSubmittedActionCount(); - EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo); - } - - void RegisterWorker(CbPackage Worker); - WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId); - - std::atomic<int32_t> m_ActionsCounter = 0; // sequence number - - RwLock m_PendingLock; - std::map<int, Ref<RunnerAction>> m_PendingActions; - - RwLock m_RunningLock; - std::unordered_map<int, Ref<RunnerAction>> m_RunningMap; - - RwLock m_ResultsLock; - std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap; - metrics::Meter m_ResultRate; - std::atomic<uint64_t> m_RetiredCount{0}; - - HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage); - HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage); - - std::atomic<bool> m_ShutdownRequested{false}; - - std::thread m_SchedulingThread; - std::atomic<bool> m_SchedulingThreadEnabled{true}; - Event m_SchedulingThreadEvent; - - void MonitorThreadFunction(); - void SchedulePendingActions(); - - // Workers - - RwLock m_WorkerLock; - std::unordered_map<IoHash, CbPackage> m_WorkerMap; - std::vector<FunctionDefinition> m_FunctionList; - std::vector<IoHash> GetKnownWorkerIds(); - - // Runners - - RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup; - RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup; - - EnqueueResult EnqueueAction(CbObject ActionObject, int Priority); - EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority); - - void GetCompleted(CbWriter& Cbo); - - // Recording - - void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); - void StopRecording(); - - std::unique_ptr<ActionRecorder> m_Recorder; - - // History tracking - - RwLock m_ActionHistoryLock; - std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory; - size_t m_HistoryLimit = 1000; - - std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit); - - // - - [[nodiscard]] size_t QueryCapacity(); - - [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action); - [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); - [[nodiscard]] size_t GetSubmittedActionCount(); - - // Updates - - RwLock m_UpdatedActionsLock; - std::vector<Ref<RunnerAction>> m_UpdatedActions; - - void HandleActionUpdates(); - void PostUpdate(RunnerAction* Action); - - void ShutdownRunners(); -}; - -bool -FunctionServiceSession::Impl::IsHealthy() -{ - return true; -} - -void -FunctionServiceSession::Impl::Shutdown() -{ - m_AcceptActions = false; - m_ShutdownRequested = true; - - m_SchedulingThreadEnabled = false; - m_SchedulingThreadEvent.Set(); - if (m_SchedulingThread.joinable()) - { - m_SchedulingThread.join(); - } - - ShutdownRunners(); -} - -void -FunctionServiceSession::Impl::ShutdownRunners() -{ - m_LocalRunnerGroup.Shutdown(); - m_RemoteRunnerGroup.Shutdown(); -} - -void -FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) -{ - ZEN_INFO("starting recording to '{}'", RecordingPath); - - m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath); - - ZEN_INFO("started recording to '{}'", RecordingPath); -} - -void -FunctionServiceSession::Impl::StopRecording() -{ - ZEN_INFO("stopping recording"); - - m_Recorder = nullptr; - - ZEN_INFO("stopped recording"); -} - -std::vector<FunctionServiceSession::ActionHistoryEntry> -FunctionServiceSession::Impl::GetActionHistory(int Limit) -{ - RwLock::SharedLockScope _(m_ActionHistoryLock); - - if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size()) - { - return std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end()); - } - - return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end()); -} - -void -FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker) -{ - RwLock::ExclusiveLockScope _(m_WorkerLock); - - const IoHash& WorkerId = Worker.GetObject().GetHash(); - - if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second) - { - // Note that since the convention currently is that WorkerId is equal to the hash - // of the worker descriptor there is no chance that we get a second write with a - // different descriptor. Thus we only need to call this the first time, when the - // worker is added - - m_LocalRunnerGroup.RegisterWorker(Worker); - m_RemoteRunnerGroup.RegisterWorker(Worker); - - if (m_Recorder) - { - m_Recorder->RegisterWorker(Worker); - } - - CbObject WorkerObj = Worker.GetObject(); - - // Populate worker database - - const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid(); - - for (auto& Item : WorkerObj["functions"sv]) - { - CbObjectView Function = Item.AsObjectView(); - - std::string_view FunctionName = Function["name"sv].AsString(); - const Guid FunctionVersion = Function["version"sv].AsUuid(); - - m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName}, - .FunctionVersion = FunctionVersion, - .BuildSystemVersion = WorkerBuildSystemVersion, - .WorkerId = WorkerId}); - } - } -} - -WorkerDesc -FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId) -{ - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) - { - const CbPackage& Desc = It->second; - return {Desc, WorkerId}; - } - - return {}; -} - -std::vector<IoHash> -FunctionServiceSession::Impl::GetKnownWorkerIds() -{ - std::vector<IoHash> WorkerIds; - WorkerIds.reserve(m_WorkerMap.size()); - - m_WorkerLock.WithSharedLock([&] { - for (const auto& [WorkerId, _] : m_WorkerMap) - { - WorkerIds.push_back(WorkerId); - } - }); - - return WorkerIds; -} - -FunctionServiceSession::EnqueueResult -FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority) -{ - // Resolve function to worker - - IoHash WorkerId{IoHash::Zero}; - - std::string_view FunctionName = ActionObject["Function"sv].AsString(); - const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid(); - const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid(); - - for (const FunctionDefinition& FuncDef : m_FunctionList) - { - if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion && - FuncDef.BuildSystemVersion == BuildSystemVersion) - { - WorkerId = FuncDef.WorkerId; - - break; - } - } - - if (WorkerId == IoHash::Zero) - { - CbObjectWriter Writer; - - Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; - Writer << "error" - << "no worker matches the action specification"; - - return {0, Writer.Save()}; - } - - if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end()) - { - CbPackage WorkerPackage = It->second; - - return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority); - } - - CbObjectWriter Writer; - - Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion; - Writer << "error" - << "no worker found despite match"; - - return {0, Writer.Save()}; -} - -FunctionServiceSession::EnqueueResult -FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) -{ - const int ActionLsn = ++m_ActionsCounter; - - Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)}; - - Pending->ActionLsn = ActionLsn; - Pending->Worker = Worker; - Pending->ActionId = ActionObj.GetHash(); - Pending->ActionObj = ActionObj; - Pending->Priority = RequestPriority; - - SubmitResult SubResult = SubmitAction(Pending); - - if (SubResult.IsAccepted) - { - // Great, the job is being taken care of by the runner - ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn); - } - else - { - ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn); - - Pending->SetActionState(RunnerAction::State::Pending); - } - - if (m_Recorder) - { - m_Recorder->RecordAction(Pending); - } - - CbObjectWriter Writer; - Writer << "lsn" << Pending->ActionLsn; - Writer << "worker" << Pending->Worker.WorkerId; - Writer << "action" << Pending->ActionId; - - return {Pending->ActionLsn, Writer.Save()}; -} - -SubmitResult -FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action) -{ - // Loosely round-robin scheduling of actions across runners. - // - // It's not entirely clear what this means given that submits - // can come in across multiple threads, but it's probably better - // than always starting with the first runner. - // - // Longer term we should track the state of the individual - // runners and make decisions accordingly. - - SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action); - if (Result.IsAccepted) - { - return Result; - } - - return m_RemoteRunnerGroup.SubmitAction(Action); -} - -size_t -FunctionServiceSession::Impl::GetSubmittedActionCount() -{ - return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount(); -} - -HttpResponseCode -FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) -{ - // This lock is held for the duration of the function since we need to - // be sure that the action doesn't change state while we are checking the - // different data structures - - RwLock::ExclusiveLockScope _(m_ResultsLock); - - if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end()) - { - OutResultPackage = std::move(It->second->GetResult()); - - m_ResultsMap.erase(It); - - return HttpResponseCode::OK; - } - - { - RwLock::SharedLockScope __(m_PendingLock); - - if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end()) - { - return HttpResponseCode::Accepted; - } - } - - // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must - // always be taken after m_ResultsLock if both are needed - - { - RwLock::SharedLockScope __(m_RunningLock); - - if (m_RunningMap.find(ActionLsn) != m_RunningMap.end()) - { - return HttpResponseCode::Accepted; - } - } - - return HttpResponseCode::NotFound; -} - -HttpResponseCode -FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) -{ - // This lock is held for the duration of the function since we need to - // be sure that the action doesn't change state while we are checking the - // different data structures - - RwLock::ExclusiveLockScope _(m_ResultsLock); - - for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It) - { - if (It->second->ActionId == ActionId) - { - OutResultPackage = std::move(It->second->GetResult()); - - m_ResultsMap.erase(It); - - return HttpResponseCode::OK; - } - } - - { - RwLock::SharedLockScope __(m_PendingLock); - - for (const auto& [K, Pending] : m_PendingActions) - { - if (Pending->ActionId == ActionId) - { - return HttpResponseCode::Accepted; - } - } - } - - // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must - // always be taken after m_ResultsLock if both are needed - - { - RwLock::SharedLockScope __(m_RunningLock); - - for (const auto& [K, v] : m_RunningMap) - { - if (v->ActionId == ActionId) - { - return HttpResponseCode::Accepted; - } - } - } - - return HttpResponseCode::NotFound; -} - -void -FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo) -{ - Cbo.BeginArray("completed"); - - m_ResultsLock.WithSharedLock([&] { - for (auto& Kv : m_ResultsMap) - { - Cbo << Kv.first; - } - }); - - Cbo.EndArray(); -} - -# define ZEN_BATCH_SCHEDULER 1 - -void -FunctionServiceSession::Impl::SchedulePendingActions() -{ - int ScheduledCount = 0; - size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); }); - size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }); - size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); }); - - static Stopwatch DumpRunningTimer; - - auto _ = MakeGuard([&] { - ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", - ScheduledCount, - RunningCount, - m_RetiredCount.load(), - PendingCount, - ResultCount); - - if (DumpRunningTimer.GetElapsedTimeMs() > 30000) - { - DumpRunningTimer.Reset(); - - std::set<int> RunningList; - m_RunningLock.WithSharedLock([&] { - for (auto& [K, V] : m_RunningMap) - { - RunningList.insert(K); - } - }); - - ExtendableStringBuilder<1024> RunningString; - for (int i : RunningList) - { - if (RunningString.Size()) - { - RunningString << ", "; - } - - RunningString.Append(IntNum(i)); - } - - ZEN_INFO("running: {}", RunningString); - } - }); - -# if ZEN_BATCH_SCHEDULER - size_t Capacity = QueryCapacity(); - - if (!Capacity) - { - _.Dismiss(); - - return; - } - - std::vector<Ref<RunnerAction>> ActionsToSchedule; - - // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock - - m_PendingLock.WithExclusiveLock([&] { - if (m_ShutdownRequested) - { - return; - } - - if (m_PendingActions.empty()) - { - return; - } - - size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size()); - - auto PendingIt = m_PendingActions.begin(); - const auto PendingEnd = m_PendingActions.end(); - - while (NumActionsToSchedule && PendingIt != PendingEnd) - { - const Ref<RunnerAction>& Pending = PendingIt->second; - - switch (Pending->ActionState()) - { - case RunnerAction::State::Pending: - ActionsToSchedule.push_back(Pending); - break; - - case RunnerAction::State::Running: - case RunnerAction::State::Completed: - case RunnerAction::State::Failed: - break; - - default: - case RunnerAction::State::New: - ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn); - break; - } - - ++PendingIt; - --NumActionsToSchedule; - } - - PendingCount = m_PendingActions.size(); - }); - - if (ActionsToSchedule.empty()) - { - _.Dismiss(); - return; - } - - ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); - - auto SubmitResults = SubmitActions(ActionsToSchedule); - - // Move successfully scheduled actions to the running map and remove - // from pending queue. It's actually possible that by the time we get - // to this stage some of the actions may have already completed, so - // they should not always be added to the running map - - eastl::hash_set<int> ScheduledActions; - - for (size_t i = 0; i < ActionsToSchedule.size(); ++i) - { - const Ref<RunnerAction>& Pending = ActionsToSchedule[i]; - const SubmitResult& SubResult = SubmitResults[i]; - - if (SubResult.IsAccepted) - { - ScheduledActions.insert(Pending->ActionLsn); - } - } - - ScheduledCount += (int)ActionsToSchedule.size(); - -# else - m_PendingLock.WithExclusiveLock([&] { - while (!m_PendingActions.empty()) - { - if (m_ShutdownRequested) - { - return; - } - - // Here it would be good if we could decide to pop immediately to avoid - // holding the lock while creating processes etc - const Ref<RunnerAction>& Pending = m_PendingActions.begin()->second; - FunctionRunner::SubmitResult SubResult = SubmitAction(Pending); - - if (SubResult.IsAccepted) - { - // Great, the job is being taken care of by the runner - - ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn); - - m_RunningLock.WithExclusiveLock([&] { - m_RunningMap.insert({Pending->ActionLsn, Pending}); - - RunningCount = m_RunningMap.size(); - }); - - m_PendingActions.pop_front(); - - PendingCount = m_PendingActions.size(); - ++ScheduledCount; - } - else - { - // Runner could not accept the job, leave it on the pending queue - - return; - } - } - }); -# endif -} - -void -FunctionServiceSession::Impl::MonitorThreadFunction() -{ - SetCurrentThreadName("FunctionServiceSession_Monitor"); - - auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); }); - - do - { - int TimeoutMs = 1000; - - if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); })) - { - TimeoutMs = 100; - } - - const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs); - - if (m_SchedulingThreadEnabled == false) - { - return; - } - - HandleActionUpdates(); - - // Schedule pending actions - - SchedulePendingActions(); - - if (!Timedout) - { - m_SchedulingThreadEvent.Reset(); - } - } while (m_SchedulingThreadEnabled); -} - -void -FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action) -{ - m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); }); -} - -void -FunctionServiceSession::Impl::HandleActionUpdates() -{ - std::vector<Ref<RunnerAction>> UpdatedActions; - - m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); }); - - std::unordered_set<int> SeenLsn; - std::unordered_set<int> RunningLsn; - - for (Ref<RunnerAction>& Action : UpdatedActions) - { - const int ActionLsn = Action->ActionLsn; - - if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted) - { - switch (Action->ActionState()) - { - case RunnerAction::State::Pending: - m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); }); - break; - - case RunnerAction::State::Running: - m_PendingLock.WithExclusiveLock([&] { - m_RunningLock.WithExclusiveLock([&] { - m_RunningMap.insert({ActionLsn, Action}); - m_PendingActions.erase(ActionLsn); - }); - }); - ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn); - break; - - case RunnerAction::State::Completed: - case RunnerAction::State::Failed: - m_ResultsLock.WithExclusiveLock([&] { - m_ResultsMap[ActionLsn] = Action; - - m_PendingLock.WithExclusiveLock([&] { - m_RunningLock.WithExclusiveLock([&] { - if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end()) - { - m_PendingActions.erase(ActionLsn); - } - else - { - m_RunningMap.erase(FindIt); - } - }); - }); - - m_ActionHistoryLock.WithExclusiveLock([&] { - ActionHistoryEntry Entry{.Lsn = ActionLsn, - .ActionId = Action->ActionId, - .WorkerId = Action->Worker.WorkerId, - .ActionDescriptor = Action->ActionObj, - .Succeeded = Action->ActionState() == RunnerAction::State::Completed}; - - std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps)); - - m_ActionHistory.push_back(std::move(Entry)); - - if (m_ActionHistory.size() > m_HistoryLimit) - { - m_ActionHistory.pop_front(); - } - }); - }); - m_RetiredCount.fetch_add(1); - m_ResultRate.Mark(1); - ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}", - Action->ActionId, - ActionLsn, - Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE"); - break; - } - } - } -} - -size_t -FunctionServiceSession::Impl::QueryCapacity() -{ - return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity(); -} - -std::vector<SubmitResult> -FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) -{ - std::vector<SubmitResult> Results; - - for (const Ref<RunnerAction>& Action : Actions) - { - Results.push_back(SubmitAction(Action)); - } - - return Results; -} - -////////////////////////////////////////////////////////////////////////// - -FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver) -{ - m_Impl = std::make_unique<Impl>(this, InChunkResolver); -} - -FunctionServiceSession::~FunctionServiceSession() -{ - Shutdown(); -} - -bool -FunctionServiceSession::IsHealthy() -{ - return m_Impl->IsHealthy(); -} - -void -FunctionServiceSession::Shutdown() -{ - m_Impl->Shutdown(); -} - -void -FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) -{ - m_Impl->StartRecording(InResolver, RecordingPath); -} - -void -FunctionServiceSession::StopRecording() -{ - m_Impl->StopRecording(); -} - -void -FunctionServiceSession::EmitStats(CbObjectWriter& Cbo) -{ - m_Impl->EmitStats(Cbo); -} - -std::vector<IoHash> -FunctionServiceSession::GetKnownWorkerIds() -{ - return m_Impl->GetKnownWorkerIds(); -} - -WorkerDesc -FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId) -{ - return m_Impl->GetWorkerDescriptor(WorkerId); -} - -void -FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath) -{ - m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath)); -} - -void -FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName) -{ - m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName)); -} - -FunctionServiceSession::EnqueueResult -FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority) -{ - return m_Impl->EnqueueAction(ActionObject, Priority); -} - -FunctionServiceSession::EnqueueResult -FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority) -{ - return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority); -} - -void -FunctionServiceSession::RegisterWorker(CbPackage Worker) -{ - m_Impl->RegisterWorker(Worker); -} - -HttpResponseCode -FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage) -{ - return m_Impl->GetActionResult(ActionLsn, OutResultPackage); -} - -HttpResponseCode -FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage) -{ - return m_Impl->FindActionResult(ActionId, OutResultPackage); -} - -std::vector<FunctionServiceSession::ActionHistoryEntry> -FunctionServiceSession::GetActionHistory(int Limit) -{ - return m_Impl->GetActionHistory(Limit); -} - -void -FunctionServiceSession::GetCompleted(CbWriter& Cbo) -{ - m_Impl->GetCompleted(Cbo); -} - -void -FunctionServiceSession::PostUpdate(RunnerAction* Action) -{ - m_Impl->PostUpdate(Action); -} - -////////////////////////////////////////////////////////////////////////// - -void -function_forcelink() -{ -} - -} // namespace zen::compute - -#endif // ZEN_WITH_COMPUTE_SERVICES |