// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "zencompute/computeservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include "functionrunner.h" # include # include # include # include # include # include # include # include # include # include # include # include namespace zen { class CidStore; } namespace zen::compute { /** HTTP-based runner This implements a DDC remote compute execution strategy via REST API */ class RemoteHttpRunner : public FunctionRunner, private IWsClientHandler { RemoteHttpRunner(RemoteHttpRunner&&) = delete; RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete; public: RemoteHttpRunner(ChunkResolver& InChunkResolver, const std::filesystem::path& BaseDir, std::string_view HostName, WorkerThreadPool& InWorkerPool); ~RemoteHttpRunner(); virtual void Shutdown() override; [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override; [[nodiscard]] virtual SubmitResult SubmitAction(Ref Action) override; [[nodiscard]] virtual bool IsHealthy() override; [[nodiscard]] virtual size_t GetSubmittedActionCount() override; [[nodiscard]] virtual size_t QueryCapacity() override; [[nodiscard]] virtual std::vector SubmitActions(const std::vector>& Actions) override; virtual void CancelRemoteQueue(int QueueId) override; std::string_view GetHostName() const { return m_HostName; } protected: LoggerRef Log() { return m_Log; } private: LoggerRef m_Log; ChunkResolver& m_ChunkResolver; WorkerThreadPool& m_WorkerPool; std::string m_HostName; std::string m_BaseUrl; HttpClient m_Http; std::atomic m_AcceptNewActions{true}; int32_t m_MaxRunningActions = 256; // arbitrary limit for testing int32_t m_MaxBatchSize = 50; struct HttpRunningAction { Ref Action; int RemoteActionLsn = 0; // Remote LSN RunnerAction::State RemoteState = RunnerAction::State::Failed; CbPackage ActionResults; }; RwLock m_RunningLock; std::unordered_map m_RemoteRunningMap; // Note that this is keyed on the *REMOTE* lsn std::thread m_MonitorThread; std::atomic m_MonitorThreadEnabled{true}; Event m_MonitorThreadEvent; void MonitorThreadFunction(); size_t SweepRunningActions(); RwLock m_QueueTokenLock; std::unordered_map m_RemoteQueueTokens; // local QueueId → remote queue token // Stable identity for this runner instance, used as part of the idempotency key when // creating remote queues. Generated once at construction and never changes. Oid m_InstanceId; // WebSocket completion notification client std::unique_ptr m_WsClient; std::atomic m_WsConnected{false}; // IWsClientHandler void OnWsOpen() override; void OnWsMessage(const WebSocketMessage& Msg) override; void OnWsClose(uint16_t Code, std::string_view Reason) override; Oid EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config); std::vector SubmitActionBatch(const std::string& SubmitUrl, const std::vector>& Actions); std::vector ParseBatchResponse(const HttpClient::Response& Response, const std::vector>& Actions); std::vector FallbackToIndividualSubmit(const std::vector>& Actions); }; } // namespace zen::compute #endif