aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordeagent.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhorde/hordeagent.h')
-rw-r--r--src/zenhorde/hordeagent.h128
1 files changed, 84 insertions, 44 deletions
diff --git a/src/zenhorde/hordeagent.h b/src/zenhorde/hordeagent.h
index e0ae89ead..b581a8da1 100644
--- a/src/zenhorde/hordeagent.h
+++ b/src/zenhorde/hordeagent.h
@@ -10,68 +10,108 @@
#include <zencore/logbase.h>
#include <filesystem>
+#include <functional>
#include <memory>
#include <string>
+#include <vector>
+
+namespace asio {
+class io_context;
+}
namespace zen::horde {
-/** Manages the lifecycle of a single Horde compute agent.
+class AsyncComputeTransport;
+
+/** Result passed to the completion handler when an async agent finishes. */
+struct AsyncAgentResult
+{
+ bool Success = false;
+ int32_t ExitCode = -1;
+ uint16_t CoreCount = 0; ///< Logical cores on the provisioned machine
+};
+
+/** Completion handler for async agent lifecycle. */
+using AsyncAgentCompletionHandler = std::function<void(const AsyncAgentResult&)>;
+
+/** Configuration for launching a remote zenserver instance via an async agent. */
+struct AsyncAgentConfig
+{
+ MachineInfo Machine;
+ std::vector<std::pair<std::string, std::filesystem::path>> Bundles; ///< (locator, bundleDir) pairs
+ std::string Executable;
+ std::vector<std::string> Args;
+ bool UseWine = false;
+};
+
+/** Async agent that manages the full lifecycle of a single Horde compute connection.
*
- * Handles the full connection sequence for one provisioned machine:
- * 1. Connect via TCP transport (with optional AES encryption wrapping)
- * 2. Create a multiplexed ComputeSocket with agent (channel 0) and child (channel 100)
- * 3. Perform the Attach/Fork handshake to establish the child channel
- * 4. Upload zenserver binary via the WriteFiles/ReadBlob protocol
- * 5. Execute zenserver remotely via ExecuteV2
- * 6. Poll for ExecuteOutput (stdout) and ExecuteResult (exit code)
+ * Driven by a state machine using callbacks on a shared io_context — no dedicated
+ * threads. Call Start() to begin the connection/handshake/upload/execute/poll
+ * sequence. The completion handler is invoked when the remote process exits or
+ * an error occurs.
*/
-class HordeAgent
+class AsyncHordeAgent : public std::enable_shared_from_this<AsyncHordeAgent>
{
public:
- explicit HordeAgent(const MachineInfo& Info);
- ~HordeAgent();
+ AsyncHordeAgent(asio::io_context& IoContext);
+ ~AsyncHordeAgent();
- HordeAgent(const HordeAgent&) = delete;
- HordeAgent& operator=(const HordeAgent&) = delete;
+ AsyncHordeAgent(const AsyncHordeAgent&) = delete;
+ AsyncHordeAgent& operator=(const AsyncHordeAgent&) = delete;
- /** Perform the channel setup handshake (Attach on agent channel, Fork, Attach on child channel).
- * Returns false if the handshake times out or receives an unexpected message. */
- bool BeginCommunication();
+ /** Start the full agent lifecycle. The completion handler is called exactly once. */
+ void Start(AsyncAgentConfig Config, AsyncAgentCompletionHandler OnDone);
- /** Upload binary files to the remote agent.
- * @param BundleDir Directory containing .blob files.
- * @param BundleLocator Locator string identifying the bundle (from CreateBundle). */
- bool UploadBinaries(const std::filesystem::path& BundleDir, const std::string& BundleLocator);
+ /** Cancel in-flight operations. The completion handler is still called (with Success=false). */
+ void Cancel();
- /** Execute a command on the remote machine. */
- void Execute(const char* Exe,
- const char* const* Args,
- size_t NumArgs,
- const char* WorkingDir = nullptr,
- const char* const* EnvVars = nullptr,
- size_t NumEnvVars = 0,
- bool UseWine = false);
+ const MachineInfo& GetMachineInfo() const { return m_Config.Machine; }
- /** Poll for output and results. Returns true if the agent is still running.
- * When LogOutput is true, remote stdout is logged via ZEN_INFO. */
- bool Poll(bool LogOutput = true);
-
- void CloseConnection();
- bool IsValid() const;
-
- const MachineInfo& GetMachineInfo() const { return m_MachineInfo; }
+ enum class State
+ {
+ Idle,
+ Connecting,
+ WaitAgentAttach,
+ SentFork,
+ WaitChildAttach,
+ Uploading,
+ Executing,
+ Polling,
+ Done
+ };
private:
LoggerRef Log() { return m_Log; }
- std::unique_ptr<ComputeSocket> m_Socket;
- std::unique_ptr<AgentMessageChannel> m_AgentChannel; ///< Channel 0: agent control
- std::unique_ptr<AgentMessageChannel> m_ChildChannel; ///< Channel 100: child I/O
-
- LoggerRef m_Log;
- bool m_IsValid = false;
- bool m_HasErrors = false;
- MachineInfo m_MachineInfo;
+ void DoConnect();
+ void OnConnected(const std::error_code& Ec);
+ void DoWaitAgentAttach();
+ void OnAgentResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);
+ void DoSendFork();
+ void DoWaitChildAttach();
+ void OnChildAttachResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);
+ void DoUploadNext();
+ void OnUploadResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);
+ void DoExecute();
+ void DoPoll();
+ void OnPollResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);
+ void Finish(bool Success, int32_t ExitCode = -1);
+
+ asio::io_context& m_IoContext;
+ LoggerRef m_Log;
+ State m_State = State::Idle;
+ bool m_Cancelled = false;
+
+ AsyncAgentConfig m_Config;
+ AsyncAgentCompletionHandler m_OnDone;
+ size_t m_CurrentBundleIndex = 0;
+
+ std::unique_ptr<AsyncTcpComputeTransport> m_TcpTransport;
+ std::unique_ptr<AsyncComputeTransport> m_Transport;
+ std::shared_ptr<AsyncComputeSocket> m_Socket;
+ std::unique_ptr<AsyncAgentMessageChannel> m_AgentChannel;
+ std::unique_ptr<AsyncAgentMessageChannel> m_ChildChannel;
};
} // namespace zen::horde