diff options
Diffstat (limited to 'src/zencompute/functionrunner.h')
| -rw-r--r-- | src/zencompute/functionrunner.h | 207 |
1 files changed, 0 insertions, 207 deletions
diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h deleted file mode 100644 index 6fd0d84cc..000000000 --- a/src/zencompute/functionrunner.h +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencompute/functionservice.h> - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <filesystem> -# include <vector> - -namespace zen::compute { - -struct SubmitResult -{ - bool IsAccepted = false; - std::string Reason; -}; - -/** Base interface for classes implementing a remote execution "runner" - */ -class FunctionRunner : public RefCounted -{ - FunctionRunner(FunctionRunner&&) = delete; - FunctionRunner& operator=(FunctionRunner&&) = delete; - -public: - FunctionRunner(std::filesystem::path BasePath); - virtual ~FunctionRunner() = 0; - - virtual void Shutdown() = 0; - virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; - - [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0; - [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; - [[nodiscard]] virtual bool IsHealthy() = 0; - [[nodiscard]] virtual size_t QueryCapacity(); - [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); - -protected: - std::filesystem::path m_ActionsPath; - bool m_DumpActions = false; - void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); -}; - -template<typename RunnerType> -struct RunnerGroup -{ - void AddRunner(RunnerType* Runner) - { - m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); - } - size_t QueryCapacity() - { - size_t TotalCapacity = 0; - m_RunnersLock.WithSharedLock([&] { - for (const auto& Runner : m_Runners) - { - TotalCapacity += Runner->QueryCapacity(); - } - }); - return TotalCapacity; - } - - SubmitResult SubmitAction(Ref<RunnerAction> Action) - { - RwLock::SharedLockScope _(m_RunnersLock); - - const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire); - int Index = InitialIndex; - const int RunnerCount = gsl::narrow<int>(m_Runners.size()); - - if (RunnerCount == 0) - { - return {.IsAccepted = false, .Reason = "No runners available"}; - } - - do - { - while (Index >= RunnerCount) - { - Index -= RunnerCount; - } - - auto& Runner = m_Runners[Index++]; - - SubmitResult Result = Runner->SubmitAction(Action); - - if (Result.IsAccepted == true) - { - m_NextSubmitIndex = Index % RunnerCount; - - return Result; - } - - while (Index >= RunnerCount) - { - Index -= RunnerCount; - } - } while (Index != InitialIndex); - - return {.IsAccepted = false}; - } - - size_t GetSubmittedActionCount() - { - RwLock::SharedLockScope _(m_RunnersLock); - - size_t TotalCount = 0; - - for (const auto& Runner : m_Runners) - { - TotalCount += Runner->GetSubmittedActionCount(); - } - - return TotalCount; - } - - void RegisterWorker(CbPackage Worker) - { - RwLock::SharedLockScope _(m_RunnersLock); - - for (auto& Runner : m_Runners) - { - Runner->RegisterWorker(Worker); - } - } - - void Shutdown() - { - RwLock::SharedLockScope _(m_RunnersLock); - - for (auto& Runner : m_Runners) - { - Runner->Shutdown(); - } - } - -private: - RwLock m_RunnersLock; - std::vector<Ref<RunnerType>> m_Runners; - std::atomic<int> m_NextSubmitIndex{0}; -}; - -/** - * This represents an action going through different stages of scheduling and execution. - */ -struct RunnerAction : public RefCounted -{ - explicit RunnerAction(FunctionServiceSession* OwnerSession); - ~RunnerAction(); - - int ActionLsn = 0; - WorkerDesc Worker; - IoHash ActionId; - CbObject ActionObj; - int Priority = 0; - - enum class State - { - New, - Pending, - Running, - Completed, - Failed, - _Count - }; - - static const char* ToString(State _) - { - switch (_) - { - case State::New: - return "New"; - case State::Pending: - return "Pending"; - case State::Running: - return "Running"; - case State::Completed: - return "Completed"; - case State::Failed: - return "Failed"; - default: - return "Unknown"; - } - } - - uint64_t Timestamps[static_cast<int>(State::_Count)] = {}; - - State ActionState() const { return m_ActionState; } - void SetActionState(State NewState); - - bool IsSuccess() const { return ActionState() == State::Completed; } - bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } - - void SetResult(CbPackage&& Result); - CbPackage& GetResult(); - -private: - std::atomic<State> m_ActionState = State::New; - FunctionServiceSession* m_OwnerSession = nullptr; - CbPackage m_Result; -}; - -} // namespace zen::compute - -#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file |