// Copyright Epic Games, Inc. All Rights Reserved. #include "tracerecorder.h" #include #include #include #include #include #include #include #include #include #include #include namespace zen { //////////////////////////////////////////////////////////////////////////////// struct TraceSession : public std::enable_shared_from_this { TraceSession(asio::ip::tcp::socket&& Socket, const std::filesystem::path& OutputDir) : m_Socket(std::move(Socket)) , m_OutputDir(OutputDir) , m_SessionId(Oid::NewOid()) { try { m_RemoteAddress = m_Socket.remote_endpoint().address().to_string(); } catch (...) { m_RemoteAddress = "unknown"; } ZEN_INFO("Trace session {} started from {}", m_SessionId, m_RemoteAddress); } ~TraceSession() { if (m_TraceFile.IsOpen()) { m_TraceFile.Close(); } ZEN_INFO("Trace session {} ended, {} bytes recorded to '{}'", m_SessionId, m_TotalBytesRecorded, m_TraceFilePath); } void Start() { ReadPreambleHeader(); } bool IsActive() const { return m_Socket.is_open(); } TraceSessionInfo GetInfo() const { TraceSessionInfo Info; Info.SessionGuid = m_SessionGuid; Info.TraceGuid = m_TraceGuid; Info.ControlPort = m_ControlPort; Info.TransportVersion = m_TransportVersion; Info.ProtocolVersion = m_ProtocolVersion; Info.RemoteAddress = m_RemoteAddress; Info.BytesRecorded = m_TotalBytesRecorded; Info.TraceFilePath = m_TraceFilePath; return Info; } private: // Preamble format: // [magic: 4 bytes][metadata_size: 2 bytes][metadata fields: variable][version: 2 bytes] // // Magic bytes: [0]=version_char ('2'-'9'), [1]='C', [2]='R', [3]='T' // // Metadata fields (repeated): // [size: 1 byte][id: 1 byte][data: bytes] // Field 0: ControlPort (uint16) // Field 1: SessionGuid (16 bytes) // Field 2: TraceGuid (16 bytes) // // Version: [transport: 1 byte][protocol: 1 byte] static constexpr size_t kMagicSize = 4; static constexpr size_t kMetadataSizeFieldSize = 2; static constexpr size_t kPreambleHeaderSize = kMagicSize + kMetadataSizeFieldSize; static constexpr size_t kVersionSize = 2; static constexpr size_t kPreambleBufferSize = 256; static constexpr size_t kReadBufferSize = 64 * 1024; void ReadPreambleHeader() { auto Self = shared_from_this(); // Read the first 6 bytes: 4 magic + 2 metadata size asio::async_read(m_Socket, asio::buffer(m_PreambleBuffer, kPreambleHeaderSize), [this, Self](const asio::error_code& Ec, std::size_t /*BytesRead*/) { if (Ec) { HandleReadError("preamble header", Ec); return; } if (!ValidateMagic()) { ZEN_WARN("Trace session {}: invalid trace magic header", m_SessionId); CloseSocket(); return; } ReadPreambleMetadata(); }); } bool ValidateMagic() { const uint8_t* Cursor = m_PreambleBuffer; // Validate magic: bytes are version, 'C', 'R', 'T' if (Cursor[3] != 'T' || Cursor[2] != 'R' || Cursor[1] != 'C') { return false; } if (Cursor[0] < '2' || Cursor[0] > '9') { return false; } // Extract the metadata fields size (does not include the trailing version bytes) std::memcpy(&m_MetadataFieldsSize, Cursor + kMagicSize, sizeof(m_MetadataFieldsSize)); if (m_MetadataFieldsSize + kVersionSize > kPreambleBufferSize - kPreambleHeaderSize) { return false; } return true; } void ReadPreambleMetadata() { auto Self = shared_from_this(); size_t ReadSize = m_MetadataFieldsSize + kVersionSize; // Read metadata fields + 2 version bytes asio::async_read(m_Socket, asio::buffer(m_PreambleBuffer + kPreambleHeaderSize, ReadSize), [this, Self](const asio::error_code& Ec, std::size_t /*BytesRead*/) { if (Ec) { HandleReadError("preamble metadata", Ec); return; } if (!ParseMetadata()) { ZEN_WARN("Trace session {}: malformed trace metadata", m_SessionId); CloseSocket(); return; } if (!CreateTraceFile()) { CloseSocket(); return; } // Write the full preamble to the trace file so it remains a valid .utrace size_t PreambleSize = kPreambleHeaderSize + m_MetadataFieldsSize + kVersionSize; std::error_code WriteEc; m_TraceFile.Write(m_PreambleBuffer, PreambleSize, 0, WriteEc); if (WriteEc) { ZEN_ERROR("Trace session {}: failed to write preamble: {}", m_SessionId, WriteEc.message()); CloseSocket(); return; } m_TotalBytesRecorded = PreambleSize; ZEN_INFO("Trace session {}: metadata - TransportV{} ProtocolV{} ControlPort:{} SessionGuid:{} TraceGuid:{}", m_SessionId, m_TransportVersion, m_ProtocolVersion, m_ControlPort, m_SessionGuid, m_TraceGuid); // Begin streaming trace data to disk ReadMore(); }); } bool ParseMetadata() { const uint8_t* Cursor = m_PreambleBuffer + kPreambleHeaderSize; int32_t Remaining = static_cast(m_MetadataFieldsSize); while (Remaining >= 2) { uint8_t FieldSize = Cursor[0]; uint8_t FieldId = Cursor[1]; Cursor += 2; Remaining -= 2; if (Remaining < FieldSize) { return false; } switch (FieldId) { case 0: // ControlPort if (FieldSize >= sizeof(uint16_t)) { std::memcpy(&m_ControlPort, Cursor, sizeof(uint16_t)); } break; case 1: // SessionGuid if (FieldSize >= sizeof(Guid)) { std::memcpy(&m_SessionGuid, Cursor, sizeof(Guid)); } break; case 2: // TraceGuid if (FieldSize >= sizeof(Guid)) { std::memcpy(&m_TraceGuid, Cursor, sizeof(Guid)); } break; } Cursor += FieldSize; Remaining -= FieldSize; } // Metadata should be fully consumed if (Remaining != 0) { return false; } // Version bytes follow immediately after the metadata fields const uint8_t* VersionPtr = m_PreambleBuffer + kPreambleHeaderSize + m_MetadataFieldsSize; m_TransportVersion = VersionPtr[0]; m_ProtocolVersion = VersionPtr[1]; return true; } bool CreateTraceFile() { m_TraceFilePath = m_OutputDir / fmt::format("{}.utrace", m_SessionId); try { m_TraceFile.Open(m_TraceFilePath, BasicFile::Mode::kTruncate); ZEN_INFO("Trace session {} writing to '{}'", m_SessionId, m_TraceFilePath); return true; } catch (const std::exception& Ex) { ZEN_ERROR("Trace session {}: failed to create trace file '{}': {}", m_SessionId, m_TraceFilePath, Ex.what()); return false; } } void ReadMore() { auto Self = shared_from_this(); m_Socket.async_read_some(asio::buffer(m_ReadBuffer, kReadBufferSize), [this, Self](const asio::error_code& Ec, std::size_t BytesRead) { if (!Ec) { if (BytesRead > 0 && m_TraceFile.IsOpen()) { std::error_code WriteEc; const uint64_t FileOffset = m_TotalBytesRecorded; m_TraceFile.Write(m_ReadBuffer, BytesRead, FileOffset, WriteEc); if (WriteEc) { ZEN_ERROR("Trace session {}: write error: {}", m_SessionId, WriteEc.message()); CloseSocket(); return; } m_TotalBytesRecorded += BytesRead; } ReadMore(); } else if (Ec == asio::error::eof) { ZEN_DEBUG("Trace session {} connection closed by peer", m_SessionId); CloseSocket(); } else if (Ec == asio::error::operation_aborted) { ZEN_DEBUG("Trace session {} operation aborted", m_SessionId); } else { ZEN_WARN("Trace session {} read error: {}", m_SessionId, Ec.message()); CloseSocket(); } }); } void HandleReadError(const char* Phase, const asio::error_code& Ec) { if (Ec == asio::error::eof) { ZEN_DEBUG("Trace session {}: connection closed during {}", m_SessionId, Phase); } else if (Ec == asio::error::operation_aborted) { ZEN_DEBUG("Trace session {}: operation aborted during {}", m_SessionId, Phase); } else { ZEN_WARN("Trace session {}: error during {}: {}", m_SessionId, Phase, Ec.message()); } CloseSocket(); } void CloseSocket() { std::error_code Ec; m_Socket.close(Ec); if (m_TraceFile.IsOpen()) { m_TraceFile.Close(); } } asio::ip::tcp::socket m_Socket; std::filesystem::path m_OutputDir; std::filesystem::path m_TraceFilePath; BasicFile m_TraceFile; Oid m_SessionId; std::string m_RemoteAddress; // Preamble parsing uint8_t m_PreambleBuffer[kPreambleBufferSize] = {}; uint16_t m_MetadataFieldsSize = 0; // Extracted metadata Guid m_SessionGuid{}; Guid m_TraceGuid{}; uint16_t m_ControlPort = 0; uint8_t m_TransportVersion = 0; uint8_t m_ProtocolVersion = 0; // Streaming uint8_t m_ReadBuffer[kReadBufferSize]; uint64_t m_TotalBytesRecorded = 0; }; //////////////////////////////////////////////////////////////////////////////// struct TraceRecorder::Impl { Impl() : m_IoContext(), m_Acceptor(m_IoContext) {} ~Impl() { Shutdown(); } void Initialize(uint16_t InPort, const std::filesystem::path& OutputDir) { std::lock_guard Lock(m_Mutex); if (m_IsRunning) { ZEN_WARN("TraceRecorder already initialized"); return; } m_OutputDir = OutputDir; try { // Create output directory if it doesn't exist CreateDirectories(m_OutputDir); // Configure acceptor m_Acceptor.open(asio::ip::tcp::v4()); m_Acceptor.set_option(asio::socket_base::reuse_address(true)); m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), InPort)); m_Acceptor.listen(); m_Port = m_Acceptor.local_endpoint().port(); ZEN_INFO("TraceRecorder listening on port {}, output directory: '{}'", m_Port, m_OutputDir); m_IsRunning = true; // Start accepting connections StartAccept(); // Start IO thread m_IoThread = std::thread([this]() { try { m_IoContext.run(); } catch (const std::exception& Ex) { ZEN_ERROR("TraceRecorder IO thread exception: {}", Ex.what()); } }); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to initialize TraceRecorder: {}", Ex.what()); m_IsRunning = false; throw; } } void Shutdown() { std::lock_guard Lock(m_Mutex); if (!m_IsRunning) { return; } ZEN_INFO("TraceRecorder shutting down"); m_IsRunning = false; std::error_code Ec; m_Acceptor.close(Ec); m_IoContext.stop(); if (m_IoThread.joinable()) { m_IoThread.join(); } { std::lock_guard SessionLock(m_SessionsMutex); m_Sessions.clear(); } ZEN_INFO("TraceRecorder shutdown complete"); } bool IsRunning() const { return m_IsRunning; } uint16_t GetPort() const { return m_Port; } std::vector GetActiveSessions() const { std::lock_guard Lock(m_SessionsMutex); std::vector Result; for (const auto& WeakSession : m_Sessions) { if (auto Session = WeakSession.lock()) { if (Session->IsActive()) { Result.push_back(Session->GetInfo()); } } } return Result; } private: void StartAccept() { auto Socket = std::make_shared(m_IoContext); m_Acceptor.async_accept(*Socket, [this, Socket](const asio::error_code& Ec) { if (!Ec) { auto Session = std::make_shared(std::move(*Socket), m_OutputDir); { std::lock_guard Lock(m_SessionsMutex); // Prune expired sessions while adding the new one std::erase_if(m_Sessions, [](const std::weak_ptr& Wp) { return Wp.expired(); }); m_Sessions.push_back(Session); } Session->Start(); } else if (Ec != asio::error::operation_aborted) { ZEN_WARN("Accept error: {}", Ec.message()); } // Continue accepting if still running if (m_IsRunning) { StartAccept(); } }); } asio::io_context m_IoContext; asio::ip::tcp::acceptor m_Acceptor; std::thread m_IoThread; std::filesystem::path m_OutputDir; std::mutex m_Mutex; std::atomic m_IsRunning{false}; uint16_t m_Port = 0; mutable std::mutex m_SessionsMutex; std::vector> m_Sessions; }; //////////////////////////////////////////////////////////////////////////////// TraceRecorder::TraceRecorder() : m_Impl(std::make_unique()) { } TraceRecorder::~TraceRecorder() { Shutdown(); } void TraceRecorder::Initialize(uint16_t InPort, const std::filesystem::path& OutputDir) { m_Impl->Initialize(InPort, OutputDir); } void TraceRecorder::Shutdown() { m_Impl->Shutdown(); } bool TraceRecorder::IsRunning() const { return m_Impl->IsRunning(); } uint16_t TraceRecorder::GetPort() const { return m_Impl->GetPort(); } std::vector TraceRecorder::GetActiveSessions() const { return m_Impl->GetActiveSessions(); } } // namespace zen