diff options
Diffstat (limited to 'src/zencompute/remotehttprunner.h')
| -rw-r--r-- | src/zencompute/remotehttprunner.h | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/src/zencompute/remotehttprunner.h b/src/zencompute/remotehttprunner.h new file mode 100644 index 000000000..1e885da3d --- /dev/null +++ b/src/zencompute/remotehttprunner.h @@ -0,0 +1,80 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zencompute/functionservice.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include "functionrunner.h" + +# include <zencore/compactbinarypackage.h> +# include <zencore/logging.h> +# include <zencore/zencore.h> +# include <zenhttp/httpclient.h> + +# include <atomic> +# include <filesystem> +# include <thread> + +namespace zen { +class CidStore; +} + +namespace zen::compute { + +/** HTTP-based runner + + This implements a DDC remote compute execution strategy via REST API + + */ + +class RemoteHttpRunner : public FunctionRunner +{ + RemoteHttpRunner(RemoteHttpRunner&&) = delete; + RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete; + +public: + RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName); + ~RemoteHttpRunner(); + + virtual void Shutdown() override; + virtual void RegisterWorker(const CbPackage& WorkerPackage) override; + [[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override; + [[nodiscard]] virtual bool IsHealthy() override; + [[nodiscard]] virtual size_t GetSubmittedActionCount() override; + [[nodiscard]] virtual size_t QueryCapacity() override; + [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override; + +protected: + LoggerRef Log() { return m_Log; } + +private: + LoggerRef m_Log; + ChunkResolver& m_ChunkResolver; + std::string m_BaseUrl; + HttpClient m_Http; + + int32_t m_MaxRunningActions = 256; // arbitrary limit for testing + + struct HttpRunningAction + { + Ref<RunnerAction> Action; + int RemoteActionLsn = 0; // Remote LSN + bool Success = false; + CbPackage ActionResults; + }; + + RwLock m_RunningLock; + std::unordered_map<int, HttpRunningAction> m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn + + std::thread m_MonitorThread; + std::atomic<bool> m_MonitorThreadEnabled{true}; + Event m_MonitorThreadEvent; + void MonitorThreadFunction(); + size_t SweepRunningActions(); +}; + +} // namespace zen::compute + +#endif |