aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordetransport.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-04 14:13:46 +0100
committerGitHub Enterprise <[email protected]>2026-03-04 14:13:46 +0100
commit0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch)
tree074575ba6ea259044a179eab0bb396d37268fb09 /src/zenhorde/hordetransport.cpp
parentnative xmake toolchain definition for UE-clang (#805) (diff)
downloadzen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz
zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenhorde/hordetransport.cpp')
-rw-r--r--src/zenhorde/hordetransport.cpp169
1 files changed, 169 insertions, 0 deletions
diff --git a/src/zenhorde/hordetransport.cpp b/src/zenhorde/hordetransport.cpp
new file mode 100644
index 000000000..69766e73e
--- /dev/null
+++ b/src/zenhorde/hordetransport.cpp
@@ -0,0 +1,169 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "hordetransport.h"
+
+#include <zencore/logging.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_PLATFORM_WINDOWS
+# undef SendMessage
+#endif
+
+namespace zen::horde {
+
+// ComputeTransport base
+
+bool
+ComputeTransport::SendMessage(const void* Data, size_t Size)
+{
+ const uint8_t* Ptr = static_cast<const uint8_t*>(Data);
+ size_t Remaining = Size;
+
+ while (Remaining > 0)
+ {
+ const size_t Sent = Send(Ptr, Remaining);
+ if (Sent == 0)
+ {
+ return false;
+ }
+ Ptr += Sent;
+ Remaining -= Sent;
+ }
+
+ return true;
+}
+
+bool
+ComputeTransport::RecvMessage(void* Data, size_t Size)
+{
+ uint8_t* Ptr = static_cast<uint8_t*>(Data);
+ size_t Remaining = Size;
+
+ while (Remaining > 0)
+ {
+ const size_t Received = Recv(Ptr, Remaining);
+ if (Received == 0)
+ {
+ return false;
+ }
+ Ptr += Received;
+ Remaining -= Received;
+ }
+
+ return true;
+}
+
+// TcpComputeTransport - ASIO pimpl
+
+struct TcpComputeTransport::Impl
+{
+ asio::io_context IoContext;
+ asio::ip::tcp::socket Socket;
+
+ Impl() : Socket(IoContext) {}
+};
+
+// Uses ASIO in synchronous mode only — no async operations or io_context::run().
+// The io_context is only needed because ASIO sockets require one to be constructed.
+TcpComputeTransport::TcpComputeTransport(const MachineInfo& Info)
+: m_Impl(std::make_unique<Impl>())
+, m_Log(zen::logging::Get("horde.transport"))
+{
+ ZEN_TRACE_CPU("TcpComputeTransport::Connect");
+
+ asio::error_code Ec;
+
+ const asio::ip::address Address = asio::ip::make_address(Info.GetConnectionAddress(), Ec);
+ if (Ec)
+ {
+ ZEN_WARN("invalid address '{}': {}", Info.GetConnectionAddress(), Ec.message());
+ m_HasErrors = true;
+ return;
+ }
+
+ const asio::ip::tcp::endpoint Endpoint(Address, Info.GetConnectionPort());
+
+ m_Impl->Socket.connect(Endpoint, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("failed to connect to Horde compute [{}:{}]: {}", Info.GetConnectionAddress(), Info.GetConnectionPort(), Ec.message());
+ m_HasErrors = true;
+ return;
+ }
+
+ // Disable Nagle's algorithm for lower latency
+ m_Impl->Socket.set_option(asio::ip::tcp::no_delay(true), Ec);
+}
+
+TcpComputeTransport::~TcpComputeTransport()
+{
+ Close();
+}
+
+bool
+TcpComputeTransport::IsValid() const
+{
+ return m_Impl && m_Impl->Socket.is_open() && !m_HasErrors && !m_IsClosed;
+}
+
+size_t
+TcpComputeTransport::Send(const void* Data, size_t Size)
+{
+ if (!IsValid())
+ {
+ return 0;
+ }
+
+ asio::error_code Ec;
+ const size_t Sent = m_Impl->Socket.send(asio::buffer(Data, Size), 0, Ec);
+
+ if (Ec)
+ {
+ m_HasErrors = true;
+ return 0;
+ }
+
+ return Sent;
+}
+
+size_t
+TcpComputeTransport::Recv(void* Data, size_t Size)
+{
+ if (!IsValid())
+ {
+ return 0;
+ }
+
+ asio::error_code Ec;
+ const size_t Received = m_Impl->Socket.receive(asio::buffer(Data, Size), 0, Ec);
+
+ if (Ec)
+ {
+ return 0;
+ }
+
+ return Received;
+}
+
+void
+TcpComputeTransport::MarkComplete()
+{
+}
+
+void
+TcpComputeTransport::Close()
+{
+ if (!m_IsClosed && m_Impl && m_Impl->Socket.is_open())
+ {
+ asio::error_code Ec;
+ m_Impl->Socket.shutdown(asio::ip::tcp::socket::shutdown_both, Ec);
+ m_Impl->Socket.close(Ec);
+ }
+ m_IsClosed = true;
+}
+
+} // namespace zen::horde