aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/functionrunner.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/functionrunner.h')
-rw-r--r--src/zencompute/functionrunner.h207
1 files changed, 0 insertions, 207 deletions
diff --git a/src/zencompute/functionrunner.h b/src/zencompute/functionrunner.h
deleted file mode 100644
index 6fd0d84cc..000000000
--- a/src/zencompute/functionrunner.h
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencompute/functionservice.h>
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <filesystem>
-# include <vector>
-
-namespace zen::compute {
-
-struct SubmitResult
-{
- bool IsAccepted = false;
- std::string Reason;
-};
-
-/** Base interface for classes implementing a remote execution "runner"
- */
-class FunctionRunner : public RefCounted
-{
- FunctionRunner(FunctionRunner&&) = delete;
- FunctionRunner& operator=(FunctionRunner&&) = delete;
-
-public:
- FunctionRunner(std::filesystem::path BasePath);
- virtual ~FunctionRunner() = 0;
-
- virtual void Shutdown() = 0;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
-
- [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
- [[nodiscard]] virtual size_t GetSubmittedActionCount() = 0;
- [[nodiscard]] virtual bool IsHealthy() = 0;
- [[nodiscard]] virtual size_t QueryCapacity();
- [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
-
-protected:
- std::filesystem::path m_ActionsPath;
- bool m_DumpActions = false;
- void MaybeDumpAction(int ActionLsn, const CbObject& ActionObject);
-};
-
-template<typename RunnerType>
-struct RunnerGroup
-{
- void AddRunner(RunnerType* Runner)
- {
- m_RunnersLock.WithExclusiveLock([&] { m_Runners.emplace_back(Runner); });
- }
- size_t QueryCapacity()
- {
- size_t TotalCapacity = 0;
- m_RunnersLock.WithSharedLock([&] {
- for (const auto& Runner : m_Runners)
- {
- TotalCapacity += Runner->QueryCapacity();
- }
- });
- return TotalCapacity;
- }
-
- SubmitResult SubmitAction(Ref<RunnerAction> Action)
- {
- RwLock::SharedLockScope _(m_RunnersLock);
-
- const int InitialIndex = m_NextSubmitIndex.load(std::memory_order_acquire);
- int Index = InitialIndex;
- const int RunnerCount = gsl::narrow<int>(m_Runners.size());
-
- if (RunnerCount == 0)
- {
- return {.IsAccepted = false, .Reason = "No runners available"};
- }
-
- do
- {
- while (Index >= RunnerCount)
- {
- Index -= RunnerCount;
- }
-
- auto& Runner = m_Runners[Index++];
-
- SubmitResult Result = Runner->SubmitAction(Action);
-
- if (Result.IsAccepted == true)
- {
- m_NextSubmitIndex = Index % RunnerCount;
-
- return Result;
- }
-
- while (Index >= RunnerCount)
- {
- Index -= RunnerCount;
- }
- } while (Index != InitialIndex);
-
- return {.IsAccepted = false};
- }
-
- size_t GetSubmittedActionCount()
- {
- RwLock::SharedLockScope _(m_RunnersLock);
-
- size_t TotalCount = 0;
-
- for (const auto& Runner : m_Runners)
- {
- TotalCount += Runner->GetSubmittedActionCount();
- }
-
- return TotalCount;
- }
-
- void RegisterWorker(CbPackage Worker)
- {
- RwLock::SharedLockScope _(m_RunnersLock);
-
- for (auto& Runner : m_Runners)
- {
- Runner->RegisterWorker(Worker);
- }
- }
-
- void Shutdown()
- {
- RwLock::SharedLockScope _(m_RunnersLock);
-
- for (auto& Runner : m_Runners)
- {
- Runner->Shutdown();
- }
- }
-
-private:
- RwLock m_RunnersLock;
- std::vector<Ref<RunnerType>> m_Runners;
- std::atomic<int> m_NextSubmitIndex{0};
-};
-
-/**
- * This represents an action going through different stages of scheduling and execution.
- */
-struct RunnerAction : public RefCounted
-{
- explicit RunnerAction(FunctionServiceSession* OwnerSession);
- ~RunnerAction();
-
- int ActionLsn = 0;
- WorkerDesc Worker;
- IoHash ActionId;
- CbObject ActionObj;
- int Priority = 0;
-
- enum class State
- {
- New,
- Pending,
- Running,
- Completed,
- Failed,
- _Count
- };
-
- static const char* ToString(State _)
- {
- switch (_)
- {
- case State::New:
- return "New";
- case State::Pending:
- return "Pending";
- case State::Running:
- return "Running";
- case State::Completed:
- return "Completed";
- case State::Failed:
- return "Failed";
- default:
- return "Unknown";
- }
- }
-
- uint64_t Timestamps[static_cast<int>(State::_Count)] = {};
-
- State ActionState() const { return m_ActionState; }
- void SetActionState(State NewState);
-
- bool IsSuccess() const { return ActionState() == State::Completed; }
- bool IsCompleted() const { return ActionState() == State::Completed || ActionState() == State::Failed; }
-
- void SetResult(CbPackage&& Result);
- CbPackage& GetResult();
-
-private:
- std::atomic<State> m_ActionState = State::New;
- FunctionServiceSession* m_OwnerSession = nullptr;
- CbPackage m_Result;
-};
-
-} // namespace zen::compute
-
-#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file