// Copyright Epic Games, Inc. All Rights Reserved. #include "hordetransport.h" #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen::horde { // --- AsyncTcpComputeTransport --- struct AsyncTcpComputeTransport::Impl { asio::io_context& IoContext; asio::ip::tcp::socket Socket; explicit Impl(asio::io_context& Ctx) : IoContext(Ctx), Socket(Ctx) {} }; AsyncTcpComputeTransport::AsyncTcpComputeTransport(asio::io_context& IoContext) : m_Impl(std::make_unique(IoContext)) , m_Log(zen::logging::Get("horde.transport.async")) { } AsyncTcpComputeTransport::~AsyncTcpComputeTransport() { Close(); } void AsyncTcpComputeTransport::AsyncConnect(const MachineInfo& Info, AsyncConnectHandler Handler) { ZEN_TRACE_CPU("AsyncTcpComputeTransport::AsyncConnect"); asio::error_code Ec; const asio::ip::address Address = asio::ip::make_address(Info.GetConnectionAddress(), Ec); if (Ec) { ZEN_WARN("invalid address '{}': {}", Info.GetConnectionAddress(), Ec.message()); m_HasErrors = true; asio::post(m_Impl->IoContext, [Handler = std::move(Handler), Ec] { Handler(Ec); }); return; } const asio::ip::tcp::endpoint Endpoint(Address, Info.GetConnectionPort()); // Copy the nonce so it survives past this scope into the async callback auto NonceBuf = std::make_shared>(Info.Nonce, Info.Nonce + NonceSize); m_Impl->Socket.async_connect(Endpoint, [this, Handler = std::move(Handler), NonceBuf](const asio::error_code& Ec) mutable { if (Ec) { ZEN_WARN("async connect failed: {}", Ec.message()); m_HasErrors = true; Handler(Ec); return; } asio::error_code SetOptEc; m_Impl->Socket.set_option(asio::ip::tcp::no_delay(true), SetOptEc); // Send the 64-byte nonce as the first thing on the wire asio::async_write(m_Impl->Socket, asio::buffer(*NonceBuf), [this, Handler = std::move(Handler), NonceBuf](const asio::error_code& Ec, size_t /*BytesWritten*/) { if (Ec) { ZEN_WARN("nonce write failed: {}", Ec.message()); m_HasErrors = true; } Handler(Ec); }); }); } bool AsyncTcpComputeTransport::IsValid() const { return m_Impl && m_Impl->Socket.is_open() && !m_HasErrors && !m_IsClosed; } void AsyncTcpComputeTransport::AsyncWrite(const void* Data, size_t Size, AsyncIoHandler Handler) { if (!IsValid()) { asio::post(m_Impl->IoContext, [Handler = std::move(Handler)] { Handler(asio::error::make_error_code(asio::error::not_connected), 0); }); return; } asio::async_write(m_Impl->Socket, asio::buffer(Data, Size), std::move(Handler)); } void AsyncTcpComputeTransport::AsyncRead(void* Data, size_t Size, AsyncIoHandler Handler) { if (!IsValid()) { asio::post(m_Impl->IoContext, [Handler = std::move(Handler)] { Handler(asio::error::make_error_code(asio::error::not_connected), 0); }); return; } asio::async_read(m_Impl->Socket, asio::buffer(Data, Size), std::move(Handler)); } void AsyncTcpComputeTransport::Close() { if (!m_IsClosed && m_Impl && m_Impl->Socket.is_open()) { asio::error_code Ec; m_Impl->Socket.shutdown(asio::ip::tcp::socket::shutdown_both, Ec); m_Impl->Socket.close(Ec); } m_IsClosed = true; } } // namespace zen::horde