aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/trace/tracerecorder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/trace/tracerecorder.cpp')
-rw-r--r--src/zenserver/trace/tracerecorder.cpp565
1 files changed, 565 insertions, 0 deletions
diff --git a/src/zenserver/trace/tracerecorder.cpp b/src/zenserver/trace/tracerecorder.cpp
new file mode 100644
index 000000000..5dec20e18
--- /dev/null
+++ b/src/zenserver/trace/tracerecorder.cpp
@@ -0,0 +1,565 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "tracerecorder.h"
+
+#include <zencore/basicfile.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/uid.h>
+
+#include <asio.hpp>
+
+#include <atomic>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+namespace zen {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TraceSession : public std::enable_shared_from_this<TraceSession>
+{
+ TraceSession(asio::ip::tcp::socket&& Socket, const std::filesystem::path& OutputDir)
+ : m_Socket(std::move(Socket))
+ , m_OutputDir(OutputDir)
+ , m_SessionId(Oid::NewOid())
+ {
+ try
+ {
+ m_RemoteAddress = m_Socket.remote_endpoint().address().to_string();
+ }
+ catch (...)
+ {
+ m_RemoteAddress = "unknown";
+ }
+
+ ZEN_INFO("Trace session {} started from {}", m_SessionId, m_RemoteAddress);
+ }
+
+ ~TraceSession()
+ {
+ if (m_TraceFile.IsOpen())
+ {
+ m_TraceFile.Close();
+ }
+
+ ZEN_INFO("Trace session {} ended, {} bytes recorded to '{}'", m_SessionId, m_TotalBytesRecorded, m_TraceFilePath);
+ }
+
+ void Start() { ReadPreambleHeader(); }
+
+ bool IsActive() const { return m_Socket.is_open(); }
+
+ TraceSessionInfo GetInfo() const
+ {
+ TraceSessionInfo Info;
+ Info.SessionGuid = m_SessionGuid;
+ Info.TraceGuid = m_TraceGuid;
+ Info.ControlPort = m_ControlPort;
+ Info.TransportVersion = m_TransportVersion;
+ Info.ProtocolVersion = m_ProtocolVersion;
+ Info.RemoteAddress = m_RemoteAddress;
+ Info.BytesRecorded = m_TotalBytesRecorded;
+ Info.TraceFilePath = m_TraceFilePath;
+ return Info;
+ }
+
+private:
+ // Preamble format:
+ // [magic: 4 bytes][metadata_size: 2 bytes][metadata fields: variable][version: 2 bytes]
+ //
+ // Magic bytes: [0]=version_char ('2'-'9'), [1]='C', [2]='R', [3]='T'
+ //
+ // Metadata fields (repeated):
+ // [size: 1 byte][id: 1 byte][data: <size> bytes]
+ // Field 0: ControlPort (uint16)
+ // Field 1: SessionGuid (16 bytes)
+ // Field 2: TraceGuid (16 bytes)
+ //
+ // Version: [transport: 1 byte][protocol: 1 byte]
+
+ static constexpr size_t kMagicSize = 4;
+ static constexpr size_t kMetadataSizeFieldSize = 2;
+ static constexpr size_t kPreambleHeaderSize = kMagicSize + kMetadataSizeFieldSize;
+ static constexpr size_t kVersionSize = 2;
+ static constexpr size_t kPreambleBufferSize = 256;
+ static constexpr size_t kReadBufferSize = 64 * 1024;
+
+ void ReadPreambleHeader()
+ {
+ auto Self = shared_from_this();
+
+ // Read the first 6 bytes: 4 magic + 2 metadata size
+ asio::async_read(m_Socket,
+ asio::buffer(m_PreambleBuffer, kPreambleHeaderSize),
+ [this, Self](const asio::error_code& Ec, std::size_t /*BytesRead*/) {
+ if (Ec)
+ {
+ HandleReadError("preamble header", Ec);
+ return;
+ }
+
+ if (!ValidateMagic())
+ {
+ ZEN_WARN("Trace session {}: invalid trace magic header", m_SessionId);
+ CloseSocket();
+ return;
+ }
+
+ ReadPreambleMetadata();
+ });
+ }
+
+ bool ValidateMagic()
+ {
+ const uint8_t* Cursor = m_PreambleBuffer;
+
+ // Validate magic: bytes are version, 'C', 'R', 'T'
+ if (Cursor[3] != 'T' || Cursor[2] != 'R' || Cursor[1] != 'C')
+ {
+ return false;
+ }
+
+ if (Cursor[0] < '2' || Cursor[0] > '9')
+ {
+ return false;
+ }
+
+ // Extract the metadata fields size (does not include the trailing version bytes)
+ std::memcpy(&m_MetadataFieldsSize, Cursor + kMagicSize, sizeof(m_MetadataFieldsSize));
+
+ if (m_MetadataFieldsSize + kVersionSize > kPreambleBufferSize - kPreambleHeaderSize)
+ {
+ return false;
+ }
+
+ return true;
+ }
+
+ void ReadPreambleMetadata()
+ {
+ auto Self = shared_from_this();
+ size_t ReadSize = m_MetadataFieldsSize + kVersionSize;
+
+ // Read metadata fields + 2 version bytes
+ asio::async_read(m_Socket,
+ asio::buffer(m_PreambleBuffer + kPreambleHeaderSize, ReadSize),
+ [this, Self](const asio::error_code& Ec, std::size_t /*BytesRead*/) {
+ if (Ec)
+ {
+ HandleReadError("preamble metadata", Ec);
+ return;
+ }
+
+ if (!ParseMetadata())
+ {
+ ZEN_WARN("Trace session {}: malformed trace metadata", m_SessionId);
+ CloseSocket();
+ return;
+ }
+
+ if (!CreateTraceFile())
+ {
+ CloseSocket();
+ return;
+ }
+
+ // Write the full preamble to the trace file so it remains a valid .utrace
+ size_t PreambleSize = kPreambleHeaderSize + m_MetadataFieldsSize + kVersionSize;
+ std::error_code WriteEc;
+ m_TraceFile.Write(m_PreambleBuffer, PreambleSize, 0, WriteEc);
+
+ if (WriteEc)
+ {
+ ZEN_ERROR("Trace session {}: failed to write preamble: {}", m_SessionId, WriteEc.message());
+ CloseSocket();
+ return;
+ }
+
+ m_TotalBytesRecorded = PreambleSize;
+
+ ZEN_INFO("Trace session {}: metadata - TransportV{} ProtocolV{} ControlPort:{} SessionGuid:{} TraceGuid:{}",
+ m_SessionId,
+ m_TransportVersion,
+ m_ProtocolVersion,
+ m_ControlPort,
+ m_SessionGuid,
+ m_TraceGuid);
+
+ // Begin streaming trace data to disk
+ ReadMore();
+ });
+ }
+
+ bool ParseMetadata()
+ {
+ const uint8_t* Cursor = m_PreambleBuffer + kPreambleHeaderSize;
+ int32_t Remaining = static_cast<int32_t>(m_MetadataFieldsSize);
+
+ while (Remaining >= 2)
+ {
+ uint8_t FieldSize = Cursor[0];
+ uint8_t FieldId = Cursor[1];
+ Cursor += 2;
+ Remaining -= 2;
+
+ if (Remaining < FieldSize)
+ {
+ return false;
+ }
+
+ switch (FieldId)
+ {
+ case 0: // ControlPort
+ if (FieldSize >= sizeof(uint16_t))
+ {
+ std::memcpy(&m_ControlPort, Cursor, sizeof(uint16_t));
+ }
+ break;
+ case 1: // SessionGuid
+ if (FieldSize >= sizeof(Guid))
+ {
+ std::memcpy(&m_SessionGuid, Cursor, sizeof(Guid));
+ }
+ break;
+ case 2: // TraceGuid
+ if (FieldSize >= sizeof(Guid))
+ {
+ std::memcpy(&m_TraceGuid, Cursor, sizeof(Guid));
+ }
+ break;
+ }
+
+ Cursor += FieldSize;
+ Remaining -= FieldSize;
+ }
+
+ // Metadata should be fully consumed
+ if (Remaining != 0)
+ {
+ return false;
+ }
+
+ // Version bytes follow immediately after the metadata fields
+ const uint8_t* VersionPtr = m_PreambleBuffer + kPreambleHeaderSize + m_MetadataFieldsSize;
+ m_TransportVersion = VersionPtr[0];
+ m_ProtocolVersion = VersionPtr[1];
+
+ return true;
+ }
+
+ bool CreateTraceFile()
+ {
+ m_TraceFilePath = m_OutputDir / fmt::format("{}.utrace", m_SessionId);
+
+ try
+ {
+ m_TraceFile.Open(m_TraceFilePath, BasicFile::Mode::kTruncate);
+ ZEN_INFO("Trace session {} writing to '{}'", m_SessionId, m_TraceFilePath);
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Trace session {}: failed to create trace file '{}': {}", m_SessionId, m_TraceFilePath, Ex.what());
+ return false;
+ }
+ }
+
+ void ReadMore()
+ {
+ auto Self = shared_from_this();
+
+ m_Socket.async_read_some(asio::buffer(m_ReadBuffer, kReadBufferSize),
+ [this, Self](const asio::error_code& Ec, std::size_t BytesRead) {
+ if (!Ec)
+ {
+ if (BytesRead > 0 && m_TraceFile.IsOpen())
+ {
+ std::error_code WriteEc;
+ const uint64_t FileOffset = m_TotalBytesRecorded;
+ m_TraceFile.Write(m_ReadBuffer, BytesRead, FileOffset, WriteEc);
+
+ if (WriteEc)
+ {
+ ZEN_ERROR("Trace session {}: write error: {}", m_SessionId, WriteEc.message());
+ CloseSocket();
+ return;
+ }
+
+ m_TotalBytesRecorded += BytesRead;
+ }
+
+ ReadMore();
+ }
+ else if (Ec == asio::error::eof)
+ {
+ ZEN_DEBUG("Trace session {} connection closed by peer", m_SessionId);
+ CloseSocket();
+ }
+ else if (Ec == asio::error::operation_aborted)
+ {
+ ZEN_DEBUG("Trace session {} operation aborted", m_SessionId);
+ }
+ else
+ {
+ ZEN_WARN("Trace session {} read error: {}", m_SessionId, Ec.message());
+ CloseSocket();
+ }
+ });
+ }
+
+ void HandleReadError(const char* Phase, const asio::error_code& Ec)
+ {
+ if (Ec == asio::error::eof)
+ {
+ ZEN_DEBUG("Trace session {}: connection closed during {}", m_SessionId, Phase);
+ }
+ else if (Ec == asio::error::operation_aborted)
+ {
+ ZEN_DEBUG("Trace session {}: operation aborted during {}", m_SessionId, Phase);
+ }
+ else
+ {
+ ZEN_WARN("Trace session {}: error during {}: {}", m_SessionId, Phase, Ec.message());
+ }
+
+ CloseSocket();
+ }
+
+ void CloseSocket()
+ {
+ std::error_code Ec;
+ m_Socket.close(Ec);
+
+ if (m_TraceFile.IsOpen())
+ {
+ m_TraceFile.Close();
+ }
+ }
+
+ asio::ip::tcp::socket m_Socket;
+ std::filesystem::path m_OutputDir;
+ std::filesystem::path m_TraceFilePath;
+ BasicFile m_TraceFile;
+ Oid m_SessionId;
+ std::string m_RemoteAddress;
+
+ // Preamble parsing
+ uint8_t m_PreambleBuffer[kPreambleBufferSize] = {};
+ uint16_t m_MetadataFieldsSize = 0;
+
+ // Extracted metadata
+ Guid m_SessionGuid{};
+ Guid m_TraceGuid{};
+ uint16_t m_ControlPort = 0;
+ uint8_t m_TransportVersion = 0;
+ uint8_t m_ProtocolVersion = 0;
+
+ // Streaming
+ uint8_t m_ReadBuffer[kReadBufferSize];
+ uint64_t m_TotalBytesRecorded = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TraceRecorder::Impl
+{
+ Impl() : m_IoContext(), m_Acceptor(m_IoContext) {}
+
+ ~Impl() { Shutdown(); }
+
+ void Initialize(uint16_t InPort, const std::filesystem::path& OutputDir)
+ {
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+
+ if (m_IsRunning)
+ {
+ ZEN_WARN("TraceRecorder already initialized");
+ return;
+ }
+
+ m_OutputDir = OutputDir;
+
+ try
+ {
+ // Create output directory if it doesn't exist
+ CreateDirectories(m_OutputDir);
+
+ // Configure acceptor
+ m_Acceptor.open(asio::ip::tcp::v4());
+ m_Acceptor.set_option(asio::socket_base::reuse_address(true));
+ m_Acceptor.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), InPort));
+ m_Acceptor.listen();
+
+ m_Port = m_Acceptor.local_endpoint().port();
+
+ ZEN_INFO("TraceRecorder listening on port {}, output directory: '{}'", m_Port, m_OutputDir);
+
+ m_IsRunning = true;
+
+ // Start accepting connections
+ StartAccept();
+
+ // Start IO thread
+ m_IoThread = std::thread([this]() {
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("TraceRecorder IO thread exception: {}", Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to initialize TraceRecorder: {}", Ex.what());
+ m_IsRunning = false;
+ throw;
+ }
+ }
+
+ void Shutdown()
+ {
+ std::lock_guard<std::mutex> Lock(m_Mutex);
+
+ if (!m_IsRunning)
+ {
+ return;
+ }
+
+ ZEN_INFO("TraceRecorder shutting down");
+
+ m_IsRunning = false;
+
+ std::error_code Ec;
+ m_Acceptor.close(Ec);
+
+ m_IoContext.stop();
+
+ if (m_IoThread.joinable())
+ {
+ m_IoThread.join();
+ }
+
+ {
+ std::lock_guard<std::mutex> SessionLock(m_SessionsMutex);
+ m_Sessions.clear();
+ }
+
+ ZEN_INFO("TraceRecorder shutdown complete");
+ }
+
+ bool IsRunning() const { return m_IsRunning; }
+
+ uint16_t GetPort() const { return m_Port; }
+
+ std::vector<TraceSessionInfo> GetActiveSessions() const
+ {
+ std::lock_guard<std::mutex> Lock(m_SessionsMutex);
+
+ std::vector<TraceSessionInfo> Result;
+ for (const auto& WeakSession : m_Sessions)
+ {
+ if (auto Session = WeakSession.lock())
+ {
+ if (Session->IsActive())
+ {
+ Result.push_back(Session->GetInfo());
+ }
+ }
+ }
+ return Result;
+ }
+
+private:
+ void StartAccept()
+ {
+ auto Socket = std::make_shared<asio::ip::tcp::socket>(m_IoContext);
+
+ m_Acceptor.async_accept(*Socket, [this, Socket](const asio::error_code& Ec) {
+ if (!Ec)
+ {
+ auto Session = std::make_shared<TraceSession>(std::move(*Socket), m_OutputDir);
+
+ {
+ std::lock_guard<std::mutex> Lock(m_SessionsMutex);
+
+ // Prune expired sessions while adding the new one
+ std::erase_if(m_Sessions, [](const std::weak_ptr<TraceSession>& Wp) { return Wp.expired(); });
+ m_Sessions.push_back(Session);
+ }
+
+ Session->Start();
+ }
+ else if (Ec != asio::error::operation_aborted)
+ {
+ ZEN_WARN("Accept error: {}", Ec.message());
+ }
+
+ // Continue accepting if still running
+ if (m_IsRunning)
+ {
+ StartAccept();
+ }
+ });
+ }
+
+ asio::io_context m_IoContext;
+ asio::ip::tcp::acceptor m_Acceptor;
+ std::thread m_IoThread;
+ std::filesystem::path m_OutputDir;
+ std::mutex m_Mutex;
+ std::atomic<bool> m_IsRunning{false};
+ uint16_t m_Port = 0;
+
+ mutable std::mutex m_SessionsMutex;
+ std::vector<std::weak_ptr<TraceSession>> m_Sessions;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TraceRecorder::TraceRecorder() : m_Impl(std::make_unique<Impl>())
+{
+}
+
+TraceRecorder::~TraceRecorder()
+{
+ Shutdown();
+}
+
+void
+TraceRecorder::Initialize(uint16_t InPort, const std::filesystem::path& OutputDir)
+{
+ m_Impl->Initialize(InPort, OutputDir);
+}
+
+void
+TraceRecorder::Shutdown()
+{
+ m_Impl->Shutdown();
+}
+
+bool
+TraceRecorder::IsRunning() const
+{
+ return m_Impl->IsRunning();
+}
+
+uint16_t
+TraceRecorder::GetPort() const
+{
+ return m_Impl->GetPort();
+}
+
+std::vector<TraceSessionInfo>
+TraceRecorder::GetActiveSessions() const
+{
+ return m_Impl->GetActiveSessions();
+}
+
+} // namespace zen