// Copyright Epic Games, Inc. All Rights Reserved. #include "hordetransport.h" #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # undef SendMessage #endif namespace zen::horde { // ComputeTransport base bool ComputeTransport::SendMessage(const void* Data, size_t Size) { const uint8_t* Ptr = static_cast(Data); size_t Remaining = Size; while (Remaining > 0) { const size_t Sent = Send(Ptr, Remaining); if (Sent == 0) { return false; } Ptr += Sent; Remaining -= Sent; } return true; } bool ComputeTransport::RecvMessage(void* Data, size_t Size) { uint8_t* Ptr = static_cast(Data); size_t Remaining = Size; while (Remaining > 0) { const size_t Received = Recv(Ptr, Remaining); if (Received == 0) { return false; } Ptr += Received; Remaining -= Received; } return true; } // TcpComputeTransport - ASIO pimpl struct TcpComputeTransport::Impl { asio::io_context IoContext; asio::ip::tcp::socket Socket; Impl() : Socket(IoContext) {} }; // Uses ASIO in synchronous mode only — no async operations or io_context::run(). // The io_context is only needed because ASIO sockets require one to be constructed. TcpComputeTransport::TcpComputeTransport(const MachineInfo& Info) : m_Impl(std::make_unique()) , m_Log(zen::logging::Get("horde.transport")) { ZEN_TRACE_CPU("TcpComputeTransport::Connect"); asio::error_code Ec; const asio::ip::address Address = asio::ip::make_address(Info.GetConnectionAddress(), Ec); if (Ec) { ZEN_WARN("invalid address '{}': {}", Info.GetConnectionAddress(), Ec.message()); m_HasErrors = true; return; } const asio::ip::tcp::endpoint Endpoint(Address, Info.GetConnectionPort()); m_Impl->Socket.connect(Endpoint, Ec); if (Ec) { ZEN_WARN("failed to connect to Horde compute [{}:{}]: {}", Info.GetConnectionAddress(), Info.GetConnectionPort(), Ec.message()); m_HasErrors = true; return; } // Disable Nagle's algorithm for lower latency m_Impl->Socket.set_option(asio::ip::tcp::no_delay(true), Ec); } TcpComputeTransport::~TcpComputeTransport() { Close(); } bool TcpComputeTransport::IsValid() const { return m_Impl && m_Impl->Socket.is_open() && !m_HasErrors && !m_IsClosed; } size_t TcpComputeTransport::Send(const void* Data, size_t Size) { if (!IsValid()) { return 0; } asio::error_code Ec; const size_t Sent = m_Impl->Socket.send(asio::buffer(Data, Size), 0, Ec); if (Ec) { m_HasErrors = true; return 0; } return Sent; } size_t TcpComputeTransport::Recv(void* Data, size_t Size) { if (!IsValid()) { return 0; } asio::error_code Ec; const size_t Received = m_Impl->Socket.receive(asio::buffer(Data, Size), 0, Ec); if (Ec) { return 0; } return Received; } void TcpComputeTransport::MarkComplete() { } void TcpComputeTransport::Close() { if (!m_IsClosed && m_Impl && m_Impl->Socket.is_open()) { asio::error_code Ec; m_Impl->Socket.shutdown(asio::ip::tcp::socket::shutdown_both, Ec); m_Impl->Socket.close(Ec); } m_IsClosed = true; } } // namespace zen::horde