// Copyright Epic Games, Inc. All Rights Reserved.

// Interfaces for compute "function runners" (FunctionRunner), groups of runners
// (BaseRunnerGroup / RunnerGroup), and the per-action record (RunnerAction) that
// tracks an action through the stages of scheduling and execution.
//
// NOTE(review): in this copy every include target and every template argument
// list (<...>) is missing -- e.g. the bare `#include` lines below, `Ref Action`,
// `std::vector SubmitActions`, `std::vector> m_Runners`, `std::atomic m_NextSubmitIndex`,
// `template struct RunnerGroup`, `static_cast(**It)`, `std::atomic m_ActionState`.
// The header does not compile as shown; restore these from upstream before building.

#pragma once

#include

#if ZEN_WITH_COMPUTE_SERVICES

# include
# include

namespace zen::compute {

/** Outcome of handing an action to a runner.
 *  IsAccepted defaults to false. Reason is a free-form string -- presumably an
 *  explanation when the action was not accepted (confirm against implementations).
 */
struct SubmitResult
{
	bool		IsAccepted = false;
	std::string Reason;
};

/** Base interface for classes implementing a remote execution "runner".
 *
 *  Runners derive from RefCounted and are neither movable nor copyable: the move
 *  operations are deleted below, and declaring them also causes the implicit copy
 *  constructor / copy assignment to be defined as deleted.
 *
 *  Derived classes must implement Shutdown, RegisterWorker, SubmitAction,
 *  GetSubmittedActionCount and IsHealthy. QueryCapacity and SubmitActions are
 *  virtual but not pure; their base definitions live outside this header.
 */
class FunctionRunner : public RefCounted
{
	// Declared before `public:`, so these deleted overloads are private.
	FunctionRunner(FunctionRunner&&)			= delete;
	FunctionRunner& operator=(FunctionRunner&&) = delete;

public:
	// NOTE(review): BasePath is presumably the base directory for this runner's files
	// (e.g. the source of m_ActionsPath); the definition is not in this header.
	FunctionRunner(std::filesystem::path BasePath);

	// Pure virtual destructor keeps the class abstract; C++ still requires an
	// out-of-line definition, presumably provided in the .cpp.
	virtual ~FunctionRunner() = 0;

	virtual void Shutdown() = 0;
	virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;

	// [[nodiscard]]: callers are expected to inspect the returned SubmitResult.
	[[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) = 0;
	[[nodiscard]] virtual size_t	   GetSubmittedActionCount() = 0;
	[[nodiscard]] virtual bool		   IsHealthy() = 0;

	// Overridable, with a default implementation defined out of line.
	[[nodiscard]] virtual size_t	  QueryCapacity();
	[[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions);

protected:
	// Support for optionally dumping submitted action objects; m_DumpActions
	// defaults to false. NOTE(review): behavior inferred from names only --
	// MaybeDumpAction's definition is not visible here.
	std::filesystem::path m_ActionsPath;
	bool				  m_DumpActions = false;

	void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject);
};

/** Base class for RunnerGroup that operates on generic FunctionRunner references.
 *  All scheduling, capacity, and lifecycle logic lives here; the typed RunnerGroup
 *  template below only adds typed insertion and predicate-based removal.
 *
 *  m_Runners is guarded by m_RunnersLock: GetRunnerCount reads it under a shared
 *  lock and RunnerGroup::RemoveRunnerIf modifies it under an exclusive lock.
 *
 *  NOTE(review): the destructor is implicit and non-virtual, so a RunnerGroup must
 *  not be deleted through a BaseRunnerGroup pointer.
 */
class BaseRunnerGroup
{
public:
	size_t		 QueryCapacity();
	SubmitResult SubmitAction(Ref Action);
	std::vector	 SubmitActions(const std::vector>& Actions);
	size_t		 GetSubmittedActionCount();
	void		 RegisterWorker(CbPackage Worker);
	void		 Shutdown();

	// Number of runners currently in the group, read under the shared lock.
	size_t GetRunnerCount()
	{
		return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); });
	}

protected:
	// Appends a runner; the typed entry point is RunnerGroup::AddRunner.
	void AddRunnerInternal(FunctionRunner* Runner);

	RwLock		   m_RunnersLock;	 // guards m_Runners
	std::vector>   m_Runners;
	// Starts at 0. NOTE(review): the name suggests a rotating start index used by
	// SubmitAction(s) -- confirm in the .cpp.
	std::atomic	   m_NextSubmitIndex{0};
};

/** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal.
 *  RunnerType must be a FunctionRunner (or a type derived from it): AddRunner passes a
 *  RunnerType* to AddRunnerInternal(FunctionRunner*).
 */
template struct RunnerGroup : public BaseRunnerGroup
{
	// Adds a runner of the group's concrete type. In the visible code this is the only
	// public way to insert runners, which is what makes the static_cast in
	// RemoveRunnerIf sound.
	void AddRunner(RunnerType* Runner) { AddRunnerInternal(Runner); }

	/** Removes every runner for which Pred returns true; returns how many were removed.
	 *  Pred receives each runner after a static_cast (presumably to RunnerType&; the
	 *  cast's target type is missing in this copy). Each matching runner has
	 *  Shutdown() called on it and is then erased from m_Runners.
	 *
	 *  The whole scan -- predicate calls and Shutdown() calls included -- runs while
	 *  m_RunnersLock is held exclusively, so GetRunnerCount() and other shared-lock
	 *  readers wait until it finishes.
	 *  NOTE(review): if Shutdown() can block, consider collecting the removed runners
	 *  under the lock and shutting them down after releasing it.
	 */
	template size_t RemoveRunnerIf(Predicate&& Pred)
	{
		size_t RemovedCount = 0;
		m_RunnersLock.WithExclusiveLock([&] {
			auto It = m_Runners.begin();
			while (It != m_Runners.end())
			{
				if (Pred(static_cast(**It)))
				{
					(*It)->Shutdown();
					It = m_Runners.erase(It);	 // erase returns the next valid iterator
					++RemovedCount;
				}
				else
				{
					++It;
				}
			}
		});
		return RemovedCount;
	}
};

/**
 * This represents an action going through different stages of scheduling and execution.
 *
 * The current stage is stored in an atomic State that starts as New. Timestamps holds
 * one zero-initialised slot per State. NOTE(review): presumably filled in by
 * SetActionState, whose definition is not in this header.
 */
struct RunnerAction : public RefCounted
{
	explicit RunnerAction(FunctionServiceSession* OwnerSession);
	~RunnerAction();

	int		   ActionLsn = 0;
	WorkerDesc Worker;
	IoHash	   ActionId;
	CbObject   ActionObj;
	// Defaults to 0; the ordering semantics (higher vs. lower first) are not visible here.
	int		   Priority = 0;

	// _Count is a sentinel equal to the number of real states; it sizes Timestamps.
	// NOTE(review): identifiers beginning with an underscore followed by an uppercase
	// letter are reserved for the implementation in C++.
	enum class State
	{
		New,
		Pending,
		Submitting,
		Running,
		Completed,
		Failed,
		_Count
	};

	// Returns a string literal naming the state; "Unknown" for any other value
	// (including _Count).
	static const char* ToString(State _)
	{
		switch (_)
		{
			case State::New:
				return "New";
			case State::Pending:
				return "Pending";
			case State::Submitting:
				return "Submitting";
			case State::Running:
				return "Running";
			case State::Completed:
				return "Completed";
			case State::Failed:
				return "Failed";
			default:
				return "Unknown";
		}
	}

	// One entry per State value, all zero until written.
	uint64_t Timestamps[static_cast(State::_Count)] = {};

	State ActionState() const { return m_ActionState; }
	void  SetActionState(State NewState);

	// True only for Completed; Failed counts as finished but not successful.
	bool IsSuccess() const { return ActionState() == State::Completed; }
	// True once the action has finished, either Completed or Failed.
	// NOTE(review): this loads the atomic state twice; loading it once into a local
	// would give a single consistent snapshot.
	bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; }

	// Stores the result; takes it by rvalue reference, so callers move it in.
	void	   SetResult(CbPackage&& Result);
	// Returns a mutable reference to the stored result (there is no const overload).
	CbPackage& GetResult();

private:
	std::atomic				m_ActionState = State::New;
	// Raw pointer, nullptr by default; presumably set from the constructor argument.
	// Ownership and lifetime are not visible in this header.
	FunctionServiceSession* m_OwnerSession = nullptr;
	CbPackage				m_Result;
};

}	 // namespace zen::compute

#endif	  // ZEN_WITH_COMPUTE_SERVICES