aboutsummaryrefslogtreecommitdiff
path: root/thirdparty/tourist/trace/src/protocol.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-20 21:50:41 +0200
committerGitHub Enterprise <[email protected]>2026-04-20 21:50:41 +0200
commit2dfb5da16b97a6c12e01977af5b5188522178a4e (patch)
tree428aa0aa8e6079c64438931e0fd4f828c613c94d /thirdparty/tourist/trace/src/protocol.cpp
parentAdd CompactString utility type (#990) (diff)
downloadarchived-zen-2dfb5da16b97a6c12e01977af5b5188522178a4e.tar.xz
archived-zen-2dfb5da16b97a6c12e01977af5b5188522178a4e.zip
zen trace analysis support (#945)
Integrates the **tourist** trace analysis library and builds a full `zen trace` command suite for working with Unreal Engine `.utrace` files. ### Trace analysis library (`thirdparty/tourist/`) - Adds the tourist library as a third-party dependency with three modules: **foundation** (platform primitives, memory, scheduling), **trace** (UE Trace protocol decoding), and **analysis** (event dispatching and analyzer framework). - Cross-platform support for Windows, Linux, and macOS. ### `zen trace` CLI commands (`src/zen/cmds/`, `src/zen/trace/`) - **`zen trace analyze`** — Summarize a `.utrace` file: session metadata, thread inventory, command line + build configuration, CPU profiling scopes, timing, event rates, log messages, and (with symbols) memory allocation metrics including live-allocs dumps, callstack-keyed aggregation, and allocation churn. Optional HTML output for memory reports. - **`zen trace inspect`** — Dump the event schema (declared types, fields, sizes) from a trace file. - **`zen trace trim`** — Extract a time-window from a trace into a new `.utrace` file. - **`zen trace serve`** — Launch a local HTTP server hosting an interactive trace viewer; opens in the default browser. ### Symbolication (`src/zen/trace/symbol_resolver.*`, `thirdparty/raw_pdb/`) - Pluggable resolver with multiple backends: `pdb` (in-tree raw_pdb), `dbghelp` (Windows), `llvm-symbolizer` (all platforms), `atos` (macOS). An `auto` backend picks the best available tool per platform. - Microsoft Symbol Server support: downloads PDBs on demand using a redirect-aware HTTP client. - Local PDB cache keyed by image GUID preserves symbols across binary recompilation. - Callstack trimming heuristic strips UE internal noise from reports. - Binary analysis cache (`.ucache_z`) avoids re-resolving the same trace. ### Interactive trace viewer (`src/zen/frontend/html/`, `src/zen/trace/trace_viewer_service.*`) - Timeline: scope-level detail, horizontal zoom/pan, vertical scrolling, viewport-driven loading with pre-computed LOD for responsive navigation of large traces. - Thread grouping (collapsible sidebar sections) synthesized from name suffixes, natural sort order, visual distinction between lane threads and OS threads. - Bookmark and region annotations; region categories with per-category toggles; bookmark marker toggle in the toolbar. - Filterable Logs tab showing captured `UE_LOG` output. - Stats tab with per-scope aggregate statistics. - Memory tab with interactive allocation analysis and an allocation size histogram. - CsvProfiler event parsing and chart UI. ### Other in-branch supporting changes - **Cross-platform browser launcher** (`browser_launcher.{h,cpp}`) used by `trace serve`. - **`ReciprocalU64`** fast 64-bit integer division (zencore/intmath) for trace analyzers. - **`parallelsort`** cross-platform parallel sort helper (zenutil). - Frontend zip build rule so the viewer's HTML assets are bundled into `zen.exe`. - `/Zo` flag for better optimized debug info on Windows release builds. - `trace-tests.cpp` in the `zen-test` harness (harness itself landed on main via #985).
Diffstat (limited to 'thirdparty/tourist/trace/src/protocol.cpp')
-rw-r--r--thirdparty/tourist/trace/src/protocol.cpp850
1 files changed, 850 insertions, 0 deletions
diff --git a/thirdparty/tourist/trace/src/protocol.cpp b/thirdparty/tourist/trace/src/protocol.cpp
new file mode 100644
index 000000000..5297048ec
--- /dev/null
+++ b/thirdparty/tourist/trace/src/protocol.cpp
@@ -0,0 +1,850 @@
+#include <foundation/types.h>
+#include <foundation/buffer.h>
+#include <trace/detail/protocol.h>
+#include <trace/detail/transport.h>
+#include <trace/detail/type.h>
+
+#include "constants.h"
+
+//------------------------------------------------------------------------------
+class TypeDesc
+{
+public:
+ uint16 size : 13;
+ uint16 important : 1;
+ uint16 has_serial : 1;
+ uint16 maybe_aux : 1;
+ uint16 _unused[3];
+ const Type* type;
+ static Tuple<uint32, TypeDesc> parse(BufferStream& stream);
+};
+
+//------------------------------------------------------------------------------
+Tuple<uint32, TypeDesc> TypeDesc::parse(BufferStream& stream)
+{
+ uint32 zero_uid = stream.read<uint16>();
+ if (zero_uid != 0)
+ fatal("non-zero type uid");
+
+ uint32 info_size = stream.read<uint16>();
+ const uint8* type_info = stream.read(info_size);
+ auto* type = (Type*)type_info;
+ type->patch();
+
+ uint32 uid = type->get_uid();
+
+ uint32 type_size = 0;
+ for (uint32 i = 0, n = type->get_field_count(); i < n; ++i)
+ type_size += type->get_field(i).get_size();
+
+ bool important_ = type->has_flag(TYPE_FLAG_IMPORTANT);
+ bool has_serial_ = !type->has_flag(TYPE_FLAG_NO_SERIAL);
+ bool maybe_aux_ = type->has_flag(TYPE_FLAG_AUX);
+
+ TypeDesc desc = {
+ uint16(type_size),
+ important_,
+ has_serial_,
+ maybe_aux_,
+ {},
+ type
+ };
+ return { uid, desc };
+}
+
+
+
+//------------------------------------------------------------------------------
+class Types
+{
+public:
+ void parse(Buffer& buffer, Vector<const Type*>& new_types);
+ const TypeDesc* lookup(uint32 uid) const;
+
+private:
+ Vector<BufferRef> _buffer_refs;
+ Vector<TypeDesc> _descs;
+};
+
+//------------------------------------------------------------------------------
+void Types::parse(Buffer& buffer, Vector<const Type*>& new_types)
+{
+ BufferStream stream = buffer.create_stream();
+ do
+ {
+ auto [uid, desc] = TypeDesc::parse(stream);
+ if (_descs.size() <= uid)
+ {
+ uint32 new_size = (uid + 32) & ~31;
+ _descs.resize(new_size, TypeDesc{});
+ }
+ _descs[uid] = desc;
+
+ new_types.push_back(desc.type);
+ }
+ while (stream.has_data());
+
+ BufferRef buffer_ref = buffer.create_ref();
+ _buffer_refs.push_back(std::move(buffer_ref));
+}
+
+//------------------------------------------------------------------------------
+const TypeDesc* Types::lookup(uint32 uid) const
+{
+ if (uid >= _descs.size())
+ return nullptr;
+
+ const TypeDesc* ret = _descs.data() + uid;
+ return (ret->type != nullptr) ? ret : nullptr;
+}
+
+
+
+//------------------------------------------------------------------------------
+class Serial
+{
+public:
+ enum NoSync : uint32 { NO_SYNC = 0x0100'0000 };
+ enum Pending : uint32 { PENDING = 0xff00'0000 };
+
+ Serial() {}
+ explicit Serial(int32 v) : _v(v) {}
+ explicit Serial(NoSync) : _v(NO_SYNC) {}
+ explicit Serial(Pending) : _v(PENDING) {}
+ bool is_sync() const { return _v < NO_SYNC; }
+ int32 get_value() const { return _v & 0x00ff'ffff; }
+ explicit operator bool () const { return _v <= NO_SYNC; }
+ bool operator == (Pending) const { return _v == PENDING; }
+ bool operator == (Serial rhs) const { return !(_v - rhs._v); }
+ bool operator != (Serial rhs) const { return ! operator == (rhs); }
+ void operator ++ () { _v = (_v + 1) & 0x00ff'ffff; }
+
+ bool less(Serial lhs, Serial rhs) const
+ {
+ return (!lhs || !rhs)
+ ? lhs._v < rhs._v
+ : (lhs._v - _v) < (rhs._v - _v);
+ }
+
+private:
+ uint32 _v = 0xffff'ffff;
+};
+
+//------------------------------------------------------------------------------
+class EventParser
+ : public NoCopy
+{
+public:
+ EventParser() = default;
+ ~EventParser();
+ EventParser(EventParser&& rhs) { move(std::move(rhs)); }
+ EventParser& operator = (EventParser&& rhs) { move(std::move(rhs)); return *this; }
+ Serial parse_normal(BufferStream& stream, const Types& types);
+ Serial parse_important(BufferStream& stream, const Types& types);
+ Event consume();
+ void pin();
+ bool is_empty() const { return _stack.empty(); }
+
+private:
+ struct State
+ {
+ BufferStream& stream;
+ const Types& types;
+ };
+
+ void move(EventParser&& rhs);
+ Serial parse_normal(const State& state);
+ Serial parse_important(const State& state);
+ Serial parse_continue(const State& state);
+ Serial parse_uid(const State& state);
+ Serial parse_type(const State& state);
+ Serial parse_well_known(const State& state, uint32 uid);
+ void parse_aux(const State& state);
+ Serial parse_important_aux(const State& state);
+
+ struct StackItem
+ : public Event
+ {
+ Serial serial;
+ };
+
+ friend class ProtocolImpl;
+ Vector<StackItem> _stack;
+ MutableBuffer _fragment;
+ uint32 _missing = 0;
+ int32 _stage = 0;
+ uint32 _last_uid = 0;
+ uint8 _protocol_version = 7;
+};
+
+//------------------------------------------------------------------------------
+EventParser::~EventParser()
+{
+ // Don't throw from destructors — this causes std::terminate during
+ // stack unwinding (e.g. when the trace stream ends mid-parse).
+}
+
+//------------------------------------------------------------------------------
+void EventParser::move(EventParser&& rhs)
+{
+ std::swap(_stack, rhs._stack);
+ std::swap(_fragment, rhs._fragment);
+ std::swap(_missing, rhs._missing);
+ std::swap(_stage, rhs._stage);
+ std::swap(_last_uid, rhs._last_uid);
+ std::swap(_protocol_version, rhs._protocol_version);
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_normal(BufferStream& stream, const Types& types)
+{
+ State state = { stream, types };
+ return parse_normal(state);
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_important(BufferStream& stream, const Types& types)
+{
+ if (_missing == 0)
+ {
+ State state = { stream, types };
+ Serial ret = parse_important(state);
+ if (!ret && _stage != 1 && _missing == 0)
+ fatal("important parse should only fail on unknown uid");
+ return ret;
+ }
+
+ uint32 remaining = stream.get_remaining();
+ uint32 read_size = std::min(remaining, _missing);
+ uint8* dest = _fragment.get_pointer() + _fragment.get_size() - _missing;
+ std::memcpy(dest, stream.read(read_size), read_size);
+ if (_missing -= read_size)
+ return Serial();
+
+ BufferStream missing_stream = _fragment.create_stream();
+ State state = { missing_stream, types };
+
+ if (!_stack.empty())
+ return parse_important(state);
+
+ parse_important(state);
+ _missing = 0;
+ return parse_important(stream, types);
+}
+
+//------------------------------------------------------------------------------
+Event EventParser::consume()
+{
+ _stage = 0;
+ Event event = std::move(_stack.back());
+ _stack.pop_back();
+ return event;
+}
+
+//------------------------------------------------------------------------------
+void EventParser::pin()
+{
+ for (StackItem& item : _stack)
+ {
+ if (item.data.is_valid())
+ item.data.pin();
+
+ for (Aux& aux : item.aux)
+ aux.data.pin();
+ }
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_normal(const State& state)
+{
+ switch (_stage)
+ {
+ case 0: return parse_uid(state);
+ case 1: return parse_type(state);
+ default: fatal("unexpected _stage value");
+ }
+ return Serial();
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_important(const State& state)
+{
+ if (_stage == 1)
+ return parse_type(state);
+
+ BufferStream& stream = state.stream;
+
+ auto ok_or_capture_fragment = [this, &stream] (uint32 required) {
+ uint32 remaining = stream.get_remaining();
+ if (remaining >= required)
+ return true;
+
+ if (required >= (64 << 10)) // size field is uint16 so 64 KB is the hard upper bound
+ fatal("an important event seems to be rather too large");
+
+ Allocator& allocator = Allocator::get_from(stream);
+ _fragment = allocator.create_buffer(required);
+ std::memcpy(
+ _fragment.get_pointer(),
+ stream.read(remaining),
+ remaining
+ );
+ _missing = required - remaining;
+
+ return false;
+ };
+
+ if (!ok_or_capture_fragment(EVENT_IMPORTANT_SIZE))
+ return Serial(Serial::PENDING);
+
+ _stage = 1;
+
+ Event& top = _stack.emplace_back();
+ top.uid = stream.read<uint16>();
+
+ uint32 size = stream.read<uint16>();
+ if (!ok_or_capture_fragment(size))
+ return Serial(Serial::PENDING);
+
+ // Track how many bytes parse_type consumes so we can skip any
+ // remaining bytes in the declared important-event payload. The UE
+ // writer may include trailing data (e.g. attachment metadata) that
+ // our type parser does not consume.
+ uint32 before = stream.get_remaining();
+ Serial ret = parse_type(state);
+ uint32 consumed = before - stream.get_remaining();
+ if (consumed < size)
+ stream.read(size - consumed);
+
+ return ret;
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_continue(const State& state)
+{
+ _stage = 0;
+ if (state.stream.has_data())
+ return parse_uid(state);
+
+ return _stack.empty() ? Serial() : Serial(Serial::PENDING);
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_uid(const State& state)
+{
+ BufferStream& stream = state.stream;
+
+ uint32 uid = stream.read<uint8>();
+ if (uid & EVENT_LARGE_UID_BIT)
+ uid |= uint32(stream.read<uint8>()) << 8;
+ _last_uid = uid >>= 1;
+
+ if (uid <= EVENT_UID_WELL_KNOWN)
+ return parse_well_known(state, uid);
+
+ _stack.emplace_back().uid = uint16(uid);
+
+ _stage = 1;
+ return parse_type(state);
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_type(const State& state)
+{
+ StackItem& top = _stack.back();
+
+ const TypeDesc* type_desc = state.types.lookup(top.uid);
+ if (type_desc == nullptr)
+ return Serial(Serial::PENDING);
+
+ BufferStream& stream = state.stream;
+
+ Serial serial(Serial::NO_SYNC);
+ if (type_desc->has_serial)
+ {
+ uint32 low_serial = stream.read<uint8>();
+ uint32 high_serial = stream.read<uint16>();
+ serial = Serial((high_serial << 8) | low_serial);
+ }
+
+ uint32 event_size = type_desc->size;
+ top.data = stream.read_ptr(event_size);
+ top.serial = serial;
+
+ if (type_desc->maybe_aux)
+ return type_desc->important
+ ? parse_important_aux(state)
+ : parse_continue(state);
+
+ _stage = -1;
+ return serial;
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_well_known(const State& state, uint32 uid)
+{
+ BufferStream& stream = state.stream;
+
+ // AuxData
+ if (uid == 1)
+ {
+ parse_aux(state);
+ return parse_continue(state);
+ }
+
+ // AuxDataTerminal
+ if (uid == 3)
+ {
+ _stage = -1;
+ return Serial(_stack.back().serial);
+ }
+
+ // EnterScope
+ if (uid == 4)
+ return parse_continue(state);
+
+ // LeaveScope
+ if (uid == 5)
+ return parse_continue(state);
+
+ if (_protocol_version >= 7)
+ {
+ // EnterScope_T (protocol 7)
+ if (uid == 6 || uid == 8)
+ {
+ /*const uint8* timestamp =*/ stream.read(7);
+ return parse_continue(state);
+ }
+
+ // LeaveScope_T (protocol 7)
+ if (uid == 7 || uid == 9)
+ {
+ /*const uint8* timestamp =*/ stream.read(7);
+ return parse_continue(state);
+ }
+ }
+ else
+ {
+ // EnterScope_T (protocol 6)
+ if (uid == 8 || uid == 12)
+ {
+ /*const uint8* timestamp =*/ stream.read(7);
+ return parse_continue(state);
+ }
+ }
+
+ fatal("Unexpected uid");
+ return Serial();
+}
+
+//------------------------------------------------------------------------------
+void EventParser::parse_aux(const State& state)
+{
+ uint32 low_size = state.stream.read<uint8>();
+ uint32 high_size = state.stream.read<uint16>();
+ uint32 size = (low_size | (high_size << 8)) >> 5;
+
+ Event& top = _stack.back();
+ Aux& aux = top.aux.emplace_back();
+ aux.size = size;
+ aux.index = low_size & 0x1f;
+ aux.partial = 0;
+ if (top.aux.size() > 1)
+ {
+ Aux& prev = *(top.aux.rbegin() + 1);
+ prev.partial = (prev.index == aux.index);
+ }
+
+ BufferStream& stream = state.stream;
+ uint32 remaining = stream.get_remaining();
+ if (remaining < size)
+ fatal("aux size too large");
+
+ aux.data = stream.read_ptr(size);
+}
+
+//------------------------------------------------------------------------------
+Serial EventParser::parse_important_aux(const State& state)
+{
+ uint32 uid = state.stream.read<uint8>();
+
+ if (uid == 1) // AuxData
+ {
+ parse_aux(state);
+ return parse_important_aux(state);
+ }
+
+ if (uid == 3) // AuxDataTerminal
+ {
+ _stage = -1;
+ return Serial(Serial::NO_SYNC);
+ }
+
+ fatal("unsupported important sub-uid");
+ return Serial();
+}
+
+
+//------------------------------------------------------------------------------
+class PacketNodePool
+{
+public:
+ struct PacketNode
+ {
+ PacketNode* get_next() const { return (PacketNode*)next; }
+ void set_next(PacketNode* n) { next = uintptr(n); }
+ Buffer payload;
+ uintptr compressed : 1;
+ uintptr _unused : 15;
+ uintptr next : 48;
+ };
+
+ ~PacketNodePool();
+ PacketNode* alloc_pnode();
+ void free_pnode(PacketNode* node);
+
+private:
+ PacketNode* _free_nodes = nullptr;
+};
+
+//------------------------------------------------------------------------------
+PacketNodePool::~PacketNodePool()
+{
+ while (_free_nodes != nullptr)
+ {
+ PacketNode* next = _free_nodes->get_next();
+ delete _free_nodes;
+ _free_nodes = next;
+ }
+}
+
+//------------------------------------------------------------------------------
+PacketNodePool::PacketNode* PacketNodePool::alloc_pnode()
+{
+ if (_free_nodes == nullptr)
+ return new PacketNode();
+
+ PacketNode* ret = _free_nodes;
+ _free_nodes = ret->get_next();
+ return new (ret) PacketNode();
+}
+
+//------------------------------------------------------------------------------
+void PacketNodePool::free_pnode(PacketNode* node)
+{
+ node->payload = Buffer();
+ node->set_next(_free_nodes);
+ _free_nodes = node;
+}
+
+
+
+//------------------------------------------------------------------------------
+class ParserPool
+{
+public:
+ EventParser& get_parser(uint32 index);
+ uint16 alloc_parser();
+ void free_parser(uint32 index);
+
+private:
+ Vector<EventParser> _parsers;
+ Vector<uint16> _frees;
+};
+
+//------------------------------------------------------------------------------
+EventParser& ParserPool::get_parser(uint32 index)
+{
+ return _parsers[index];
+}
+
+//------------------------------------------------------------------------------
+uint16 ParserPool::alloc_parser()
+{
+ if (_frees.empty())
+ {
+ _parsers.emplace_back();
+ return uint16(_parsers.size() - 1);
+ }
+
+ uint16 index = _frees.back();
+ _frees.pop_back();
+ return index;
+}
+
+//------------------------------------------------------------------------------
+void ParserPool::free_parser(uint32 index)
+{
+ _parsers[index] = EventParser();
+ _frees.push_back(uint16(index));
+}
+
+
+
+//------------------------------------------------------------------------------
+class ProtocolImpl
+ : public PacketNodePool
+ , protected ParserPool
+ , public NoCopy
+ , public NoMove
+{
+public:
+ explicit ProtocolImpl(uint8 protocol_version);
+ void enable_unordered();
+ void read(EventParcel& parcel, Bundle& bundle);
+
+private:
+ struct Thread
+ {
+ Serial serial;
+ uint16 id;
+ uint16 parser_index;
+ PacketNode* head = nullptr;
+ PacketNode* tail = nullptr;
+ };
+
+ template <bool> bool read(EventParcel& parcel, Thread& thread);
+ void scatter(EventParcel& parcel, Packet& packet);
+ EventParser& get_parser(const Thread& thread);
+ Types _types;
+ Vector<Thread> _threads;
+ Serial _next_serial = Serial(0);
+ bool _serialised = true;
+ uint8 _protocol_version;
+};
+
+//------------------------------------------------------------------------------
+ProtocolImpl::ProtocolImpl(uint8 protocol_version)
+: _protocol_version(protocol_version)
+{
+ Thread& thread = _threads.emplace_back();
+ thread.id = TID_IMPORTANT;
+ thread.parser_index = alloc_parser();
+ get_parser(thread)._protocol_version = protocol_version;
+}
+
+//------------------------------------------------------------------------------
+void ProtocolImpl::enable_unordered()
+{
+ _serialised = false;
+}
+
+//------------------------------------------------------------------------------
+void ProtocolImpl::read(EventParcel& parcel, Bundle& bundle)
+{
+ // scatter packets to their respective threads
+ for (Packet& packet : bundle)
+ scatter(parcel, packet);
+
+ // important
+ read<false>(parcel, _threads[0]);
+
+ // read as many events as we can into the parcel
+ for (uint32 i = 1, n = uint32(_threads.size()); i < n; ++i)
+ {
+ Thread& thread = _threads[i];
+ if (thread.serial.is_sync())
+ continue;
+
+ if (!read<true>(parcel, thread))
+ continue;
+
+ thread = std::move(_threads.back());
+ _threads.pop_back();
+ --i, --n;
+ }
+
+ for (const bool do_gather = _serialised; do_gather;)
+ {
+ Serial prev_serial = _next_serial;
+ for (uint32 i = 1, n = uint32(_threads.size()); i < n; ++i)
+ {
+ Thread& thread = _threads[i];
+ if (thread.serial != _next_serial)
+ continue;
+
+ ++_next_serial;
+
+ Event event = get_parser(thread).consume();
+ event.thread_id = thread.id;
+ event.serial = thread.serial.is_sync() ? thread.serial.get_value() : -1;
+ parcel.events.push_back(std::move(event));
+
+ if (!read<true>(parcel, thread))
+ continue;
+
+ thread = std::move(_threads.back());
+ _threads.pop_back();
+ --i, --n;
+ }
+
+ if (prev_serial == _next_serial)
+ break;
+ }
+
+ for (Thread& thread : _threads)
+ get_parser(thread).pin();
+}
+
+//------------------------------------------------------------------------------
+void ProtocolImpl::scatter(EventParcel& parcel, Packet& packet)
+{
+ uint32 thread_id = packet.get_thread_id();
+
+ if (thread_id == TID_SYNC)
+ return;
+
+ Buffer& payload = packet.get_payload();
+
+ if (thread_id == TID_TYPE)
+ {
+ packet.decompress();
+ _types.parse(payload, parcel.new_types);
+ return;
+ }
+
+ PacketNode* node = alloc_pnode();
+ node->payload = std::move(payload);
+ node->compressed = packet.is_compressed();
+
+ Thread* thread = (thread_id == TID_IMPORTANT) ? _threads.data() : nullptr;
+ for (int32 i = 1, n = uint32(_threads.size()); i < n && !thread; ++i)
+ if (Thread& lookup = _threads[i]; lookup.id == thread_id)
+ thread = &lookup;
+
+ if (thread == nullptr)
+ {
+ thread = &(_threads.emplace_back());
+ thread->id = uint16(thread_id);
+ thread->parser_index = alloc_parser();
+ get_parser(*thread)._protocol_version = _protocol_version;
+ }
+
+ if (thread->tail != nullptr)
+ {
+ thread->tail->set_next(node);
+ thread->tail = node;
+ }
+ else
+ thread->head = thread->tail = node;
+}
+
+//------------------------------------------------------------------------------
+template <bool is_normal>
+bool ProtocolImpl::read(EventParcel& parcel, Thread& thread)
+{
+ EventParser& parser = get_parser(thread);
+
+ PacketNode* node = thread.head;
+ while (node != nullptr)
+ {
+ if (node->compressed)
+ {
+ Packet::decompress(node->payload);
+ node->compressed = 0;
+ }
+
+ BufferRef buffer_ref = node->payload.create_ref();
+ parcel.buffer_refs.push_back(std::move(buffer_ref));
+
+ BufferStream stream = node->payload.create_stream();
+ while (true)
+ {
+ Serial serial = is_normal
+ ? parser.parse_normal(stream, _types)
+ : parser.parse_important(stream, _types);
+
+ thread.serial = serial;
+
+ if (!serial)
+ break;
+
+ if (int32 cond = is_normal && serial.is_sync(); cond)
+ {
+ if ((_serialised == true) & (serial != _next_serial))
+ break;
+
+ ++_next_serial;
+ thread.serial = Serial(Serial::NO_SYNC);
+ }
+
+ Event event = parser.consume();
+ event.thread_id = thread.id;
+ event.serial = serial.is_sync() ? serial.get_value() : -1;
+ parcel.events.push_back(std::move(event));
+
+ if (!stream.has_data())
+ break;
+ }
+
+ if (stream.has_data())
+ {
+ Buffer& buffer = node->payload;
+ node->payload = buffer.create_sub_buffer(stream.get_consumed());
+ thread.head = node;
+ return false;
+ }
+
+ PacketNode* next = node->get_next();
+ free_pnode(node);
+
+ if ((thread.head = node = next) == nullptr)
+ thread.tail = nullptr;
+
+ if (bool cond = is_normal; cond)
+ if (thread.serial.is_sync())
+ return false;
+ }
+
+ if (bool cond = !is_normal; cond)
+ return false;
+
+ if (thread.serial == Serial::PENDING)
+ return false;
+
+ free_parser(thread.parser_index);
+ thread.serial = Serial();
+ return true;
+}
+
+//------------------------------------------------------------------------------
+EventParser& ProtocolImpl::get_parser(const Thread& thread)
+{
+ return ParserPool::get_parser(thread.parser_index);
+}
+
+
+
+//------------------------------------------------------------------------------
+void EventParcel::reset()
+{
+ events.clear();
+ new_types.clear();
+ buffer_refs.clear();
+}
+
+
+
+//------------------------------------------------------------------------------
+Protocol::Protocol(uint8 version)
+{
+ _impl = new ProtocolImpl(version);
+}
+
+//------------------------------------------------------------------------------
+Protocol::~Protocol()
+{
+ delete _impl;
+}
+
+//------------------------------------------------------------------------------
+void Protocol::enable_unordered()
+{
+ return _impl->enable_unordered();
+}
+
+//------------------------------------------------------------------------------
+void Protocol::read(EventParcel& parcel, Bundle& bundle)
+{
+ return _impl->read(parcel, bundle);
+}