// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "zencompute/computeservice.h"

// Everything below is compiled only when compute services are enabled.
#if ZEN_WITH_COMPUTE_SERVICES

# include "functionrunner.h"

// NOTE(review): the ten include directives below have no target. The header names
// (presumably angle-bracket includes) appear to have been lost before this file was
// captured - restore them from version control; do not guess.
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include

namespace zen {
// Forward declaration only; not referenced by any declaration visible in this header.
class CidStore;
}

namespace zen::compute {

/** HTTP-based runner

	This implements a DDC remote compute execution strategy via REST API

	NOTE(review): several declarations below look like they lost their template
	arguments (`Ref Action`, `std::vector SubmitActions(const std::vector>& ...)`,
	`std::unordered_map ...`, `std::atomic ...`). Confirm the intended argument
	lists against the original header before relying on this text.
 */
class RemoteHttpRunner : public FunctionRunner
{
	// Instances are not movable. Because the move operations are user-declared,
	// the implicit copy constructor and copy assignment are deleted as well.
	RemoteHttpRunner(RemoteHttpRunner&&) = delete;
	RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;

public:
	/** Construct a runner bound to one remote host.

		@param InChunkResolver  Chunk resolver; held by reference (see m_ChunkResolver), so it must outlive the runner
		@param BaseDir          Base directory for the runner (usage is in the implementation, not visible here)
		@param HostName         Remote host; presumably the source of m_HostName / m_BaseUrl - confirm in the .cpp
		@param InWorkerPool     Worker thread pool; held by reference (see m_WorkerPool), so it must outlive the runner
	 */
	RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName, WorkerThreadPool& InWorkerPool);
	~RemoteHttpRunner();

	// FunctionRunner overrides. Their contracts are defined by the base class and
	// their behavior by the implementation file; neither is visible in this header.
	virtual void Shutdown() override;
	virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
	[[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override;
	[[nodiscard]] virtual bool IsHealthy() override;
	[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
	[[nodiscard]] virtual size_t QueryCapacity() override;
	[[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override;
	virtual void CancelRemoteQueue(int QueueId) override;

	/** Host name this runner targets.

		The returned view refers to m_HostName, so it is only valid while this
		runner is alive and m_HostName is not modified.
	 */
	std::string_view GetHostName() const { return m_HostName; }

protected:
	// Accessor for this runner's logger (returns m_Log).
	LoggerRef Log() { return m_Log; }

private:
	LoggerRef m_Log;
	ChunkResolver& m_ChunkResolver;	 // non-owning reference
	WorkerThreadPool& m_WorkerPool;	 // non-owning reference
	std::string m_HostName;
	std::string m_BaseUrl;
	HttpClient m_Http;
	int32_t m_MaxRunningActions = 256;	// arbitrary limit for testing

	// Bookkeeping record for one action tracked in m_RemoteRunningMap.
	struct HttpRunningAction
	{
		Ref Action;
		int RemoteActionLsn = 0;  // Remote LSN
		RunnerAction::State RemoteState = RunnerAction::State::Failed;	// defaults to Failed until a state is assigned
		CbPackage ActionResults;
	};

	// NOTE(review): presumably m_RunningLock guards m_RemoteRunningMap - confirm in the .cpp
	RwLock m_RunningLock;
	std::unordered_map m_RemoteRunningMap;	// Note that this is keyed on the *REMOTE* lsn

	// Monitor thread state. The enabled flag starts out true.
	// NOTE(review): presumably the flag and event are used to stop/wake
	// MonitorThreadFunction() - confirm in the .cpp
	std::thread m_MonitorThread;
	std::atomic m_MonitorThreadEnabled{true};
	Event m_MonitorThreadEvent;

	void MonitorThreadFunction();
	size_t SweepRunningActions();

	// NOTE(review): presumably m_QueueTokenLock guards m_RemoteQueueTokens - confirm in the .cpp
	RwLock m_QueueTokenLock;
	std::unordered_map m_RemoteQueueTokens;	 // local QueueId → remote queue token

	// Stable identity for this runner instance, used as part of the idempotency key when
	// creating remote queues. Generated once at construction and never changes.
	Oid m_InstanceId;

	// Returns an Oid for the given local QueueId.
	// NOTE(review): presumably the remote queue token cached in m_RemoteQueueTokens,
	// creating the remote queue on first use - confirm in the .cpp
	Oid EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config);
};

}  // namespace zen::compute

#endif