// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #if ZEN_WITH_COMPUTE_SERVICES # include # include namespace zen::compute { struct SubmitResult { bool IsAccepted = false; std::string Reason; }; /** Base interface for classes implementing a remote execution "runner" */ class FunctionRunner : public RefCounted { FunctionRunner(FunctionRunner&&) = delete; FunctionRunner& operator=(FunctionRunner&&) = delete; public: FunctionRunner(std::filesystem::path BasePath); virtual ~FunctionRunner() = 0; virtual void Shutdown() = 0; virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) = 0; [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; [[nodiscard]] virtual bool IsHealthy() = 0; [[nodiscard]] virtual size_t QueryCapacity(); [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions); protected: std::filesystem::path m_ActionsPath; bool m_DumpActions = false; void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); }; template struct RunnerGroup { void AddRunner(RunnerType* Runner) { m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); } size_t QueryCapacity() { size_t TotalCapacity = 0; m_RunnersLock.WithSharedLock([&] { for (const auto& Runner : m_Runners) { TotalCapacity += Runner->QueryCapacity(); } }); return TotalCapacity; } SubmitResult SubmitAction(Ref Action) { RwLock::SharedLockScope _(m_RunnersLock); const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire); int Index = InitialIndex; const int RunnerCount = gsl::narrow(m_Runners.size()); if (RunnerCount == 0) { return {.IsAccepted = false, .Reason = "No runners available"}; } do { while (Index >= RunnerCount) { Index -= RunnerCount; } auto& Runner = m_Runners[Index++]; SubmitResult Result = Runner->SubmitAction(Action); if (Result.IsAccepted == true) { m_NextSubmitIndex = Index % RunnerCount; return Result; } while (Index >= RunnerCount) { Index -= RunnerCount; } } while (Index != InitialIndex); return {.IsAccepted = false}; } size_t GetSubmittedActionCount() { RwLock::SharedLockScope _(m_RunnersLock); size_t TotalCount = 0; for (const auto& Runner : m_Runners) { TotalCount += Runner->GetSubmittedActionCount(); } return TotalCount; } void RegisterWorker(CbPackage Worker) { RwLock::SharedLockScope _(m_RunnersLock); for (auto& Runner : m_Runners) { Runner->RegisterWorker(Worker); } } void Shutdown() { RwLock::SharedLockScope _(m_RunnersLock); for (auto& Runner : m_Runners) { Runner->Shutdown(); } } private: RwLock m_RunnersLock; std::vector> m_Runners; std::atomic m_NextSubmitIndex{0}; }; /** * This represents an action going through different stages of scheduling and execution. */ struct RunnerAction : public RefCounted { explicit RunnerAction(FunctionServiceSession* OwnerSession); ~RunnerAction(); int ActionLsn = 0; WorkerDesc Worker; IoHash ActionId; CbObject ActionObj; int Priority = 0; enum class State { New, Pending, Running, Completed, Failed, _Count }; static const char* ToString(State _) { switch (_) { case State::New: return "New"; case State::Pending: return "Pending"; case State::Running: return "Running"; case State::Completed: return "Completed"; case State::Failed: return "Failed"; default: return "Unknown"; } } uint64_t Timestamps[static_cast(State::_Count)] = {}; State ActionState() const { return m_ActionState; } void SetActionState(State NewState); bool IsSuccess() const { return ActionState() == State::Completed; } bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } void SetResult(CbPackage&& Result); CbPackage& GetResult(); private: std::atomic m_ActionState = State::New; FunctionServiceSession* m_OwnerSession = nullptr; CbPackage m_Result; }; } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES