// Copyright Epic Games, Inc. All Rights Reserved. #include "functionrunner.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include namespace zen::compute { FunctionRunner::FunctionRunner(std::filesystem::path BasePath) : m_ActionsPath(BasePath / "actions") { } FunctionRunner::~FunctionRunner() = default; size_t FunctionRunner::QueryCapacity() { return 1; } std::vector FunctionRunner::SubmitActions(const std::vector>& Actions) { std::vector Results; Results.reserve(Actions.size()); for (const Ref& Action : Actions) { Results.push_back(SubmitAction(Action)); } return Results; } void FunctionRunner::MaybeDumpAction(int ActionLsn, const CbObject& ActionObject) { if (m_DumpActions) { std::string UniqueId = fmt::format("{}.ddb", ActionLsn); std::filesystem::path Path = m_ActionsPath / UniqueId; zen::WriteFile(Path, IoBuffer(ActionObject.GetBuffer().AsIoBuffer())); } } ////////////////////////////////////////////////////////////////////////// void BaseRunnerGroup::AddRunnerInternal(FunctionRunner* Runner) { m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); } size_t BaseRunnerGroup::QueryCapacity() { size_t TotalCapacity = 0; m_RunnersLock.WithSharedLock([&] { for (const auto& Runner : m_Runners) { TotalCapacity += Runner->QueryCapacity(); } }); return TotalCapacity; } SubmitResult BaseRunnerGroup::SubmitAction(Ref Action) { ZEN_TRACE_CPU("BaseRunnerGroup::SubmitAction"); RwLock::SharedLockScope _(m_RunnersLock); const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire); int Index = InitialIndex; const int RunnerCount = gsl::narrow(m_Runners.size()); if (RunnerCount == 0) { return {.IsAccepted = false, .Reason = "No runners available"}; } do { while (Index >= RunnerCount) { Index -= RunnerCount; } auto& Runner = m_Runners[Index++]; SubmitResult Result = Runner->SubmitAction(Action); if (Result.IsAccepted == true) { m_NextSubmitIndex = Index % RunnerCount; return Result; } while (Index >= RunnerCount) { Index -= RunnerCount; } } while (Index != InitialIndex); return {.IsAccepted = false}; } std::vector BaseRunnerGroup::SubmitActions(const std::vector>& Actions) { ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions"); RwLock::SharedLockScope _(m_RunnersLock); const int RunnerCount = gsl::narrow(m_Runners.size()); if (RunnerCount == 0) { return std::vector(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"}); } // Query capacity per runner and compute total std::vector Capacities(RunnerCount); size_t TotalCapacity = 0; for (int i = 0; i < RunnerCount; ++i) { Capacities[i] = m_Runners[i]->QueryCapacity(); TotalCapacity += Capacities[i]; } if (TotalCapacity == 0) { return std::vector(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No capacity"}); } // Distribute actions across runners proportionally to their available capacity std::vector>> PerRunnerActions(RunnerCount); std::vector ActionRunnerIndex(Actions.size()); size_t ActionIdx = 0; for (int i = 0; i < RunnerCount; ++i) { if (Capacities[i] == 0) { continue; } size_t Share = (Actions.size() * Capacities[i] + TotalCapacity - 1) / TotalCapacity; Share = std::min(Share, Capacities[i]); for (size_t j = 0; j < Share && ActionIdx < Actions.size(); ++j, ++ActionIdx) { PerRunnerActions[i].push_back(Actions[ActionIdx]); ActionRunnerIndex[ActionIdx] = i; } } // Assign any remaining actions to runners with capacity (round-robin) for (int i = 0; ActionIdx < Actions.size(); i = (i + 1) % RunnerCount) { if (Capacities[i] > PerRunnerActions[i].size()) { PerRunnerActions[i].push_back(Actions[ActionIdx]); ActionRunnerIndex[ActionIdx] = i; ++ActionIdx; } } // Submit batches per runner std::vector> PerRunnerResults(RunnerCount); for (int i = 0; i < RunnerCount; ++i) { if (!PerRunnerActions[i].empty()) { PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]); } } // Reassemble results in original action order std::vector Results(Actions.size()); std::vector PerRunnerIdx(RunnerCount, 0); for (size_t i = 0; i < Actions.size(); ++i) { size_t RunnerIdx = ActionRunnerIndex[i]; size_t Idx = PerRunnerIdx[RunnerIdx]++; Results[i] = std::move(PerRunnerResults[RunnerIdx][Idx]); } return Results; } size_t BaseRunnerGroup::GetSubmittedActionCount() { RwLock::SharedLockScope _(m_RunnersLock); size_t TotalCount = 0; for (const auto& Runner : m_Runners) { TotalCount += Runner->GetSubmittedActionCount(); } return TotalCount; } void BaseRunnerGroup::RegisterWorker(CbPackage Worker) { RwLock::SharedLockScope _(m_RunnersLock); for (auto& Runner : m_Runners) { Runner->RegisterWorker(Worker); } } void BaseRunnerGroup::Shutdown() { RwLock::SharedLockScope _(m_RunnersLock); for (auto& Runner : m_Runners) { Runner->Shutdown(); } } ////////////////////////////////////////////////////////////////////////// RunnerAction::RunnerAction(FunctionServiceSession* OwnerSession) : m_OwnerSession(OwnerSession) { this->Timestamps[static_cast(State::New)] = DateTime::Now().GetTicks(); } RunnerAction::~RunnerAction() { } void RunnerAction::SetActionState(State NewState) { ZEN_ASSERT(NewState < State::_Count); this->Timestamps[static_cast(NewState)] = DateTime::Now().GetTicks(); do { if (State CurrentState = m_ActionState.load(); CurrentState == NewState) { // No state change return; } else { if (NewState <= CurrentState) { // Cannot transition to an earlier or same state return; } if (m_ActionState.compare_exchange_strong(CurrentState, NewState)) { // Successful state change m_OwnerSession->PostUpdate(this); return; } } } while (true); } void RunnerAction::SetResult(CbPackage&& Result) { m_Result = std::move(Result); } CbPackage& RunnerAction::GetResult() { ZEN_ASSERT(IsCompleted()); return m_Result; } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES