aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/functionservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/functionservice.cpp')
-rw-r--r--src/zencompute/functionservice.cpp957
1 files changed, 0 insertions, 957 deletions
diff --git a/src/zencompute/functionservice.cpp b/src/zencompute/functionservice.cpp
deleted file mode 100644
index 0698449e9..000000000
--- a/src/zencompute/functionservice.cpp
+++ /dev/null
@@ -1,957 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zencompute/functionservice.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include "functionrunner.h"
-# include "actionrecorder.h"
-# include "localrunner.h"
-# include "remotehttprunner.h"
-
-# include <zencompute/recordingreader.h>
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/logging.h>
-# include <zencore/scopeguard.h>
-# include <zentelemetry/stats.h>
-
-# include <set>
-# include <deque>
-# include <map>
-# include <thread>
-# include <unordered_map>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-# include <EASTL/hash_set.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-using namespace std::literals;
-
-namespace zen::compute {
-
-//////////////////////////////////////////////////////////////////////////
-
-struct FunctionServiceSession::Impl
-{
- FunctionServiceSession* m_FunctionServiceSession;
- ChunkResolver& m_ChunkResolver;
- LoggerRef m_Log{logging::Get("apply")};
-
- Impl(FunctionServiceSession* InFunctionServiceSession, ChunkResolver& InChunkResolver)
- : m_FunctionServiceSession(InFunctionServiceSession)
- , m_ChunkResolver(InChunkResolver)
- {
- m_SchedulingThread = std::thread{&Impl::MonitorThreadFunction, this};
- }
-
- void Shutdown();
- bool IsHealthy();
-
- LoggerRef Log() { return m_Log; }
-
- std::atomic_bool m_AcceptActions = true;
-
- struct FunctionDefinition
- {
- std::string FunctionName;
- Guid FunctionVersion;
- Guid BuildSystemVersion;
- IoHash WorkerId;
- };
-
- void EmitStats(CbObjectWriter& Cbo)
- {
- m_WorkerLock.WithSharedLock([&] { Cbo << "worker_count"sv << m_WorkerMap.size(); });
- m_ResultsLock.WithSharedLock([&] { Cbo << "actions_complete"sv << m_ResultsMap.size(); });
- m_PendingLock.WithSharedLock([&] { Cbo << "actions_pending"sv << m_PendingActions.size(); });
- Cbo << "actions_submitted"sv << GetSubmittedActionCount();
- EmitSnapshot("actions_retired"sv, m_ResultRate, Cbo);
- }
-
- void RegisterWorker(CbPackage Worker);
- WorkerDesc GetWorkerDescriptor(const IoHash& WorkerId);
-
- std::atomic<int32_t> m_ActionsCounter = 0; // sequence number
-
- RwLock m_PendingLock;
- std::map<int, Ref<RunnerAction>> m_PendingActions;
-
- RwLock m_RunningLock;
- std::unordered_map<int, Ref<RunnerAction>> m_RunningMap;
-
- RwLock m_ResultsLock;
- std::unordered_map<int, Ref<RunnerAction>> m_ResultsMap;
- metrics::Meter m_ResultRate;
- std::atomic<uint64_t> m_RetiredCount{0};
-
- HttpResponseCode GetActionResult(int ActionLsn, CbPackage& OutResultPackage);
- HttpResponseCode FindActionResult(const IoHash& ActionId, CbPackage& ResultPackage);
-
- std::atomic<bool> m_ShutdownRequested{false};
-
- std::thread m_SchedulingThread;
- std::atomic<bool> m_SchedulingThreadEnabled{true};
- Event m_SchedulingThreadEvent;
-
- void MonitorThreadFunction();
- void SchedulePendingActions();
-
- // Workers
-
- RwLock m_WorkerLock;
- std::unordered_map<IoHash, CbPackage> m_WorkerMap;
- std::vector<FunctionDefinition> m_FunctionList;
- std::vector<IoHash> GetKnownWorkerIds();
-
- // Runners
-
- RunnerGroup<LocalProcessRunner> m_LocalRunnerGroup;
- RunnerGroup<RemoteHttpRunner> m_RemoteRunnerGroup;
-
- EnqueueResult EnqueueAction(CbObject ActionObject, int Priority);
- EnqueueResult EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority);
-
- void GetCompleted(CbWriter& Cbo);
-
- // Recording
-
- void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath);
- void StopRecording();
-
- std::unique_ptr<ActionRecorder> m_Recorder;
-
- // History tracking
-
- RwLock m_ActionHistoryLock;
- std::deque<FunctionServiceSession::ActionHistoryEntry> m_ActionHistory;
- size_t m_HistoryLimit = 1000;
-
- std::vector<FunctionServiceSession::ActionHistoryEntry> GetActionHistory(int Limit);
-
- //
-
- [[nodiscard]] size_t QueryCapacity();
-
- [[nodiscard]] SubmitResult SubmitAction(Ref<RunnerAction> Action);
- [[nodiscard]] std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
- [[nodiscard]] size_t GetSubmittedActionCount();
-
- // Updates
-
- RwLock m_UpdatedActionsLock;
- std::vector<Ref<RunnerAction>> m_UpdatedActions;
-
- void HandleActionUpdates();
- void PostUpdate(RunnerAction* Action);
-
- void ShutdownRunners();
-};
-
-bool
-FunctionServiceSession::Impl::IsHealthy()
-{
- return true;
-}
-
-void
-FunctionServiceSession::Impl::Shutdown()
-{
- m_AcceptActions = false;
- m_ShutdownRequested = true;
-
- m_SchedulingThreadEnabled = false;
- m_SchedulingThreadEvent.Set();
- if (m_SchedulingThread.joinable())
- {
- m_SchedulingThread.join();
- }
-
- ShutdownRunners();
-}
-
-void
-FunctionServiceSession::Impl::ShutdownRunners()
-{
- m_LocalRunnerGroup.Shutdown();
- m_RemoteRunnerGroup.Shutdown();
-}
-
-void
-FunctionServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath)
-{
- ZEN_INFO("starting recording to '{}'", RecordingPath);
-
- m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath);
-
- ZEN_INFO("started recording to '{}'", RecordingPath);
-}
-
-void
-FunctionServiceSession::Impl::StopRecording()
-{
- ZEN_INFO("stopping recording");
-
- m_Recorder = nullptr;
-
- ZEN_INFO("stopped recording");
-}
-
-std::vector<FunctionServiceSession::ActionHistoryEntry>
-FunctionServiceSession::Impl::GetActionHistory(int Limit)
-{
- RwLock::SharedLockScope _(m_ActionHistoryLock);
-
- if (Limit > 0 && static_cast<size_t>(Limit) < m_ActionHistory.size())
- {
- return std::vector<ActionHistoryEntry>(m_ActionHistory.end() - Limit, m_ActionHistory.end());
- }
-
- return std::vector<ActionHistoryEntry>(m_ActionHistory.begin(), m_ActionHistory.end());
-}
-
-void
-FunctionServiceSession::Impl::RegisterWorker(CbPackage Worker)
-{
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- const IoHash& WorkerId = Worker.GetObject().GetHash();
-
- if (m_WorkerMap.insert_or_assign(WorkerId, Worker).second)
- {
- // Note that since the convention currently is that WorkerId is equal to the hash
- // of the worker descriptor there is no chance that we get a second write with a
- // different descriptor. Thus we only need to call this the first time, when the
- // worker is added
-
- m_LocalRunnerGroup.RegisterWorker(Worker);
- m_RemoteRunnerGroup.RegisterWorker(Worker);
-
- if (m_Recorder)
- {
- m_Recorder->RegisterWorker(Worker);
- }
-
- CbObject WorkerObj = Worker.GetObject();
-
- // Populate worker database
-
- const Guid WorkerBuildSystemVersion = WorkerObj["buildsystem_version"sv].AsUuid();
-
- for (auto& Item : WorkerObj["functions"sv])
- {
- CbObjectView Function = Item.AsObjectView();
-
- std::string_view FunctionName = Function["name"sv].AsString();
- const Guid FunctionVersion = Function["version"sv].AsUuid();
-
- m_FunctionList.emplace_back(FunctionDefinition{.FunctionName = std::string{FunctionName},
- .FunctionVersion = FunctionVersion,
- .BuildSystemVersion = WorkerBuildSystemVersion,
- .WorkerId = WorkerId});
- }
- }
-}
-
-WorkerDesc
-FunctionServiceSession::Impl::GetWorkerDescriptor(const IoHash& WorkerId)
-{
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
- {
- const CbPackage& Desc = It->second;
- return {Desc, WorkerId};
- }
-
- return {};
-}
-
-std::vector<IoHash>
-FunctionServiceSession::Impl::GetKnownWorkerIds()
-{
- std::vector<IoHash> WorkerIds;
- WorkerIds.reserve(m_WorkerMap.size());
-
- m_WorkerLock.WithSharedLock([&] {
- for (const auto& [WorkerId, _] : m_WorkerMap)
- {
- WorkerIds.push_back(WorkerId);
- }
- });
-
- return WorkerIds;
-}
-
-FunctionServiceSession::EnqueueResult
-FunctionServiceSession::Impl::EnqueueAction(CbObject ActionObject, int Priority)
-{
- // Resolve function to worker
-
- IoHash WorkerId{IoHash::Zero};
-
- std::string_view FunctionName = ActionObject["Function"sv].AsString();
- const Guid FunctionVersion = ActionObject["FunctionVersion"sv].AsUuid();
- const Guid BuildSystemVersion = ActionObject["BuildSystemVersion"sv].AsUuid();
-
- for (const FunctionDefinition& FuncDef : m_FunctionList)
- {
- if (FuncDef.FunctionName == FunctionName && FuncDef.FunctionVersion == FunctionVersion &&
- FuncDef.BuildSystemVersion == BuildSystemVersion)
- {
- WorkerId = FuncDef.WorkerId;
-
- break;
- }
- }
-
- if (WorkerId == IoHash::Zero)
- {
- CbObjectWriter Writer;
-
- Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
- Writer << "error"
- << "no worker matches the action specification";
-
- return {0, Writer.Save()};
- }
-
- if (auto It = m_WorkerMap.find(WorkerId); It != m_WorkerMap.end())
- {
- CbPackage WorkerPackage = It->second;
-
- return EnqueueResolvedAction(WorkerDesc{WorkerPackage, WorkerId}, ActionObject, Priority);
- }
-
- CbObjectWriter Writer;
-
- Writer << "Function"sv << FunctionName << "FunctionVersion"sv << FunctionVersion << "BuildSystemVersion" << BuildSystemVersion;
- Writer << "error"
- << "no worker found despite match";
-
- return {0, Writer.Save()};
-}
-
-FunctionServiceSession::EnqueueResult
-FunctionServiceSession::Impl::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
-{
- const int ActionLsn = ++m_ActionsCounter;
-
- Ref<RunnerAction> Pending{new RunnerAction(m_FunctionServiceSession)};
-
- Pending->ActionLsn = ActionLsn;
- Pending->Worker = Worker;
- Pending->ActionId = ActionObj.GetHash();
- Pending->ActionObj = ActionObj;
- Pending->Priority = RequestPriority;
-
- SubmitResult SubResult = SubmitAction(Pending);
-
- if (SubResult.IsAccepted)
- {
- // Great, the job is being taken care of by the runner
- ZEN_DEBUG("direct schedule LSN {}", Pending->ActionLsn);
- }
- else
- {
- ZEN_DEBUG("action {} ({}) PENDING", Pending->ActionId, Pending->ActionLsn);
-
- Pending->SetActionState(RunnerAction::State::Pending);
- }
-
- if (m_Recorder)
- {
- m_Recorder->RecordAction(Pending);
- }
-
- CbObjectWriter Writer;
- Writer << "lsn" << Pending->ActionLsn;
- Writer << "worker" << Pending->Worker.WorkerId;
- Writer << "action" << Pending->ActionId;
-
- return {Pending->ActionLsn, Writer.Save()};
-}
-
-SubmitResult
-FunctionServiceSession::Impl::SubmitAction(Ref<RunnerAction> Action)
-{
- // Loosely round-robin scheduling of actions across runners.
- //
- // It's not entirely clear what this means given that submits
- // can come in across multiple threads, but it's probably better
- // than always starting with the first runner.
- //
- // Longer term we should track the state of the individual
- // runners and make decisions accordingly.
-
- SubmitResult Result = m_LocalRunnerGroup.SubmitAction(Action);
- if (Result.IsAccepted)
- {
- return Result;
- }
-
- return m_RemoteRunnerGroup.SubmitAction(Action);
-}
-
-size_t
-FunctionServiceSession::Impl::GetSubmittedActionCount()
-{
- return m_LocalRunnerGroup.GetSubmittedActionCount() + m_RemoteRunnerGroup.GetSubmittedActionCount();
-}
-
-HttpResponseCode
-FunctionServiceSession::Impl::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
-{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
-
- if (auto It = m_ResultsMap.find(ActionLsn); It != m_ResultsMap.end())
- {
- OutResultPackage = std::move(It->second->GetResult());
-
- m_ResultsMap.erase(It);
-
- return HttpResponseCode::OK;
- }
-
- {
- RwLock::SharedLockScope __(m_PendingLock);
-
- if (auto FindIt = m_PendingActions.find(ActionLsn); FindIt != m_PendingActions.end())
- {
- return HttpResponseCode::Accepted;
- }
- }
-
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
- {
- RwLock::SharedLockScope __(m_RunningLock);
-
- if (m_RunningMap.find(ActionLsn) != m_RunningMap.end())
- {
- return HttpResponseCode::Accepted;
- }
- }
-
- return HttpResponseCode::NotFound;
-}
-
-HttpResponseCode
-FunctionServiceSession::Impl::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
-{
- // This lock is held for the duration of the function since we need to
- // be sure that the action doesn't change state while we are checking the
- // different data structures
-
- RwLock::ExclusiveLockScope _(m_ResultsLock);
-
- for (auto It = begin(m_ResultsMap), End = end(m_ResultsMap); It != End; ++It)
- {
- if (It->second->ActionId == ActionId)
- {
- OutResultPackage = std::move(It->second->GetResult());
-
- m_ResultsMap.erase(It);
-
- return HttpResponseCode::OK;
- }
- }
-
- {
- RwLock::SharedLockScope __(m_PendingLock);
-
- for (const auto& [K, Pending] : m_PendingActions)
- {
- if (Pending->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
- }
- }
-
- // Lock order is important here to avoid deadlocks, RwLock m_RunningLock must
- // always be taken after m_ResultsLock if both are needed
-
- {
- RwLock::SharedLockScope __(m_RunningLock);
-
- for (const auto& [K, v] : m_RunningMap)
- {
- if (v->ActionId == ActionId)
- {
- return HttpResponseCode::Accepted;
- }
- }
- }
-
- return HttpResponseCode::NotFound;
-}
-
-void
-FunctionServiceSession::Impl::GetCompleted(CbWriter& Cbo)
-{
- Cbo.BeginArray("completed");
-
- m_ResultsLock.WithSharedLock([&] {
- for (auto& Kv : m_ResultsMap)
- {
- Cbo << Kv.first;
- }
- });
-
- Cbo.EndArray();
-}
-
-# define ZEN_BATCH_SCHEDULER 1
-
-void
-FunctionServiceSession::Impl::SchedulePendingActions()
-{
- int ScheduledCount = 0;
- size_t RunningCount = m_RunningLock.WithSharedLock([&] { return m_RunningMap.size(); });
- size_t PendingCount = m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); });
- size_t ResultCount = m_ResultsLock.WithSharedLock([&] { return m_ResultsMap.size(); });
-
- static Stopwatch DumpRunningTimer;
-
- auto _ = MakeGuard([&] {
- ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results",
- ScheduledCount,
- RunningCount,
- m_RetiredCount.load(),
- PendingCount,
- ResultCount);
-
- if (DumpRunningTimer.GetElapsedTimeMs() > 30000)
- {
- DumpRunningTimer.Reset();
-
- std::set<int> RunningList;
- m_RunningLock.WithSharedLock([&] {
- for (auto& [K, V] : m_RunningMap)
- {
- RunningList.insert(K);
- }
- });
-
- ExtendableStringBuilder<1024> RunningString;
- for (int i : RunningList)
- {
- if (RunningString.Size())
- {
- RunningString << ", ";
- }
-
- RunningString.Append(IntNum(i));
- }
-
- ZEN_INFO("running: {}", RunningString);
- }
- });
-
-# if ZEN_BATCH_SCHEDULER
- size_t Capacity = QueryCapacity();
-
- if (!Capacity)
- {
- _.Dismiss();
-
- return;
- }
-
- std::vector<Ref<RunnerAction>> ActionsToSchedule;
-
- // Pull actions to schedule from the pending queue, we will try to submit these to the runner outside of the lock
-
- m_PendingLock.WithExclusiveLock([&] {
- if (m_ShutdownRequested)
- {
- return;
- }
-
- if (m_PendingActions.empty())
- {
- return;
- }
-
- size_t NumActionsToSchedule = std::min(Capacity, m_PendingActions.size());
-
- auto PendingIt = m_PendingActions.begin();
- const auto PendingEnd = m_PendingActions.end();
-
- while (NumActionsToSchedule && PendingIt != PendingEnd)
- {
- const Ref<RunnerAction>& Pending = PendingIt->second;
-
- switch (Pending->ActionState())
- {
- case RunnerAction::State::Pending:
- ActionsToSchedule.push_back(Pending);
- break;
-
- case RunnerAction::State::Running:
- case RunnerAction::State::Completed:
- case RunnerAction::State::Failed:
- break;
-
- default:
- case RunnerAction::State::New:
- ZEN_WARN("unexpected state {} for pending action {}", static_cast<int>(Pending->ActionState()), Pending->ActionLsn);
- break;
- }
-
- ++PendingIt;
- --NumActionsToSchedule;
- }
-
- PendingCount = m_PendingActions.size();
- });
-
- if (ActionsToSchedule.empty())
- {
- _.Dismiss();
- return;
- }
-
- ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size());
-
- auto SubmitResults = SubmitActions(ActionsToSchedule);
-
- // Move successfully scheduled actions to the running map and remove
- // from pending queue. It's actually possible that by the time we get
- // to this stage some of the actions may have already completed, so
- // they should not always be added to the running map
-
- eastl::hash_set<int> ScheduledActions;
-
- for (size_t i = 0; i < ActionsToSchedule.size(); ++i)
- {
- const Ref<RunnerAction>& Pending = ActionsToSchedule[i];
- const SubmitResult& SubResult = SubmitResults[i];
-
- if (SubResult.IsAccepted)
- {
- ScheduledActions.insert(Pending->ActionLsn);
- }
- }
-
- ScheduledCount += (int)ActionsToSchedule.size();
-
-# else
- m_PendingLock.WithExclusiveLock([&] {
- while (!m_PendingActions.empty())
- {
- if (m_ShutdownRequested)
- {
- return;
- }
-
- // Here it would be good if we could decide to pop immediately to avoid
- // holding the lock while creating processes etc
- const Ref<RunnerAction>& Pending = m_PendingActions.begin()->second;
- FunctionRunner::SubmitResult SubResult = SubmitAction(Pending);
-
- if (SubResult.IsAccepted)
- {
- // Great, the job is being taken care of by the runner
-
- ZEN_DEBUG("action {} ({}) PENDING -> RUNNING", Pending->ActionId, Pending->ActionLsn);
-
- m_RunningLock.WithExclusiveLock([&] {
- m_RunningMap.insert({Pending->ActionLsn, Pending});
-
- RunningCount = m_RunningMap.size();
- });
-
- m_PendingActions.pop_front();
-
- PendingCount = m_PendingActions.size();
- ++ScheduledCount;
- }
- else
- {
- // Runner could not accept the job, leave it on the pending queue
-
- return;
- }
- }
- });
-# endif
-}
-
-void
-FunctionServiceSession::Impl::MonitorThreadFunction()
-{
- SetCurrentThreadName("FunctionServiceSession_Monitor");
-
- auto _ = MakeGuard([&] { ZEN_INFO("monitor thread exiting"); });
-
- do
- {
- int TimeoutMs = 1000;
-
- if (m_PendingLock.WithSharedLock([&] { return m_PendingActions.size(); }))
- {
- TimeoutMs = 100;
- }
-
- const bool Timedout = m_SchedulingThreadEvent.Wait(TimeoutMs);
-
- if (m_SchedulingThreadEnabled == false)
- {
- return;
- }
-
- HandleActionUpdates();
-
- // Schedule pending actions
-
- SchedulePendingActions();
-
- if (!Timedout)
- {
- m_SchedulingThreadEvent.Reset();
- }
- } while (m_SchedulingThreadEnabled);
-}
-
-void
-FunctionServiceSession::Impl::PostUpdate(RunnerAction* Action)
-{
- m_UpdatedActionsLock.WithExclusiveLock([&] { m_UpdatedActions.emplace_back(Action); });
-}
-
-void
-FunctionServiceSession::Impl::HandleActionUpdates()
-{
- std::vector<Ref<RunnerAction>> UpdatedActions;
-
- m_UpdatedActionsLock.WithExclusiveLock([&] { std::swap(UpdatedActions, m_UpdatedActions); });
-
- std::unordered_set<int> SeenLsn;
- std::unordered_set<int> RunningLsn;
-
- for (Ref<RunnerAction>& Action : UpdatedActions)
- {
- const int ActionLsn = Action->ActionLsn;
-
- if (auto [It, Inserted] = SeenLsn.insert(ActionLsn); Inserted)
- {
- switch (Action->ActionState())
- {
- case RunnerAction::State::Pending:
- m_PendingLock.WithExclusiveLock([&] { m_PendingActions.insert({ActionLsn, Action}); });
- break;
-
- case RunnerAction::State::Running:
- m_PendingLock.WithExclusiveLock([&] {
- m_RunningLock.WithExclusiveLock([&] {
- m_RunningMap.insert({ActionLsn, Action});
- m_PendingActions.erase(ActionLsn);
- });
- });
- ZEN_DEBUG("action {} ({}) RUNNING", Action->ActionId, ActionLsn);
- break;
-
- case RunnerAction::State::Completed:
- case RunnerAction::State::Failed:
- m_ResultsLock.WithExclusiveLock([&] {
- m_ResultsMap[ActionLsn] = Action;
-
- m_PendingLock.WithExclusiveLock([&] {
- m_RunningLock.WithExclusiveLock([&] {
- if (auto FindIt = m_RunningMap.find(ActionLsn); FindIt == m_RunningMap.end())
- {
- m_PendingActions.erase(ActionLsn);
- }
- else
- {
- m_RunningMap.erase(FindIt);
- }
- });
- });
-
- m_ActionHistoryLock.WithExclusiveLock([&] {
- ActionHistoryEntry Entry{.Lsn = ActionLsn,
- .ActionId = Action->ActionId,
- .WorkerId = Action->Worker.WorkerId,
- .ActionDescriptor = Action->ActionObj,
- .Succeeded = Action->ActionState() == RunnerAction::State::Completed};
-
- std::copy(std::begin(Action->Timestamps), std::end(Action->Timestamps), std::begin(Entry.Timestamps));
-
- m_ActionHistory.push_back(std::move(Entry));
-
- if (m_ActionHistory.size() > m_HistoryLimit)
- {
- m_ActionHistory.pop_front();
- }
- });
- });
- m_RetiredCount.fetch_add(1);
- m_ResultRate.Mark(1);
- ZEN_DEBUG("action {} ({}) RUNNING -> COMPLETED with {}",
- Action->ActionId,
- ActionLsn,
- Action->ActionState() == RunnerAction::State::Completed ? "SUCCESS" : "FAILURE");
- break;
- }
- }
- }
-}
-
-size_t
-FunctionServiceSession::Impl::QueryCapacity()
-{
- return m_LocalRunnerGroup.QueryCapacity() + m_RemoteRunnerGroup.QueryCapacity();
-}
-
-std::vector<SubmitResult>
-FunctionServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
-{
- std::vector<SubmitResult> Results;
-
- for (const Ref<RunnerAction>& Action : Actions)
- {
- Results.push_back(SubmitAction(Action));
- }
-
- return Results;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-FunctionServiceSession::FunctionServiceSession(ChunkResolver& InChunkResolver)
-{
- m_Impl = std::make_unique<Impl>(this, InChunkResolver);
-}
-
-FunctionServiceSession::~FunctionServiceSession()
-{
- Shutdown();
-}
-
-bool
-FunctionServiceSession::IsHealthy()
-{
- return m_Impl->IsHealthy();
-}
-
-void
-FunctionServiceSession::Shutdown()
-{
- m_Impl->Shutdown();
-}
-
-void
-FunctionServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath)
-{
- m_Impl->StartRecording(InResolver, RecordingPath);
-}
-
-void
-FunctionServiceSession::StopRecording()
-{
- m_Impl->StopRecording();
-}
-
-void
-FunctionServiceSession::EmitStats(CbObjectWriter& Cbo)
-{
- m_Impl->EmitStats(Cbo);
-}
-
-std::vector<IoHash>
-FunctionServiceSession::GetKnownWorkerIds()
-{
- return m_Impl->GetKnownWorkerIds();
-}
-
-WorkerDesc
-FunctionServiceSession::GetWorkerDescriptor(const IoHash& WorkerId)
-{
- return m_Impl->GetWorkerDescriptor(WorkerId);
-}
-
-void
-FunctionServiceSession::AddLocalRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath)
-{
- m_Impl->m_LocalRunnerGroup.AddRunner(new LocalProcessRunner(InChunkResolver, BasePath));
-}
-
-void
-FunctionServiceSession::AddRemoteRunner(ChunkResolver& InChunkResolver, std::filesystem::path BasePath, std::string_view HostName)
-{
- m_Impl->m_RemoteRunnerGroup.AddRunner(new RemoteHttpRunner(InChunkResolver, BasePath, HostName));
-}
-
-FunctionServiceSession::EnqueueResult
-FunctionServiceSession::EnqueueAction(CbObject ActionObject, int Priority)
-{
- return m_Impl->EnqueueAction(ActionObject, Priority);
-}
-
-FunctionServiceSession::EnqueueResult
-FunctionServiceSession::EnqueueResolvedAction(WorkerDesc Worker, CbObject ActionObj, int RequestPriority)
-{
- return m_Impl->EnqueueResolvedAction(Worker, ActionObj, RequestPriority);
-}
-
-void
-FunctionServiceSession::RegisterWorker(CbPackage Worker)
-{
- m_Impl->RegisterWorker(Worker);
-}
-
-HttpResponseCode
-FunctionServiceSession::GetActionResult(int ActionLsn, CbPackage& OutResultPackage)
-{
- return m_Impl->GetActionResult(ActionLsn, OutResultPackage);
-}
-
-HttpResponseCode
-FunctionServiceSession::FindActionResult(const IoHash& ActionId, CbPackage& OutResultPackage)
-{
- return m_Impl->FindActionResult(ActionId, OutResultPackage);
-}
-
-std::vector<FunctionServiceSession::ActionHistoryEntry>
-FunctionServiceSession::GetActionHistory(int Limit)
-{
- return m_Impl->GetActionHistory(Limit);
-}
-
-void
-FunctionServiceSession::GetCompleted(CbWriter& Cbo)
-{
- m_Impl->GetCompleted(Cbo);
-}
-
-void
-FunctionServiceSession::PostUpdate(RunnerAction* Action)
-{
- m_Impl->PostUpdate(Action);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-function_forcelink()
-{
-}
-
-} // namespace zen::compute
-
-#endif // ZEN_WITH_COMPUTE_SERVICES