// Copyright Epic Games, Inc. All Rights Reserved. #include "hordeagent.h" #include "hordetransportaes.h" #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen::horde { // --- AsyncHordeAgent --- static const char* GetStateName(AsyncHordeAgent::State S) { switch (S) { case AsyncHordeAgent::State::Idle: return "idle"; case AsyncHordeAgent::State::Connecting: return "connect"; case AsyncHordeAgent::State::WaitAgentAttach: return "agent-attach"; case AsyncHordeAgent::State::SentFork: return "fork"; case AsyncHordeAgent::State::WaitChildAttach: return "child-attach"; case AsyncHordeAgent::State::Uploading: return "upload"; case AsyncHordeAgent::State::Executing: return "execute"; case AsyncHordeAgent::State::Polling: return "poll"; case AsyncHordeAgent::State::Done: return "done"; default: return "unknown"; } } AsyncHordeAgent::AsyncHordeAgent(asio::io_context& IoContext) : m_IoContext(IoContext), m_Log(zen::logging::Get("horde.agent.async")) { } AsyncHordeAgent::~AsyncHordeAgent() { Cancel(); } void AsyncHordeAgent::Start(AsyncAgentConfig Config, AsyncAgentCompletionHandler OnDone) { m_Config = std::move(Config); m_OnDone = std::move(OnDone); m_State = State::Connecting; DoConnect(); } void AsyncHordeAgent::Cancel() { m_Cancelled = true; if (m_Socket) { m_Socket->Close(); } else if (m_Transport) { m_Transport->Close(); } } void AsyncHordeAgent::DoConnect() { ZEN_TRACE_CPU("AsyncHordeAgent::DoConnect"); m_TcpTransport = std::make_unique(m_IoContext); auto Self = shared_from_this(); m_TcpTransport->AsyncConnect(m_Config.Machine, [this, Self](const std::error_code& Ec) { OnConnected(Ec); }); } void AsyncHordeAgent::OnConnected(const std::error_code& Ec) { if (Ec || m_Cancelled) { if (Ec) { ZEN_WARN("connect failed: {}", Ec.message()); } Finish(false); return; } // Optionally wrap with AES encryption std::unique_ptr FinalTransport = std::move(m_TcpTransport); if (m_Config.Machine.EncryptionMode == Encryption::AES) { FinalTransport = std::make_unique(m_Config.Machine.Key, std::move(FinalTransport), m_IoContext); } m_Transport = std::move(FinalTransport); // Create the multiplexed socket and register channels m_Socket = std::make_shared(std::move(m_Transport), m_IoContext); m_AgentChannel = std::make_unique(m_Socket, 0, m_IoContext); m_ChildChannel = std::make_unique(m_Socket, 100, m_IoContext); m_Socket->RegisterChannel( 0, [this](std::vector Data) { m_AgentChannel->OnFrame(std::move(Data)); }, [this]() { m_AgentChannel->OnDetach(); }); m_Socket->RegisterChannel( 100, [this](std::vector Data) { m_ChildChannel->OnFrame(std::move(Data)); }, [this]() { m_ChildChannel->OnDetach(); }); m_Socket->StartRecvPump(); m_State = State::WaitAgentAttach; DoWaitAgentAttach(); } void AsyncHordeAgent::DoWaitAgentAttach() { auto Self = shared_from_this(); m_AgentChannel->AsyncReadResponse(5000, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnAgentResponse(Type, Data, Size); }); } void AsyncHordeAgent::OnAgentResponse(AgentMessageType Type, const uint8_t* /*Data*/, size_t /*Size*/) { if (m_Cancelled) { Finish(false); return; } if (Type == AgentMessageType::None) { ZEN_WARN("timed out waiting for Attach on agent channel"); Finish(false); return; } if (Type != AgentMessageType::Attach) { ZEN_WARN("expected Attach on agent channel, got 0x{:02x}", static_cast(Type)); Finish(false); return; } m_State = State::SentFork; DoSendFork(); } void AsyncHordeAgent::DoSendFork() { m_AgentChannel->Fork(100, 4 * 1024 * 1024); m_State = State::WaitChildAttach; DoWaitChildAttach(); } void AsyncHordeAgent::DoWaitChildAttach() { auto Self = shared_from_this(); m_ChildChannel->AsyncReadResponse(5000, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnChildAttachResponse(Type, Data, Size); }); } void AsyncHordeAgent::OnChildAttachResponse(AgentMessageType Type, const uint8_t* /*Data*/, size_t /*Size*/) { if (m_Cancelled) { Finish(false); return; } if (Type == AgentMessageType::None) { ZEN_WARN("timed out waiting for Attach on child channel"); Finish(false); return; } if (Type != AgentMessageType::Attach) { ZEN_WARN("expected Attach on child channel, got 0x{:02x}", static_cast(Type)); Finish(false); return; } m_State = State::Uploading; m_CurrentBundleIndex = 0; DoUploadNext(); } void AsyncHordeAgent::DoUploadNext() { if (m_Cancelled) { Finish(false); return; } if (m_CurrentBundleIndex >= m_Config.Bundles.size()) { // All bundles uploaded - proceed to execute m_State = State::Executing; DoExecute(); return; } const auto& [Locator, BundleDir] = m_Config.Bundles[m_CurrentBundleIndex]; m_ChildChannel->UploadFiles("", Locator.c_str()); // Enter the ReadBlob/Blob upload loop auto Self = shared_from_this(); m_ChildChannel->AsyncReadResponse(1000, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnUploadResponse(Type, Data, Size); }); } void AsyncHordeAgent::OnUploadResponse(AgentMessageType Type, const uint8_t* Data, size_t Size) { if (m_Cancelled) { Finish(false); return; } if (Type == AgentMessageType::None) { if (m_ChildChannel->IsDetached()) { ZEN_WARN("connection lost during upload"); Finish(false); return; } // Timeout - retry read auto Self = shared_from_this(); m_ChildChannel->AsyncReadResponse(1000, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnUploadResponse(Type, Data, Size); }); return; } if (Type == AgentMessageType::WriteFilesResponse) { // This bundle upload is done - move to next ++m_CurrentBundleIndex; DoUploadNext(); return; } if (Type == AgentMessageType::Exception) { ExceptionInfo Ex; AsyncAgentMessageChannel::ReadException(Data, Size, Ex); ZEN_ERROR("upload exception: {} - {}", Ex.Message, Ex.Description); Finish(false); return; } if (Type != AgentMessageType::ReadBlob) { ZEN_ERROR("unexpected message type 0x{:02x} during upload", static_cast(Type)); Finish(false); return; } // Handle ReadBlob request BlobRequest Req; AsyncAgentMessageChannel::ReadBlobRequest(Data, Size, Req); const auto& [Locator, BundleDir] = m_Config.Bundles[m_CurrentBundleIndex]; const std::filesystem::path BlobPath = BundleDir / (std::string(Req.Locator) + ".blob"); std::error_code FsEc; BasicFile File; File.Open(BlobPath, BasicFile::Mode::kRead, FsEc); if (FsEc) { ZEN_ERROR("cannot read blob file: '{}'", BlobPath); Finish(false); return; } const uint64_t TotalSize = File.FileSize(); const uint64_t Offset = static_cast(Req.Offset); if (Offset >= TotalSize) { ZEN_ERROR("blob request beyond end of file: offset={}, length={}, total_size={}", Offset, Req.Length, TotalSize); m_ChildChannel->Blob(nullptr, 0); } else { const IoBuffer FileData = File.ReadRange(Offset, Min(Req.Length, TotalSize - Offset)); m_ChildChannel->Blob(static_cast(FileData.GetData()), FileData.GetSize()); } // Continue the upload loop auto Self = shared_from_this(); m_ChildChannel->AsyncReadResponse(1000, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnUploadResponse(Type, Data, Size); }); } void AsyncHordeAgent::DoExecute() { ZEN_TRACE_CPU("AsyncHordeAgent::DoExecute"); std::vector ArgPtrs; ArgPtrs.reserve(m_Config.Args.size()); for (const std::string& Arg : m_Config.Args) { ArgPtrs.push_back(Arg.c_str()); } m_ChildChannel->Execute(m_Config.Executable.c_str(), ArgPtrs.data(), ArgPtrs.size(), nullptr, nullptr, 0, m_Config.UseWine ? ExecuteProcessFlags::UseWine : ExecuteProcessFlags::None); ZEN_INFO("remote execution started on [{}:{}] lease={}", m_Config.Machine.GetConnectionAddress(), m_Config.Machine.GetConnectionPort(), m_Config.Machine.LeaseId); m_State = State::Polling; DoPoll(); } void AsyncHordeAgent::DoPoll() { if (m_Cancelled) { Finish(false); return; } auto Self = shared_from_this(); m_ChildChannel->AsyncReadResponse(100, [this, Self](AgentMessageType Type, const uint8_t* Data, size_t Size) { OnPollResponse(Type, Data, Size); }); } void AsyncHordeAgent::OnPollResponse(AgentMessageType Type, const uint8_t* Data, size_t Size) { if (m_Cancelled) { Finish(false); return; } switch (Type) { case AgentMessageType::None: if (m_ChildChannel->IsDetached()) { ZEN_WARN("connection lost during execution"); Finish(false); } else { // Timeout - poll again DoPoll(); } break; case AgentMessageType::ExecuteOutput: // Silently consume remote stdout (matching LogOutput=false in provisioner) DoPoll(); break; case AgentMessageType::ExecuteResult: { int32_t ExitCode = -1; if (Size == sizeof(int32_t)) { memcpy(&ExitCode, Data, sizeof(int32_t)); } ZEN_INFO("remote process exited with code {} (lease={})", ExitCode, m_Config.Machine.LeaseId); Finish(ExitCode == 0, ExitCode); } break; case AgentMessageType::Exception: { ExceptionInfo Ex; AsyncAgentMessageChannel::ReadException(Data, Size, Ex); ZEN_ERROR("exception: {} - {}", Ex.Message, Ex.Description); Finish(false); } break; default: DoPoll(); break; } } void AsyncHordeAgent::Finish(bool Success, int32_t ExitCode) { if (m_State == State::Done) { return; // Already finished } if (!Success) { ZEN_WARN("agent failed during {} (lease={})", GetStateName(m_State), m_Config.Machine.LeaseId); } m_State = State::Done; if (m_Socket) { m_Socket->Close(); } if (m_OnDone) { AsyncAgentResult Result; Result.Success = Success; Result.ExitCode = ExitCode; Result.CoreCount = m_Config.Machine.LogicalCores; auto Handler = std::move(m_OnDone); m_OnDone = nullptr; Handler(Result); } } } // namespace zen::horde