#include #include #include #include #include #include "constants.h" //------------------------------------------------------------------------------ class TypeDesc { public: uint16 size : 13; uint16 important : 1; uint16 has_serial : 1; uint16 maybe_aux : 1; uint16 _unused[3]; const Type* type; static Tuple parse(BufferStream& stream); }; //------------------------------------------------------------------------------ Tuple TypeDesc::parse(BufferStream& stream) { uint32 zero_uid = stream.read(); if (zero_uid != 0) fatal("non-zero type uid"); uint32 info_size = stream.read(); const uint8* type_info = stream.read(info_size); // Validate that the declared field count and name lengths actually fit // within info_size before patch() walks _fields[] and writes to it. if (info_size < sizeof(Type)) fatal("type info smaller than Type header"); auto* type = (Type*)type_info; uint32 field_count = type->get_field_count(); uint64 required = sizeof(Type); required += uint64(field_count) * sizeof(Type::Field); required += type->_logger_name_len; required += type->_event_name_len; if (required > info_size) fatal("type info too small for declared fields"); for (uint32 i = 0; i < field_count; ++i) { required += type->_fields[i].name_size; if (required > info_size) fatal("type info too small for declared field names"); } type->patch(); uint32 uid = type->get_uid(); uint32 type_size = 0; for (uint32 i = 0, n = type->get_field_count(); i < n; ++i) type_size += type->get_field(i).get_size(); bool important_ = type->has_flag(TYPE_FLAG_IMPORTANT); bool has_serial_ = !type->has_flag(TYPE_FLAG_NO_SERIAL); bool maybe_aux_ = type->has_flag(TYPE_FLAG_AUX); TypeDesc desc = { uint16(type_size), important_, has_serial_, maybe_aux_, {}, type }; return { uid, desc }; } //------------------------------------------------------------------------------ class Types { public: void parse(Buffer& buffer, Vector& new_types); const TypeDesc* lookup(uint32 uid) const; private: Vector _buffer_refs; Vector _descs; }; //------------------------------------------------------------------------------ void Types::parse(Buffer& buffer, Vector& new_types) { // Pin the buffer up-front. TypeDesc::parse stores raw pointers into // this buffer's slab in _descs; if a later type is malformed and // throws, we still need the already-stored entries to remain valid. _buffer_refs.push_back(buffer.create_ref()); BufferStream stream = buffer.create_stream(); do { auto [uid, desc] = TypeDesc::parse(stream); if (_descs.size() <= uid) { uint32 new_size = (uid + 32) & ~31; _descs.resize(new_size, TypeDesc{}); } _descs[uid] = desc; new_types.push_back(desc.type); } while (stream.has_data()); } //------------------------------------------------------------------------------ const TypeDesc* Types::lookup(uint32 uid) const { if (uid >= _descs.size()) return nullptr; const TypeDesc* ret = _descs.data() + uid; return (ret->type != nullptr) ? ret : nullptr; } //------------------------------------------------------------------------------ class Serial { public: enum NoSync : uint32 { NO_SYNC = 0x0100'0000 }; enum Pending : uint32 { PENDING = 0xff00'0000 }; Serial() {} explicit Serial(int32 v) : _v(v) {} explicit Serial(NoSync) : _v(NO_SYNC) {} explicit Serial(Pending) : _v(PENDING) {} bool is_sync() const { return _v < NO_SYNC; } int32 get_value() const { return _v & 0x00ff'ffff; } explicit operator bool () const { return _v <= NO_SYNC; } bool operator == (Pending) const { return _v == PENDING; } bool operator == (Serial rhs) const { return !(_v - rhs._v); } bool operator != (Serial rhs) const { return ! operator == (rhs); } void operator ++ () { _v = (_v + 1) & 0x00ff'ffff; } bool less(Serial lhs, Serial rhs) const { return (!lhs || !rhs) ? lhs._v < rhs._v : (lhs._v - _v) < (rhs._v - _v); } private: uint32 _v = 0xffff'ffff; }; //------------------------------------------------------------------------------ class EventParser : public NoCopy { public: EventParser() = default; ~EventParser(); EventParser(EventParser&& rhs) { move(std::move(rhs)); } EventParser& operator = (EventParser&& rhs) { move(std::move(rhs)); return *this; } Serial parse_normal(BufferStream& stream, const Types& types); Serial parse_important(BufferStream& stream, const Types& types); Event consume(); void pin(); bool is_empty() const { return _stack.empty(); } private: struct State { BufferStream& stream; const Types& types; }; void move(EventParser&& rhs); Serial parse_normal(const State& state); Serial parse_important(const State& state); Serial parse_continue(const State& state); Serial parse_uid(const State& state); Serial parse_type(const State& state); Serial parse_well_known(const State& state, uint32 uid); void parse_aux(const State& state); Serial parse_important_aux(const State& state); struct StackItem : public Event { Serial serial; }; friend class ProtocolImpl; Vector _stack; MutableBuffer _fragment; uint32 _missing = 0; int32 _stage = 0; uint32 _last_uid = 0; uint8 _protocol_version = 7; }; //------------------------------------------------------------------------------ EventParser::~EventParser() { // Don't throw from destructors — this causes std::terminate during // stack unwinding (e.g. when the trace stream ends mid-parse). } //------------------------------------------------------------------------------ void EventParser::move(EventParser&& rhs) { std::swap(_stack, rhs._stack); std::swap(_fragment, rhs._fragment); std::swap(_missing, rhs._missing); std::swap(_stage, rhs._stage); std::swap(_last_uid, rhs._last_uid); std::swap(_protocol_version, rhs._protocol_version); } //------------------------------------------------------------------------------ Serial EventParser::parse_normal(BufferStream& stream, const Types& types) { State state = { stream, types }; return parse_normal(state); } //------------------------------------------------------------------------------ Serial EventParser::parse_important(BufferStream& stream, const Types& types) { if (_missing == 0) { State state = { stream, types }; Serial ret = parse_important(state); if (!ret && _stage != 1 && _missing == 0) fatal("important parse should only fail on unknown uid"); return ret; } uint32 remaining = stream.get_remaining(); uint32 read_size = std::min(remaining, _missing); uint8* dest = _fragment.get_pointer() + _fragment.get_size() - _missing; std::memcpy(dest, stream.read(read_size), read_size); if (_missing -= read_size) return Serial(); BufferStream missing_stream = _fragment.create_stream(); State state = { missing_stream, types }; if (!_stack.empty()) return parse_important(state); parse_important(state); _missing = 0; return parse_important(stream, types); } //------------------------------------------------------------------------------ Event EventParser::consume() { _stage = 0; Event event = std::move(_stack.back()); _stack.pop_back(); return event; } //------------------------------------------------------------------------------ void EventParser::pin() { for (StackItem& item : _stack) { if (item.data.is_valid()) item.data.pin(); for (Aux& aux : item.aux) aux.data.pin(); } } //------------------------------------------------------------------------------ Serial EventParser::parse_normal(const State& state) { switch (_stage) { case 0: return parse_uid(state); case 1: return parse_type(state); default: fatal("unexpected _stage value"); } return Serial(); } //------------------------------------------------------------------------------ Serial EventParser::parse_important(const State& state) { if (_stage == 1) return parse_type(state); BufferStream& stream = state.stream; auto ok_or_capture_fragment = [this, &stream] (uint32 required) { uint32 remaining = stream.get_remaining(); if (remaining >= required) return true; if (required >= (64 << 10)) // size field is uint16 so 64 KB is the hard upper bound fatal("an important event seems to be rather too large"); Allocator& allocator = Allocator::get_from(stream); _fragment = allocator.create_buffer(required); std::memcpy( _fragment.get_pointer(), stream.read(remaining), remaining ); _missing = required - remaining; return false; }; if (!ok_or_capture_fragment(EVENT_IMPORTANT_SIZE)) return Serial(Serial::PENDING); _stage = 1; Event& top = _stack.emplace_back(); top.uid = stream.read(); uint32 size = stream.read(); if (!ok_or_capture_fragment(size)) return Serial(Serial::PENDING); // Track how many bytes parse_type consumes so we can skip any // remaining bytes in the declared important-event payload. The UE // writer may include trailing data (e.g. attachment metadata) that // our type parser does not consume. uint32 before = stream.get_remaining(); Serial ret = parse_type(state); uint32 consumed = before - stream.get_remaining(); if (consumed < size) stream.read(size - consumed); return ret; } //------------------------------------------------------------------------------ Serial EventParser::parse_continue(const State& state) { _stage = 0; if (state.stream.has_data()) return parse_uid(state); return _stack.empty() ? Serial() : Serial(Serial::PENDING); } //------------------------------------------------------------------------------ Serial EventParser::parse_uid(const State& state) { BufferStream& stream = state.stream; uint32 uid = stream.read(); if (uid & EVENT_LARGE_UID_BIT) uid |= uint32(stream.read()) << 8; _last_uid = uid >>= 1; if (uid <= EVENT_UID_WELL_KNOWN) return parse_well_known(state, uid); _stack.emplace_back().uid = uint16(uid); _stage = 1; return parse_type(state); } //------------------------------------------------------------------------------ Serial EventParser::parse_type(const State& state) { StackItem& top = _stack.back(); const TypeDesc* type_desc = state.types.lookup(top.uid); if (type_desc == nullptr) return Serial(Serial::PENDING); BufferStream& stream = state.stream; Serial serial(Serial::NO_SYNC); if (type_desc->has_serial) { uint32 low_serial = stream.read(); uint32 high_serial = stream.read(); serial = Serial((high_serial << 8) | low_serial); } uint32 event_size = type_desc->size; top.data = stream.read_ptr(event_size); top.serial = serial; if (type_desc->maybe_aux) return type_desc->important ? parse_important_aux(state) : parse_continue(state); _stage = -1; return serial; } //------------------------------------------------------------------------------ Serial EventParser::parse_well_known(const State& state, uint32 uid) { BufferStream& stream = state.stream; // AuxData if (uid == 1) { parse_aux(state); return parse_continue(state); } // AuxDataTerminal if (uid == 3) { _stage = -1; return Serial(_stack.back().serial); } // EnterScope if (uid == 4) return parse_continue(state); // LeaveScope if (uid == 5) return parse_continue(state); if (_protocol_version >= 7) { // EnterScope_T (protocol 7) if (uid == 6 || uid == 8) { /*const uint8* timestamp =*/ stream.read(7); return parse_continue(state); } // LeaveScope_T (protocol 7) if (uid == 7 || uid == 9) { /*const uint8* timestamp =*/ stream.read(7); return parse_continue(state); } } else { // EnterScope_T (protocol 6) if (uid == 8 || uid == 12) { /*const uint8* timestamp =*/ stream.read(7); return parse_continue(state); } } fatal("Unexpected uid"); return Serial(); } //------------------------------------------------------------------------------ void EventParser::parse_aux(const State& state) { uint32 low_size = state.stream.read(); uint32 high_size = state.stream.read(); uint32 size = (low_size | (high_size << 8)) >> 5; Event& top = _stack.back(); Aux& aux = top.aux.emplace_back(); aux.size = size; aux.index = low_size & 0x1f; aux.partial = 0; if (top.aux.size() > 1) { Aux& prev = *(top.aux.rbegin() + 1); prev.partial = (prev.index == aux.index); } BufferStream& stream = state.stream; uint32 remaining = stream.get_remaining(); if (remaining < size) fatal("aux size too large"); aux.data = stream.read_ptr(size); } //------------------------------------------------------------------------------ Serial EventParser::parse_important_aux(const State& state) { while (true) { uint32 uid = state.stream.read(); if (uid == 1) // AuxData { parse_aux(state); continue; } if (uid == 3) // AuxDataTerminal { _stage = -1; return Serial(Serial::NO_SYNC); } fatal("unsupported important sub-uid"); return Serial(); } } //------------------------------------------------------------------------------ class PacketNodePool { public: struct PacketNode { PacketNode* get_next() const { return (PacketNode*)next; } void set_next(PacketNode* n) { next = uintptr(n); } Buffer payload; uintptr compressed : 1; uintptr _unused : 15; uintptr next : 48; }; ~PacketNodePool(); PacketNode* alloc_pnode(); void free_pnode(PacketNode* node); private: PacketNode* _free_nodes = nullptr; }; //------------------------------------------------------------------------------ PacketNodePool::~PacketNodePool() { while (_free_nodes != nullptr) { PacketNode* next = _free_nodes->get_next(); delete _free_nodes; _free_nodes = next; } } //------------------------------------------------------------------------------ PacketNodePool::PacketNode* PacketNodePool::alloc_pnode() { if (_free_nodes == nullptr) return new PacketNode(); PacketNode* ret = _free_nodes; _free_nodes = ret->get_next(); return new (ret) PacketNode(); } //------------------------------------------------------------------------------ void PacketNodePool::free_pnode(PacketNode* node) { node->payload = Buffer(); node->set_next(_free_nodes); _free_nodes = node; } //------------------------------------------------------------------------------ class ParserPool { public: EventParser& get_parser(uint32 index); uint32 alloc_parser(); void free_parser(uint32 index); private: Vector _parsers; Vector _frees; }; //------------------------------------------------------------------------------ EventParser& ParserPool::get_parser(uint32 index) { return _parsers[index]; } //------------------------------------------------------------------------------ uint32 ParserPool::alloc_parser() { if (_frees.empty()) { _parsers.emplace_back(); return uint32(_parsers.size() - 1); } uint32 index = _frees.back(); _frees.pop_back(); return index; } //------------------------------------------------------------------------------ void ParserPool::free_parser(uint32 index) { _parsers[index] = EventParser(); _frees.push_back(index); } //------------------------------------------------------------------------------ class ProtocolImpl : public PacketNodePool , protected ParserPool , public NoCopy , public NoMove { public: explicit ProtocolImpl(uint8 protocol_version); void enable_unordered(); void read(EventParcel& parcel, Bundle& bundle); private: struct Thread { Serial serial; uint16 id; uint32 parser_index; PacketNode* head = nullptr; PacketNode* tail = nullptr; }; template bool read(EventParcel& parcel, Thread& thread); void scatter(EventParcel& parcel, Packet& packet); EventParser& get_parser(const Thread& thread); Types _types; Vector _threads; Serial _next_serial = Serial(0); bool _serialised = true; uint8 _protocol_version; }; //------------------------------------------------------------------------------ ProtocolImpl::ProtocolImpl(uint8 protocol_version) : _protocol_version(protocol_version) { Thread& thread = _threads.emplace_back(); thread.id = TID_IMPORTANT; thread.parser_index = alloc_parser(); get_parser(thread)._protocol_version = protocol_version; } //------------------------------------------------------------------------------ void ProtocolImpl::enable_unordered() { _serialised = false; } //------------------------------------------------------------------------------ void ProtocolImpl::read(EventParcel& parcel, Bundle& bundle) { // scatter packets to their respective threads for (Packet& packet : bundle) scatter(parcel, packet); // important read(parcel, _threads[0]); // read as many events as we can into the parcel for (uint32 i = 1, n = uint32(_threads.size()); i < n; ++i) { Thread& thread = _threads[i]; if (thread.serial.is_sync()) continue; if (!read(parcel, thread)) continue; thread = std::move(_threads.back()); _threads.pop_back(); --i, --n; } for (const bool do_gather = _serialised; do_gather;) { Serial prev_serial = _next_serial; for (uint32 i = 1, n = uint32(_threads.size()); i < n; ++i) { Thread& thread = _threads[i]; if (thread.serial != _next_serial) continue; ++_next_serial; Event event = get_parser(thread).consume(); event.thread_id = thread.id; event.serial = thread.serial.is_sync() ? thread.serial.get_value() : -1; parcel.events.push_back(std::move(event)); if (!read(parcel, thread)) continue; thread = std::move(_threads.back()); _threads.pop_back(); --i, --n; } if (prev_serial == _next_serial) break; } for (Thread& thread : _threads) get_parser(thread).pin(); } //------------------------------------------------------------------------------ void ProtocolImpl::scatter(EventParcel& parcel, Packet& packet) { uint32 thread_id = packet.get_thread_id(); if (thread_id == TID_SYNC) return; Buffer& payload = packet.get_payload(); if (thread_id == TID_TYPE) { packet.decompress(); _types.parse(payload, parcel.new_types); return; } PacketNode* node = alloc_pnode(); node->payload = std::move(payload); node->compressed = packet.is_compressed(); Thread* thread = (thread_id == TID_IMPORTANT) ? _threads.data() : nullptr; for (int32 i = 1, n = uint32(_threads.size()); i < n && !thread; ++i) if (Thread& lookup = _threads[i]; lookup.id == thread_id) thread = &lookup; if (thread == nullptr) { thread = &(_threads.emplace_back()); thread->id = uint16(thread_id); thread->parser_index = alloc_parser(); get_parser(*thread)._protocol_version = _protocol_version; } if (thread->tail != nullptr) { thread->tail->set_next(node); thread->tail = node; } else thread->head = thread->tail = node; } //------------------------------------------------------------------------------ template bool ProtocolImpl::read(EventParcel& parcel, Thread& thread) { EventParser& parser = get_parser(thread); PacketNode* node = thread.head; while (node != nullptr) { if (node->compressed) { Packet::decompress(node->payload); node->compressed = 0; } BufferRef buffer_ref = node->payload.create_ref(); parcel.buffer_refs.push_back(std::move(buffer_ref)); BufferStream stream = node->payload.create_stream(); while (true) { Serial serial = is_normal ? parser.parse_normal(stream, _types) : parser.parse_important(stream, _types); thread.serial = serial; if (!serial) break; if (int32 cond = is_normal && serial.is_sync(); cond) { if ((_serialised == true) & (serial != _next_serial)) break; ++_next_serial; thread.serial = Serial(Serial::NO_SYNC); } Event event = parser.consume(); event.thread_id = thread.id; event.serial = serial.is_sync() ? serial.get_value() : -1; parcel.events.push_back(std::move(event)); if (!stream.has_data()) break; } if (stream.has_data()) { Buffer& buffer = node->payload; node->payload = buffer.create_sub_buffer(stream.get_consumed()); thread.head = node; return false; } PacketNode* next = node->get_next(); free_pnode(node); if ((thread.head = node = next) == nullptr) thread.tail = nullptr; if (bool cond = is_normal; cond) if (thread.serial.is_sync()) return false; } if (bool cond = !is_normal; cond) return false; if (thread.serial == Serial::PENDING) return false; free_parser(thread.parser_index); thread.serial = Serial(); return true; } //------------------------------------------------------------------------------ EventParser& ProtocolImpl::get_parser(const Thread& thread) { return ParserPool::get_parser(thread.parser_index); } //------------------------------------------------------------------------------ void EventParcel::reset() { events.clear(); new_types.clear(); buffer_refs.clear(); } //------------------------------------------------------------------------------ Protocol::Protocol(uint8 version) { _impl = new ProtocolImpl(version); } //------------------------------------------------------------------------------ Protocol::~Protocol() { delete _impl; } //------------------------------------------------------------------------------ void Protocol::enable_unordered() { return _impl->enable_unordered(); } //------------------------------------------------------------------------------ void Protocol::read(EventParcel& parcel, Bundle& bundle) { return _impl->read(parcel, bundle); }