#include #include #include #include //------------------------------------------------------------------------------ void Dispatcher::add_analyzer(Analyzer& analyzer) { analyzer.subscribe(pending_subs); } //------------------------------------------------------------------------------ void Dispatcher::on_new_type(const Type* type) { auto [group, name] = type->get_name(); uint32 type_hash = Hash(group) * Hash(name); for (Subscription& sub : pending_subs) { Outline* outline = sub.outline.get(); if (outline->hash != type_hash) continue; // Sum of non-array field sizes — the byte length of event.data that // scalar field reads index into. uint32 event_size = 0; for (uint32 i = 0, n = type->get_field_count(); i < n; ++i) event_size += type->get_field(i).get_size(); for (uint32 i = 0; i < type->get_field_count(); ++i) { auto [field_name, field] = type->get_field_info(i); uint32 field_hash = Hash(field_name); for (Outline::FieldBase* f = outline->fields(); f->hash; ++f) { if (f->hash != field_hash) continue; uint32 type_info = field.get_type_info(); uint32 offset = field.get_offset(); // Scalar fields are read via event.data + offset. Validate the // offset and element size against event_size so a malicious // utrace can't make Outline::Field memcpy past the event // payload. Array/string fields are read from aux and ignore // offset. if ((type_info & TYPE_INFO_CAT_ARRAY) == 0) { uint32 element_size = 1u << (type_info & TYPE_INFO_SIZE_MASK); if (offset > event_size || element_size > event_size - offset) break; } f->type_info = uint8(type_info); f->offset = int16(offset); f->set = 1; f->index = i; break; } } uint32 uid = type->get_uid(); if (uid >= dispatchers.size()) { uint32 new_size = (uid + 16) & ~15; dispatchers.resize(new_size); } std::swap(sub, pending_subs.back()); dispatchers[uid] = std::move(pending_subs.back()); pending_subs.pop_back(); break; } } //------------------------------------------------------------------------------ void Dispatcher::on_parcel(const EventParcel& parcel) { if (!pending_subs.empty()) for (const Type* type : parcel.new_types) on_new_type(type); if (dispatchers.empty()) return; for (const Event& event : parcel.events) { uint32 uid = event.uid; if (uid >= dispatchers.size()) continue; const Subscription& sub = dispatchers[uid]; Outline* outline = sub.outline.get(); if (outline == nullptr) continue; outline->event = &event; auto* analyzer = (Analyzer*)(sub.analyzer); (analyzer->*(sub.sink))(*outline); } } //------------------------------------------------------------------------------ void Dispatcher::run(DataSource& data_source) { Allocator allocator; Preamble preamble(data_source, allocator); Transport transport = preamble.get_transport(); Protocol protocol = preamble.get_protocol(); #if 0 Scheduler scheduler({ .concurrency = 3 }); struct State { Bundle bundle; Packet packets[128]; EventParcel parcel; }; State states[3]; Task transport_task; Task protocol_task; Task analysis_task; auto analysis_entry = [&, index=uint32(0)] () mutable { auto& parcel = states[index].parcel; on_parcel(parcel); }; auto protocol_entry = [&, index=uint32(0)] () mutable { auto& parcel = states[index].parcel; auto& bundle = states[index].bundle; parcel.reset(); protocol.read(parcel, bundle); }; auto transport_entry = [&, index=uint32(0)] () mutable { auto& bundle = states[index].bundle; auto& packets = states[index].packets; bundle = transport.read_packets(packets); }; while (true) { transport_task = scheduler.create("transport", transport_entry); protocol_task = scheduler.create("protocol", protocol_entry); analysis_task = scheduler.create("analysis", analysis_entry); scheduler.start_after(protocol_task, transport_task); scheduler.start_after(analysis_task, protocol_task); scheduler.submit(transport_task); scheduler.wait(analysis_task); }; #else Packet packets[128]; EventParcel parcel; while (Bundle bundle = transport.read_packets(packets)) { parcel.reset(); protocol.read(parcel, bundle); on_parcel(parcel); } #endif // 0 }