diff options
| author | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
|---|---|---|
| committer | zousar <[email protected]> | 2026-02-18 23:19:14 -0700 |
| commit | 2ba28acaf034722452f82cfb07afc0a4bb90eeab (patch) | |
| tree | c00dea385597180673be6e02aca6c07d9ef6ec00 /src/zencompute/functionrunner.h | |
| parent | updatefrontend (diff) | |
| parent | structured compute basics (#714) (diff) | |
| download | zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.tar.xz zen-2ba28acaf034722452f82cfb07afc0a4bb90eeab.zip | |
Merge branch 'main' into zs/web-ui-improvements
Diffstat (limited to 'src/zencompute/functionrunner.h')
| -rw-r--r-- | src/zencompute/functionrunner.h | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h new file mode 100644 index 000000000..6fd0d84cc --- /dev/null +++ b/src/zencompute/functionrunner.h @@ -0,0 +1,207 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencompute/functionservice.h> + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <filesystem> +# include <vector> + +namespace zen::compute { + +struct SubmitResult +{ + bool IsAccepted = false; + std::string Reason; +}; + +/** Base interface for classes implementing a remote execution "runner" + */ +class FunctionRunner : public RefCounted +{ + FunctionRunner(FunctionRunner&&) = delete; + FunctionRunner& operator=(FunctionRunner&&) = delete; + +public: + FunctionRunner(std::filesystem::path BasePath); + virtual ~FunctionRunner() = 0; + + virtual void Shutdown() = 0; + virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0; + + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0; + [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0; + [[nodiscard]] virtual bool IsHealthy() = 0; + [[nodiscard]] virtual size_t QueryCapacity(); + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + +protected: + std::filesystem::path m_ActionsPath; + bool m_DumpActions = false; + void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject); +}; + +template<typename RunnerType> +struct RunnerGroup +{ + void AddRunner(RunnerType* Runner) + { + m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); }); + } + size_t QueryCapacity() + { + size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + for (const auto& Runner : m_Runners) + { + TotalCapacity += Runner->QueryCapacity(); + } + }); + return TotalCapacity; + } + + SubmitResult SubmitAction(Ref<RunnerAction> Action) + { + RwLock::SharedLockScope _(m_RunnersLock); + + const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire); + int Index = InitialIndex; + const int RunnerCount = gsl::narrow<int>(m_Runners.size()); + + if (RunnerCount == 0) + { + return {.IsAccepted = false, .Reason = "No runners available"}; + } + + do + { + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + + auto& Runner = m_Runners[Index++]; + + SubmitResult Result = Runner->SubmitAction(Action); + + if (Result.IsAccepted == true) + { + m_NextSubmitIndex = Index % RunnerCount; + + return Result; + } + + while (Index >= RunnerCount) + { + Index -= RunnerCount; + } + } while (Index != InitialIndex); + + return {.IsAccepted = false}; + } + + size_t GetSubmittedActionCount() + { + RwLock::SharedLockScope _(m_RunnersLock); + + size_t TotalCount = 0; + + for (const auto& Runner : m_Runners) + { + TotalCount += Runner->GetSubmittedActionCount(); + } + + return TotalCount; + } + + void RegisterWorker(CbPackage Worker) + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->RegisterWorker(Worker); + } + } + + void Shutdown() + { + RwLock::SharedLockScope _(m_RunnersLock); + + for (auto& Runner : m_Runners) + { + Runner->Shutdown(); + } + } + +private: + RwLock m_RunnersLock; + std::vector<Ref<RunnerType>> m_Runners; + std::atomic<int> m_NextSubmitIndex{0}; +}; + +/** + * This represents an action going through different stages of scheduling and execution. + */ +struct RunnerAction : public RefCounted +{ + explicit RunnerAction(FunctionServiceSession* OwnerSession); + ~RunnerAction(); + + int ActionLsn = 0; + WorkerDesc Worker; + IoHash ActionId; + CbObject ActionObj; + int Priority = 0; + + enum class State + { + New, + Pending, + Running, + Completed, + Failed, + _Count + }; + + static const char* ToString(State _) + { + switch (_) + { + case State::New: + return "New"; + case State::Pending: + return "Pending"; + case State::Running: + return "Running"; + case State::Completed: + return "Completed"; + case State::Failed: + return "Failed"; + default: + return "Unknown"; + } + } + + uint64_t Timestamps[static_cast<int>(State::_Count)] = {}; + + State ActionState() const { return m_ActionState; } + void SetActionState(State NewState); + + bool IsSuccess() const { return ActionState() == State::Completed; } + bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; } + + void SetResult(CbPackage&& Result); + CbPackage& GetResult(); + +private: + std::atomic<State> m_ActionState = State::New; + FunctionServiceSession* m_OwnerSession = nullptr; + CbPackage m_Result; +}; + +} // namespace zen::compute + +#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file |