diff options
Diffstat (limited to 'src/zenhorde/hordeagent.h')
| -rw-r--r-- | src/zenhorde/hordeagent.h | 128 |
1 files changed, 84 insertions, 44 deletions
diff --git a/src/zenhorde/hordeagent.h b/src/zenhorde/hordeagent.h index e0ae89ead..b581a8da1 100644 --- a/src/zenhorde/hordeagent.h +++ b/src/zenhorde/hordeagent.h @@ -10,68 +10,108 @@ #include <zencore/logbase.h> #include <filesystem> +#include <functional> #include <memory> #include <string> +#include <vector> + +namespace asio { +class io_context; +} namespace zen::horde { -/** Manages the lifecycle of a single Horde compute agent. +class AsyncComputeTransport; + +/** Result passed to the completion handler when an async agent finishes. */ +struct AsyncAgentResult +{ + bool Success = false; + int32_t ExitCode = -1; + uint16_t CoreCount = 0; ///< Logical cores on the provisioned machine +}; + +/** Completion handler for async agent lifecycle. */ +using AsyncAgentCompletionHandler = std::function<void(const AsyncAgentResult&)>; + +/** Configuration for launching a remote zenserver instance via an async agent. */ +struct AsyncAgentConfig +{ + MachineInfo Machine; + std::vector<std::pair<std::string, std::filesystem::path>> Bundles; ///< (locator, bundleDir) pairs + std::string Executable; + std::vector<std::string> Args; + bool UseWine = false; +}; + +/** Async agent that manages the full lifecycle of a single Horde compute connection. * - * Handles the full connection sequence for one provisioned machine: - * 1. Connect via TCP transport (with optional AES encryption wrapping) - * 2. Create a multiplexed ComputeSocket with agent (channel 0) and child (channel 100) - * 3. Perform the Attach/Fork handshake to establish the child channel - * 4. Upload zenserver binary via the WriteFiles/ReadBlob protocol - * 5. Execute zenserver remotely via ExecuteV2 - * 6. Poll for ExecuteOutput (stdout) and ExecuteResult (exit code) + * Driven by a state machine using callbacks on a shared io_context — no dedicated + * threads. Call Start() to begin the connection/handshake/upload/execute/poll + * sequence. The completion handler is invoked when the remote process exits or + * an error occurs. */ -class HordeAgent +class AsyncHordeAgent : public std::enable_shared_from_this<AsyncHordeAgent> { public: - explicit HordeAgent(const MachineInfo& Info); - ~HordeAgent(); + AsyncHordeAgent(asio::io_context& IoContext); + ~AsyncHordeAgent(); - HordeAgent(const HordeAgent&) = delete; - HordeAgent& operator=(const HordeAgent&) = delete; + AsyncHordeAgent(const AsyncHordeAgent&) = delete; + AsyncHordeAgent& operator=(const AsyncHordeAgent&) = delete; - /** Perform the channel setup handshake (Attach on agent channel, Fork, Attach on child channel). - * Returns false if the handshake times out or receives an unexpected message. */ - bool BeginCommunication(); + /** Start the full agent lifecycle. The completion handler is called exactly once. */ + void Start(AsyncAgentConfig Config, AsyncAgentCompletionHandler OnDone); - /** Upload binary files to the remote agent. - * @param BundleDir Directory containing .blob files. - * @param BundleLocator Locator string identifying the bundle (from CreateBundle). */ - bool UploadBinaries(const std::filesystem::path& BundleDir, const std::string& BundleLocator); + /** Cancel in-flight operations. The completion handler is still called (with Success=false). */ + void Cancel(); - /** Execute a command on the remote machine. */ - void Execute(const char* Exe, - const char* const* Args, - size_t NumArgs, - const char* WorkingDir = nullptr, - const char* const* EnvVars = nullptr, - size_t NumEnvVars = 0, - bool UseWine = false); + const MachineInfo& GetMachineInfo() const { return m_Config.Machine; } - /** Poll for output and results. Returns true if the agent is still running. - * When LogOutput is true, remote stdout is logged via ZEN_INFO. */ - bool Poll(bool LogOutput = true); - - void CloseConnection(); - bool IsValid() const; - - const MachineInfo& GetMachineInfo() const { return m_MachineInfo; } + enum class State + { + Idle, + Connecting, + WaitAgentAttach, + SentFork, + WaitChildAttach, + Uploading, + Executing, + Polling, + Done + }; private: LoggerRef Log() { return m_Log; } - std::unique_ptr<ComputeSocket> m_Socket; - std::unique_ptr<AgentMessageChannel> m_AgentChannel; ///< Channel 0: agent control - std::unique_ptr<AgentMessageChannel> m_ChildChannel; ///< Channel 100: child I/O - - LoggerRef m_Log; - bool m_IsValid = false; - bool m_HasErrors = false; - MachineInfo m_MachineInfo; + void DoConnect(); + void OnConnected(const std::error_code& Ec); + void DoWaitAgentAttach(); + void OnAgentResponse(AgentMessageType Type, const uint8_t* Data, size_t Size); + void DoSendFork(); + void DoWaitChildAttach(); + void OnChildAttachResponse(AgentMessageType Type, const uint8_t* Data, size_t Size); + void DoUploadNext(); + void OnUploadResponse(AgentMessageType Type, const uint8_t* Data, size_t Size); + void DoExecute(); + void DoPoll(); + void OnPollResponse(AgentMessageType Type, const uint8_t* Data, size_t Size); + void Finish(bool Success, int32_t ExitCode = -1); + + asio::io_context& m_IoContext; + LoggerRef m_Log; + State m_State = State::Idle; + bool m_Cancelled = false; + + AsyncAgentConfig m_Config; + AsyncAgentCompletionHandler m_OnDone; + size_t m_CurrentBundleIndex = 0; + + std::unique_ptr<AsyncTcpComputeTransport> m_TcpTransport; + std::unique_ptr<AsyncComputeTransport> m_Transport; + std::shared_ptr<AsyncComputeSocket> m_Socket; + std::unique_ptr<AsyncAgentMessageChannel> m_AgentChannel; + std::unique_ptr<AsyncAgentMessageChannel> m_ChildChannel; }; } // namespace zen::horde |