 src/zencompute/functionservice.cpp | 957 ++++++++++++++++++++++++++++++++++++
 1 file changed, 957 insertions(+), 0 deletions(-)
diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp
new file mode 100644
index 000000000..0698449e9
--- /dev/null
+++ b/src/zencompute/functionservice.cpp
@@ -0,0 +1,957 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/functionservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "functionrunner.h"
+# include "actionrecorder.h"
+# include "localrunner.h"
+# include "remotehttprunner.h"
+
+# include <zencompute/recordingreader.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/scopeguard.h>
+# include <zentelemetry/stats.h>
+
+# include <algorithm>
+# include <deque>
+# include <map>
+# include <set>
+# include <thread>
+# include <unordered_map>
+# include <unordered_set>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <EASTL/hash_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+//////////////////////////////////////////////////////////////////////////
+
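+// Private implementation for FunctionServiceSession. Owns the worker registry,
+// the pending/running/results action maps, the local and remote runner groups,
+// and the monitor thread that drives scheduling and action state transitions.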
+struct FunctionServiceSession::Impl
+{
+ FunctionServiceSession* m_FunctionServiceSession;
+ ChunkResolver& m_ChunkResolver;
+ LoggerRef m_Log{logging::Get("apply")};
+
+ Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver)
+ : m_FunctionServiceSession(InFunctionServiceSession)
+ , m_ChunkResolver(InChunkResolver)
+ {
+ m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this};
+ }
+
+ void Shutdown();
+ bool IsHealthy();
+
+ LoggerRef Log() { return m_Log; }
+
+ std::atomic_bool m_AcceptActions = true;
+
+ struct FunctionDefinition
+ {
+ std::string FunctionName;
+ Guid FunctionVersion;
+ Guid BuildSystemVersion;
+ IoHash WorkerId;
+ };
+
+ void EmitStats(CbObjectWriter& Cbo)
+ {
+ m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
+ m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
+ m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
+ Cbo << "actions_submitted"sv << GetSubmittedActionCount();
+ EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
+ }
+
+ void RegisterWorker(CbPackage Worker);
+ WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
+
+ std::atomic<int32_t> m_ActionsCounter = 0; // sequence number
+
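+ // An action moves through three maps as it progresses: m_PendingActions
+ // (accepted but not yet picked up by a runner), m_RunningMap (handed to a
+ // runner) and m_ResultsMap (completed or failed, awaiting retrieval). When
+ // more than one of the guarding locks is needed they are taken in the order
+ // m_ResultsLock, m_PendingLock, m_RunningLock (see HandleActionUpdates)
+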
+ RwLock m_PendingLock;
+ std::map<int, Ref<RunnerAction>> m_PendingActions;
+
+ RwLock m_RunningLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
+
+ RwLock m_ResultsLock;
+ std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
+ metrics::Meter m_ResultRate;
+ std::atomic<uint64_t> m_RetiredCount{0};
+
+ HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
+ HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage);
+
+ std::atomic<bool> m_ShutdownRequested{false};
+
+ std::thread m_SchedulingThread;
+ std::atomic<bool> m_SchedulingThreadEnabled{true};
+ Event m_SchedulingThreadEvent;
+
+ void MonitorThreadFunction();
+ void SchedulePendingActions();
+
+ // Workers
+
+ RwLock m_WorkerLock;
+ std::unordered_map<IoHash, CbPackage> m_WorkerMap;
+ std::vector<FunctionDefinition> m_FunctionList;
+ std::vector<IoHash> GetKnownWorkerIds();
+
+ // Runners
+
+ RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup;
+ RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup;
+
+ EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
+ EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority);
+
+ void GetCompleted(CbWriter& Cbo);
+
+ // Recording
+
+ void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
+ void StopRecording();
+
+ std::unique_ptr<ActionRecorder> m_Recorder;
+
+ // History tracking
+
+ RwLock m_ActionHistoryLock;
+ std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory;
+ size_t m_HistoryLimit = 1000;
+
+ std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit);
+
+ //
+
+ [[nodiscard]] size_t QueryCapacity();
+
+ [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action);
+ [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
+ [[nodiscard]] size_t GetSubmittedActionCount();
+
+ // Updates
+
+ RwLock m_UpdatedActionsLock;
+ std::vector<Ref<RunnerAction>> m_UpdatedActions;
+
+ void HandleActionUpdates();
+ void PostUpdate(RunnerAction* Action);
+
+ void ShutdownRunners();
+};
+
+bool
+FunctionServiceSession::Impl::IsHealthy()
+{
+ return true;
+}
+
+void
+FunctionServiceSession::Impl::Shutdown()
+{
+ m_AcceptActions = false;
+ m_ShutdownRequested = true;
+
+ m_SchedulingThreadEnabled = false;
+ m_SchedulingThreadEvent.Set();
+ if (m_SchedulingThread.joinable())
+ {
+ m_SchedulingThread.join();
+ }
+
+ ShutdownRunners();
+}
+
+void
+FunctionServiceSession::Impl::ShutdownRunners()
+{
+ m_LocalRunnerGroup.Shutdown();
+ m_RemoteRunnerGroup.Shutdown();
+}
+
+void
+FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
+{
+ ZEN_INFO("starting recording to '{}'", RecordingPath);
+
+ m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
+
+ ZEN_INFO("started recording to '{}'", RecordingPath);
+}
+
+void
+FunctionServiceSession::Impl::StopRecording()
+{
+ ZEN_INFO("stopping recording");
+
+ m_Recorder = nullptr;
+
+ ZEN_INFO("stopped recording");
+}
+
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::Impl::GetActionHistory(int Limit)
+{
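+ // Return at most Limit of the most recent entries; the history is appended
+ // at the back and trimmed from the front when it exceeds m_HistoryLimit
+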
+ RwLock::SharedLockScope _(m_ActionHistoryLock);
+
+ if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size())
+ {
+ return std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end());
+ }
+
+ return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end());
+}
+
+void
+FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker)
+{
+ RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+ const IoHash& WorkerId = Worker.GetObject().GetHash();
+
+ if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second)
+ {
+ // Note that since the convention currently is that WorkerId equals the hash
+ // of the worker descriptor, there is no chance that a second write carries a
+ // different descriptor. Thus we only need to do this the first time, when
+ // the worker is added
+
+ m_LocalRunnerGroup.RegisterWorker(Worker);
+ m_RemoteRunnerGroup.RegisterWorker(Worker);
+
+ if (m_Recorder)
+ {
+ m_Recorder->RegisterWorker(Worker);
+ }
+
+ CbObject WorkerObj = Worker.GetObject();
+
+ // Populate the function database: index every function the worker advertises
+ // by (name, version, buildsystem version) so EnqueueAction can resolve
+ // incoming actions to this worker
+
+ const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid();
+
+ for (auto& Item : WorkerObj["functions"sv])
+ {
+ CbObjectView Function = Item.AsObjectView();
+
+ std::string_view FunctionName = Function["name"sv].AsString();
+ const Guid FunctionVersion = Function["version"sv].AsUuid();
+
+ m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
+ .FunctionVersion = FunctionVersion,
+ .BuildSystemVersion = WorkerBuildSystemVersion,
+ .WorkerId = WorkerId});
+ }
+ }
+}
+
+WorkerDesc
+FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ RwLock::SharedLockScope _(m_WorkerLock);
+
+ if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+ {
+ const CbPackage& Desc = It->second;
+ return {Desc, WorkerId};
+ }
+
+ return {};
+}
+
+std::vector<IoHash>
+FunctionServiceSession::Impl::GetKnownWorkerIds()
+{
+ std::vector<IoHash> WorkerIds;
+
+ m_WorkerLock.WithSharedLock([&] {
+ WorkerIds.reserve(m_WorkerMap.size());
+
+ for (const auto& [WorkerId, _] : m_WorkerMap)
+ {
+ WorkerIds.push_back(WorkerId);
+ }
+ });
+
+ return WorkerIds;
+}
+
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority)
+{
+ // Resolve function to worker. The worker lock is taken (shared) so that a
+ // concurrent RegisterWorker cannot race with the lookup, and the resolved
+ // worker package is copied out so the lock is not held while the action is
+ // enqueued
+
+ std::string_view FunctionName = ActionObject["Function"sv].AsString();
+ const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
+ const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
+
+ IoHash WorkerId{IoHash::Zero};
+ CbPackage WorkerPackage;
+ bool FoundWorker = false;
+
+ m_WorkerLock.WithSharedLock([&] {
+ for (const FunctionDefinition& FuncDef : m_FunctionList)
+ {
+ if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
+ FuncDef.BuildSystemVersion == BuildSystemVersion)
+ {
+ WorkerId = FuncDef.WorkerId;
+
+ break;
+ }
+ }
+
+ if (WorkerId != IoHash::Zero)
+ {
+ if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
+ {
+ WorkerPackage = It->second;
+ FoundWorker = true;
+ }
+ }
+ });
+
+ if (FoundWorker)
+ {
+ return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
+ }
+
+ CbObjectWriter Writer;
+
+ Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
+
+ if (WorkerId == IoHash::Zero)
+ {
+ Writer << "error"
+ << "no worker matches the action specification";
+ }
+ else
+ {
+ Writer << "error"
+ << "no worker found despite match";
+ }
+
+ return {0, Writer.Save()};
+}
+
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ const int ActionLsn = ++m_ActionsCounter;
+
+ Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)};
+
+ Pending->ActionLsn = ActionLsn;
+ Pending->Worker = Worker;
+ Pending->ActionId = ActionObj.GetHash();
+ Pending->ActionObj = ActionObj;
+ Pending->Priority = RequestPriority;
+
+ SubmitResult SubResult = SubmitAction(Pending);
+
+ if (SubResult.IsAccepted)
+ {
+ // Great, the job is being taken care of by the runner
+ ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn);
+ }
+ else
+ {
+ ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
+
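+ // No runner could accept the action right now; mark it pending so the
+ // scheduling thread can pick it up on a later pass
+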
+ Pending->SetActionState(RunnerAction::State::Pending);
+ }
+
+ if (m_Recorder)
+ {
+ m_Recorder->RecordAction(Pending);
+ }
+
+ CbObjectWriter Writer;
+ Writer << "lsn" << Pending->ActionLsn;
+ Writer << "worker" << Pending->Worker.WorkerId;
+ Writer << "action" << Pending->ActionId;
+
+ return {Pending->ActionLsn, Writer.Save()};
+}
+
+SubmitResult
+FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action)
+{
+ // Loosely round-robin scheduling of actions across runners.
+ //
+ // It's not entirely clear what this means given that submits
+ // can come in across multiple threads, but it's probably better
+ // than always starting with the first runner.
+ //
+ // Longer term we should track the state of the individual
+ // runners and make decisions accordingly.
+
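+ // Try the local runner group first; only fall back to the remote group if no
+ // local runner accepts the action
+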
+ SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action);
+ if (Result.IsAccepted)
+ {
+ return Result;
+ }
+
+ return m_RemoteRunnerGroup.SubmitAction(Action);
+}
+
+size_t
+FunctionServiceSession::Impl::GetSubmittedActionCount()
+{
+ return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount();
+}
+
+HttpResponseCode
+FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
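+ // A result is handed out at most once: it is removed from the results map as
+ // soon as it has been returned to the caller
+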
+ if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks: m_RunningLock must always
+ // be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+HttpResponseCode
+FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ // This lock is held for the duration of the function since we need to
+ // be sure that the action doesn't change state while we are checking the
+ // different data structures
+
+ RwLock::ExclusiveLockScope _(m_ResultsLock);
+
+ for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
+ {
+ if (It->second->ActionId == ActionId)
+ {
+ OutResultPackage = std::move(It->second->GetResult());
+
+ m_ResultsMap.erase(It);
+
+ return HttpResponseCode::OK;
+ }
+ }
+
+ {
+ RwLock::SharedLockScope __(m_PendingLock);
+
+ for (const auto& [K, Pending] : m_PendingActions)
+ {
+ if (Pending->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ // Lock order is important here to avoid deadlocks: m_RunningLock must always
+ // be taken after m_ResultsLock if both are needed
+
+ {
+ RwLock::SharedLockScope __(m_RunningLock);
+
+ for (const auto& [K, v] : m_RunningMap)
+ {
+ if (v->ActionId == ActionId)
+ {
+ return HttpResponseCode::Accepted;
+ }
+ }
+ }
+
+ return HttpResponseCode::NotFound;
+}
+
+void
+FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo)
+{
+ Cbo.BeginArray("completed");
+
+ m_ResultsLock.WithSharedLock([&] {
+ for (auto& Kv : m_ResultsMap)
+ {
+ Cbo << Kv.first;
+ }
+ });
+
+ Cbo.EndArray();
+}
+
+# define ZEN_BATCH_SCHEDULER 1
+
+void
+FunctionServiceSession::Impl::SchedulePendingActions()
+{
+ int ScheduledCount = 0;
+ size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
+ size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
+ size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
+
+ static Stopwatch DumpRunningTimer;
+
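+ // Unless dismissed below, log a summary of this scheduling pass on exit;
+ // every 30 seconds also dump the LSNs of all currently running actions
+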
+ auto _ = MakeGuard([&] {
+ ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
+ ScheduledCount,
+ RunningCount,
+ m_RetiredCount.load(),
+ PendingCount,
+ ResultCount);
+
+ if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
+ {
+ DumpRunningTimer.Reset();
+
+ std::set<int> RunningList;
+ m_RunningLock.WithSharedLock([&] {
+ for (auto& [K, V] : m_RunningMap)
+ {
+ RunningList.insert(K);
+ }
+ });
+
+ ExtendableStringBuilder<1024> RunningString;
+ for (int i : RunningList)
+ {
+ if (RunningString.Size())
+ {
+ RunningString << ", ";
+ }
+
+ RunningString.Append(IntNum(i));
+ }
+
+ ZEN_INFO("running: {}", RunningString);
+ }
+ });
+
+# if ZEN_BATCH_SCHEDULER
+ size_t Capacity = QueryCapacity();
+
+ if (!Capacity)
+ {
+ _.Dismiss();
+
+ return;
+ }
+
+ std::vector<Ref<RunnerAction>> ActionsToSchedule;
+
+ // Pull actions to schedule from the pending queue; we will try to submit them
+ // to the runners outside of the lock
+
+ m_PendingLock.WithExclusiveLock([&] {
+ if (m_ShutdownRequested)
+ {
+ return;
+ }
+
+ if (m_PendingActions.empty())
+ {
+ return;
+ }
+
+ size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size());
+
+ auto PendingIt = m_PendingActions.begin();
+ const auto PendingEnd = m_PendingActions.end();
+
+ while (NumActionsToSchedule && PendingIt != PendingEnd)
+ {
+ const Ref<RunnerAction>& Pending = PendingIt->second;
+
+ switch (Pending->ActionState())
+ {
+ case RunnerAction::State::Pending:
+ ActionsToSchedule.push_back(Pending);
+ break;
+
+ case RunnerAction::State::Running:
+ case RunnerAction::State::Completed:
+ case RunnerAction::State::Failed:
+ break;
+
+ default:
+ case RunnerAction::State::New:
+ ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn);
+ break;
+ }
+
+ ++PendingIt;
+ --NumActionsToSchedule;
+ }
+
+ PendingCount = m_PendingActions.size();
+ });
+
+ if (ActionsToSchedule.empty())
+ {
+ _.Dismiss();
+ return;
+ }
+
+ ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
+
+ auto SubmitResults = SubmitActions(ActionsToSchedule);
+
+ // Count the actions that a runner accepted. Accepted actions are not moved to
+ // the running map here; that transition is driven by the action state updates
+ // handled in HandleActionUpdates (and by then some actions may already have
+ // completed). Actions that were not accepted simply stay in the pending queue
+ // and will be retried on a later pass
+
+ eastl::hash_set<int> ScheduledActions;
+
+ for (size_t i = 0; i < ActionsToSchedule.size(); ++i)
+ {
+ const Ref<RunnerAction>& Pending = ActionsToSchedule[i];
+ const SubmitResult& SubResult = SubmitResults[i];
+
+ if (SubResult.IsAccepted)
+ {
+ ScheduledActions.insert(Pending->ActionLsn);
+ }
+ }
+
+ ScheduledCount += static_cast<int>(ScheduledActions.size());
+
+# else
+ m_PendingLock.WithExclusiveLock([&] {
+ while (!m_PendingActions.empty())
+ {
+ if (m_ShutdownRequested)
+ {
+ return;
+ }
+
+ // It would be better to pop the action immediately so that we do not hold
+ // the lock while creating processes etc
+ const Ref<RunnerAction>& Pending = m_PendingActions.begin()->second;
+ FunctionRunner::SubmitResult SubResult = SubmitAction(Pending);
+
+ if (SubResult.IsAccepted)
+ {
+ // Great, the job is being taken care of by the runner
+
+ ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn);
+
+ m_RunningLock.WithExclusiveLock([&] {
+ m_RunningMap.insert({Pending->ActionLsn, Pending});
+
+ RunningCount = m_RunningMap.size();
+ });
+
+ m_PendingActions.erase(m_PendingActions.begin());
+
+ PendingCount = m_PendingActions.size();
+ ++ScheduledCount;
+ }
+ else
+ {
+ // Runner could not accept the job, leave it on the pending queue
+
+ return;
+ }
+ }
+ });
+# endif
+}
+
+void
+FunctionServiceSession::Impl::MonitorThreadFunction()
+{
+ SetCurrentThreadName("FunctionServiceSession_Monitor");
+
+ auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
+
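+ // Wake up once per second (every 100ms while actions are pending), or as soon
+ // as the scheduling event is signalled; fold in any runner state updates and
+ // then try to schedule whatever is still pending
+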
+ do
+ {
+ int TimeoutMs = 1000;
+
+ if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }))
+ {
+ TimeoutMs = 100;
+ }
+
+ const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs);
+
+ if (m_SchedulingThreadEnabled == false)
+ {
+ return;
+ }
+
+ HandleActionUpdates();
+
+ // Schedule pending actions
+
+ SchedulePendingActions();
+
+ if (!Timedout)
+ {
+ m_SchedulingThreadEvent.Reset();
+ }
+ } while (m_SchedulingThreadEnabled);
+}
+
+void
+FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action)
+{
+ m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); });
+}
+
+void
+FunctionServiceSession::Impl::HandleActionUpdates()
+{
+ std::vector<Ref<RunnerAction>> UpdatedActions;
+
+ m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });
+
+ std::unordered_set<int> SeenLsn;
+
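+ // Several updates may have been posted for the same action; each LSN is
+ // processed only once and the action's current state is read at that point,
+ // so later updates in the batch are effectively folded into the first one
+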
+ for (Ref<RunnerAction>& Action : UpdatedActions)
+ {
+ const int ActionLsn = Action->ActionLsn;
+
+ if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
+ {
+ switch (Action->ActionState())
+ {
+ case RunnerAction::State::Pending:
+ m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
+ break;
+
+ case RunnerAction::State::Running:
+ m_PendingLock.WithExclusiveLock([&] {
+ m_RunningLock.WithExclusiveLock([&] {
+ m_RunningMap.insert({ActionLsn, Action});
+ m_PendingActions.erase(ActionLsn);
+ });
+ });
+ ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
+ break;
+
+ case RunnerAction::State::Completed:
+ case RunnerAction::State::Failed:
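+ // Move the action into the results map, drop it from whichever of the
+ // pending/running maps still holds it, and append it to the bounded
+ // action history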
+ m_ResultsLock.WithExclusiveLock([&] {
+ m_ResultsMap[ActionLsn] = Action;
+
+ m_PendingLock.WithExclusiveLock([&] {
+ m_RunningLock.WithExclusiveLock([&] {
+ if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
+ {
+ m_PendingActions.erase(ActionLsn);
+ }
+ else
+ {
+ m_RunningMap.erase(FindIt);
+ }
+ });
+ });
+
+ m_ActionHistoryLock.WithExclusiveLock([&] {
+ ActionHistoryEntry Entry{.Lsn = ActionLsn,
+ .ActionId = Action->ActionId,
+ .WorkerId = Action->Worker.WorkerId,
+ .ActionDescriptor = Action->ActionObj,
+ .Succeeded = Action->ActionState() == RunnerAction::State::Completed};
+
+ std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
+
+ m_ActionHistory.push_back(std::move(Entry));
+
+ if (m_ActionHistory.size() > m_HistoryLimit)
+ {
+ m_ActionHistory.pop_front();
+ }
+ });
+ });
+ m_RetiredCount.fetch_add(1);
+ m_ResultRate.Mark(1);
+ ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
+ Action->ActionId,
+ ActionLsn,
+ Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
+ break;
+ }
+ }
+ }
+}
+
+size_t
+FunctionServiceSession::Impl::QueryCapacity()
+{
+ return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity();
+}
+
+std::vector<SubmitResult>
+FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<SubmitResult> Results;
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Results.push_back(SubmitAction(Action));
+ }
+
+ return Results;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver)
+{
+ m_Impl = std::make_unique<Impl>(this, InChunkResolver);
+}
+
+FunctionServiceSession::~FunctionServiceSession()
+{
+ Shutdown();
+}
+
+bool
+FunctionServiceSession::IsHealthy()
+{
+ return m_Impl->IsHealthy();
+}
+
+void
+FunctionServiceSession::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+
+void
+FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
+{
+ m_Impl->StartRecording(InResolver, RecordingPath);
+}
+
+void
+FunctionServiceSession::StopRecording()
+{
+ m_Impl->StopRecording();
+}
+
+void
+FunctionServiceSession::EmitStats(CbObjectWriter& Cbo)
+{
+ m_Impl->EmitStats(Cbo);
+}
+
+std::vector<IoHash>
+FunctionServiceSession::GetKnownWorkerIds()
+{
+ return m_Impl->GetKnownWorkerIds();
+}
+
+WorkerDesc
+FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
+{
+ return m_Impl->GetWorkerDescriptor(WorkerId);
+}
+
+void
+FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath)
+{
+ m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath));
+}
+
+void
+FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
+{
+ m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName));
+}
+
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
+{
+ return m_Impl->EnqueueAction(ActionObject, Priority);
+}
+
+FunctionServiceSession::EnqueueResult
+FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
+{
+ return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority);
+}
+
+void
+FunctionServiceSession::RegisterWorker(CbPackage Worker)
+{
+ m_Impl->RegisterWorker(Worker);
+}
+
+HttpResponseCode
+FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
+{
+ return m_Impl->GetActionResult(ActionLsn, OutResultPackage);
+}
+
+HttpResponseCode
+FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
+{
+ return m_Impl->FindActionResult(ActionId, OutResultPackage);
+}
+
+std::vector<FunctionServiceSession::ActionHistoryEntry>
+FunctionServiceSession::GetActionHistory(int Limit)
+{
+ return m_Impl->GetActionHistory(Limit);
+}
+
+void
+FunctionServiceSession::GetCompleted(CbWriter& Cbo)
+{
+ m_Impl->GetCompleted(Cbo);
+}
+
+void
+FunctionServiceSession::PostUpdate(RunnerAction* Action)
+{
+ m_Impl->PostUpdate(Action);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+function_forcelink()
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES