aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/functionrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/runners/functionrunner.cpp')
-rw-r--r--src/zencompute/runners/functionrunner.cpp365
1 files changed, 365 insertions, 0 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
new file mode 100644
index 000000000..768cdf1e1
--- /dev/null
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -0,0 +1,365 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "functionrunner.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <zencore/compactbinary.h>
+# include <zencore/filesystem.h>
+# include <zencore/trace.h>
+
+# include <fmt/format.h>
+# include <vector>
+
+namespace zen::compute {
+
+// Remembers "<BasePath>/actions" as the directory MaybeDumpAction writes into.
+FunctionRunner::FunctionRunner(std::filesystem::path BasePath)
+: m_ActionsPath(BasePath / "actions")
+{
+}
+
+// Defined out of line; the compiler-generated member teardown is sufficient.
+FunctionRunner::~FunctionRunner() = default;
+
+// Base implementation: always reports a capacity of 1.
+size_t
+FunctionRunner::QueryCapacity()
+{
+    constexpr size_t DefaultCapacity = 1;
+    return DefaultCapacity;
+}
+
+// Batch submission fallback: forwards every action to SubmitAction one at a
+// time and returns the per-action results in the same order as the input.
+std::vector<SubmitResult>
+FunctionRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    const size_t ActionCount = Actions.size();
+
+    std::vector<SubmitResult> Outcomes;
+    Outcomes.reserve(ActionCount);
+
+    for (size_t Index = 0; Index < ActionCount; ++Index)
+    {
+        Outcomes.push_back(SubmitAction(Actions[Index]));
+    }
+
+    return Outcomes;
+}
+
+// When m_DumpActions is set, writes the action object's buffer to
+// "<m_ActionsPath>/<ActionLsn>.ddb". Does nothing otherwise.
+void
+FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject)
+{
+    if (!m_DumpActions)
+    {
+        return;
+    }
+
+    // File name is derived from the action's LSN
+    std::string           FileName = fmt::format("{}.ddb", ActionLsn);
+    std::filesystem::path DumpPath = m_ActionsPath / FileName;
+
+    zen::WriteFile(DumpPath, IoBuffer(ActionObject.GetBuffer().AsIoBuffer()));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Appends Runner to m_Runners while holding the runners lock exclusively.
+void
+BaseRunnerGroup::AddRunnerInternal(FunctionRunner* Runner)
+{
+    m_RunnersLock.WithExclusiveLock([this, Runner] {
+        // Construct the stored element in place from the raw pointer
+        m_Runners.emplace_back(Runner);
+    });
+}
+
+// Returns the sum of QueryCapacity() over all runners in the group, computed
+// under a shared lock on the runner list.
+size_t
+BaseRunnerGroup::QueryCapacity()
+{
+    RwLock::SharedLockScope _(m_RunnersLock);
+
+    size_t CapacitySum = 0;
+
+    for (const auto& Runner : m_Runners)
+    {
+        CapacitySum += Runner->QueryCapacity();
+    }
+
+    return CapacitySum;
+}
+
+// Offers Action to the group's runners in round-robin order, starting at the
+// position recorded in m_NextSubmitIndex, and returns the first accepting
+// runner's result. Each runner is tried at most once per call.
+//
+// Returns a rejected result with reason "No runners available" when the group
+// is empty, and a rejected result without a reason when every runner declined.
+//
+// On success the cursor is advanced to the runner after the one that accepted,
+// so that subsequent submissions start from the next runner.
+SubmitResult
+BaseRunnerGroup::SubmitAction(Ref<RunnerAction> Action)
+{
+    ZEN_TRACE_CPU("BaseRunnerGroup::SubmitAction");
+    RwLock::SharedLockScope _(m_RunnersLock);
+
+    const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+    if (RunnerCount == 0)
+    {
+        return {.IsAccepted = false, .Reason = "No runners available"};
+    }
+
+    // Normalize the stored cursor into [0, RunnerCount). The loop below is bounded
+    // by RunnerCount rather than by "wrapped back to the start index", so a stale,
+    // out-of-range or negative cursor can neither index out of bounds nor keep the
+    // loop (and the shared lock) alive forever.
+    const int StoredIndex = m_NextSubmitIndex.load(std::memory_order_acquire);
+    const int StartIndex  = ((StoredIndex % RunnerCount) + RunnerCount) % RunnerCount;
+
+    for (int Attempt = 0; Attempt < RunnerCount; ++Attempt)
+    {
+        const int Index = (StartIndex + Attempt) % RunnerCount;
+
+        SubmitResult Result = m_Runners[Index]->SubmitAction(Action);
+
+        if (Result.IsAccepted)
+        {
+            m_NextSubmitIndex = (Index + 1) % RunnerCount;
+
+            return Result;
+        }
+    }
+
+    // Every runner declined the action
+    return {.IsAccepted = false};
+}
+
+// Distributes a batch of actions across the group's runners according to the
+// capacity each runner currently reports, submits one sub-batch per runner, and
+// returns one SubmitResult per input action, in input order.
+//
+// Rejections produced here (without consulting a runner):
+//  - "No runners available": the group has no runners
+//  - "No capacity": the runners report zero total capacity, or the action did
+//    not fit because the batch is larger than the total reported capacity
+//  - "Runner returned no result": a runner returned fewer results than the
+//    number of actions it was handed
+//
+// The runner list is held under a shared lock for the whole call, including the
+// per-runner submissions.
+std::vector<SubmitResult>
+BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions");
+    RwLock::SharedLockScope _(m_RunnersLock);
+
+    const int RunnerCount = gsl::narrow<int>(m_Runners.size());
+
+    if (RunnerCount == 0)
+    {
+        return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"});
+    }
+
+    // Query capacity per runner and compute total
+    std::vector<size_t> Capacities(RunnerCount);
+    size_t              TotalCapacity = 0;
+
+    for (int i = 0; i < RunnerCount; ++i)
+    {
+        Capacities[i] = m_Runners[i]->QueryCapacity();
+        TotalCapacity += Capacities[i];
+    }
+
+    if (TotalCapacity == 0)
+    {
+        return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"});
+    }
+
+    // Distribute actions across runners proportionally to their available capacity.
+    // Actions are assigned strictly in input order, so after assignment the first
+    // ActionIdx actions are the assigned ones.
+    std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount);
+    std::vector<size_t>                         ActionRunnerIndex(Actions.size());
+    size_t                                      ActionIdx = 0;
+
+    for (int i = 0; i < RunnerCount; ++i)
+    {
+        if (Capacities[i] == 0)
+        {
+            continue;
+        }
+
+        // Rounded-up proportional share, never more than the runner can take
+        size_t Share = (Actions.size() * Capacities[i] + TotalCapacity - 1) / TotalCapacity;
+        Share        = std::min(Share, Capacities[i]);
+
+        for (size_t j = 0; j < Share && ActionIdx < Actions.size(); ++j, ++ActionIdx)
+        {
+            PerRunnerActions[i].push_back(Actions[ActionIdx]);
+            ActionRunnerIndex[ActionIdx] = i;
+        }
+    }
+
+    // Assign any remaining actions to runners with spare capacity (round-robin).
+    // Stop as soon as a full pass over the runners places nothing: that means every
+    // runner is full, and continuing would loop forever while holding the lock.
+    while (ActionIdx < Actions.size())
+    {
+        bool PlacedAny = false;
+
+        for (int i = 0; i < RunnerCount && ActionIdx < Actions.size(); ++i)
+        {
+            if (Capacities[i] > PerRunnerActions[i].size())
+            {
+                PerRunnerActions[i].push_back(Actions[ActionIdx]);
+                ActionRunnerIndex[ActionIdx] = i;
+                ++ActionIdx;
+                PlacedAny = true;
+            }
+        }
+
+        if (!PlacedAny)
+        {
+            break;
+        }
+    }
+
+    // Actions at index >= AssignedCount did not fit on any runner
+    const size_t AssignedCount = ActionIdx;
+
+    // Submit batches per runner
+    std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount);
+
+    for (int i = 0; i < RunnerCount; ++i)
+    {
+        if (!PerRunnerActions[i].empty())
+        {
+            PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]);
+        }
+    }
+
+    // Reassemble results in original action order. Entries for actions that were
+    // never assigned keep the "No capacity" rejection they are initialized with.
+    std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"});
+    std::vector<size_t>       PerRunnerIdx(RunnerCount, 0);
+
+    for (size_t i = 0; i < AssignedCount; ++i)
+    {
+        const size_t RunnerIdx = ActionRunnerIndex[i];
+        const size_t Idx       = PerRunnerIdx[RunnerIdx]++;
+
+        if (Idx < PerRunnerResults[RunnerIdx].size())
+        {
+            Results[i] = std::move(PerRunnerResults[RunnerIdx][Idx]);
+        }
+        else
+        {
+            // Do not read past the end of a short result vector
+            Results[i] = SubmitResult{.IsAccepted = false, .Reason = "Runner returned no result"};
+        }
+    }
+
+    return Results;
+}
+
+// Returns the sum of GetSubmittedActionCount() over all runners in the group,
+// computed under a shared lock on the runner list.
+size_t
+BaseRunnerGroup::GetSubmittedActionCount()
+{
+    size_t SubmittedSum = 0;
+
+    m_RunnersLock.WithSharedLock([&] {
+        for (const auto& Runner : m_Runners)
+        {
+            SubmittedSum += Runner->GetSubmittedActionCount();
+        }
+    });
+
+    return SubmittedSum;
+}
+
+// Forwards the worker package to RegisterWorker on every runner in the group,
+// under a shared lock on the runner list.
+void
+BaseRunnerGroup::RegisterWorker(CbPackage Worker)
+{
+    m_RunnersLock.WithSharedLock([&] {
+        for (auto& Runner : m_Runners)
+        {
+            Runner->RegisterWorker(Worker);
+        }
+    });
+}
+
+// Calls Shutdown on every runner in the group, in list order, under a shared
+// lock on the runner list.
+void
+BaseRunnerGroup::Shutdown()
+{
+    m_RunnersLock.WithSharedLock([&] {
+        for (auto& Runner : m_Runners)
+        {
+            Runner->Shutdown();
+        }
+    });
+}
+
+// Asks each runner in turn to cancel the action with the given LSN and stops at
+// the first runner that reports success. Returns true if some runner returned
+// true, false if none did.
+bool
+BaseRunnerGroup::CancelAction(int ActionLsn)
+{
+    bool WasCancelled = false;
+
+    m_RunnersLock.WithSharedLock([&] {
+        for (auto& Runner : m_Runners)
+        {
+            if (Runner->CancelAction(ActionLsn))
+            {
+                WasCancelled = true;
+                break;
+            }
+        }
+    });
+
+    return WasCancelled;
+}
+
+// Forwards the queue cancellation to CancelRemoteQueue on every runner in the
+// group, under a shared lock on the runner list.
+void
+BaseRunnerGroup::CancelRemoteQueue(int QueueId)
+{
+    m_RunnersLock.WithSharedLock([&] {
+        for (auto& Runner : m_Runners)
+        {
+            Runner->CancelRemoteQueue(QueueId);
+        }
+    });
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Binds the action to its owning session and stamps the State::New slot of the
+// timestamp table with the current time.
+RunnerAction::RunnerAction(ComputeServiceSession* OwnerSession)
+: m_OwnerSession(OwnerSession)
+{
+    const int NewStateSlot = static_cast<int>(State::New);
+
+    this->Timestamps[NewStateSlot] = DateTime::Now().GetTicks();
+}
+
+// Defined out of line; no explicit cleanup is performed here.
+RunnerAction::~RunnerAction() = default;
+
+// Puts a Failed or Abandoned action back into the Pending state so it can be
+// scheduled again.
+//
+// Returns false, leaving the action untouched, when the current state is neither
+// Failed nor Abandoned, or when the state changed concurrently before the swap
+// to Pending could be made. Returns true after the reset has been applied and
+// the owning session has been notified.
+bool
+RunnerAction::ResetActionStateToPending()
+{
+    State Observed = m_ActionState.load();
+
+    const bool IsRetryable = (Observed == State::Failed) || (Observed == State::Abandoned);
+
+    // Short-circuit: the swap is only attempted from a retryable state
+    if (!IsRetryable || !m_ActionState.compare_exchange_strong(Observed, State::Pending))
+    {
+        return false;
+    }
+
+    // Zero every timestamp slot from Submitting up to (not including) _Count
+    const int FirstSlot = static_cast<int>(State::Submitting);
+    const int EndSlot   = static_cast<int>(State::_Count);
+
+    for (int Slot = FirstSlot; Slot < EndSlot; ++Slot)
+    {
+        this->Timestamps[Slot] = 0;
+    }
+
+    // Stamp the moment the action re-entered Pending
+    this->Timestamps[static_cast<int>(State::Pending)] = DateTime::Now().GetTicks();
+
+    // Forget where and how the previous attempt ran
+    ExecutionLocation.clear();
+    CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
+    CpuSeconds.store(0.0f, std::memory_order_relaxed);
+
+    // Count this as one more retry
+    RetryCount.fetch_add(1, std::memory_order_relaxed);
+
+    // Notify the owning session so the action is picked up again
+    m_OwnerSession->PostUpdate(this);
+
+    return true;
+}
+
+// Advances the action to NewState and notifies the owning session.
+//
+// Only forward transitions are applied: if the action is already in NewState or
+// in a later state, the call returns without changing anything, including the
+// timestamp table. (Moving backwards is done by ResetActionStateToPending.)
+//
+// On a successful transition the timestamp slot for NewState holds the time at
+// which this call was entered, and m_OwnerSession->PostUpdate(this) is invoked.
+void
+RunnerAction::SetActionState(State NewState)
+{
+    ZEN_ASSERT(NewState < State::_Count);
+
+    // Sample the clock once so retries of the loop below record the same time
+    const auto Now = DateTime::Now().GetTicks();
+
+    for (;;)
+    {
+        State CurrentState = m_ActionState.load();
+
+        if (NewState <= CurrentState)
+        {
+            // Cannot transition to an earlier or same state. Return before touching
+            // the timestamp so the time recorded for a state that was already
+            // reached is not overwritten by a rejected call.
+            return;
+        }
+
+        // Write the timestamp before publishing the state, so that anyone who
+        // observes NewState also finds its timestamp filled in
+        this->Timestamps[static_cast<int>(NewState)] = Now;
+
+        if (m_ActionState.compare_exchange_strong(CurrentState, NewState))
+        {
+            // Successful state change
+
+            m_OwnerSession->PostUpdate(this);
+
+            return;
+        }
+
+        // The state changed underneath us; re-evaluate against the fresh value
+    }
+}
+
+// Takes ownership of the given result package by moving it into m_Result,
+// replacing whatever was stored before.
+void
+RunnerAction::SetResult(CbPackage&& Result)
+{
+    m_Result = std::move(Result);
+}
+
+// Returns a mutable reference to the stored result package. Asserts that
+// IsCompleted() holds before handing it out.
+CbPackage&
+RunnerAction::GetResult()
+{
+    ZEN_ASSERT(IsCompleted());
+
+    return m_Result;
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file