diff options
Diffstat (limited to 'thirdparty/tourist/analysis/src/dispatcher.cpp')
| -rw-r--r-- | thirdparty/tourist/analysis/src/dispatcher.cpp | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/thirdparty/tourist/analysis/src/dispatcher.cpp b/thirdparty/tourist/analysis/src/dispatcher.cpp new file mode 100644 index 000000000..e7ea4a37e --- /dev/null +++ b/thirdparty/tourist/analysis/src/dispatcher.cpp @@ -0,0 +1,148 @@ +#include <analysis/dispatcher.h> +#include <foundation/scheduler.h> +#include <trace/trace.h> + +//------------------------------------------------------------------------------ +void Dispatcher::add_analyzer(Analyzer& analyzer) +{ + analyzer.subscribe(pending_subs); +} + +//------------------------------------------------------------------------------ +void Dispatcher::on_new_type(const Type* type) +{ + auto [group, name] = type->get_name(); + uint32 type_hash = Hash(group) * Hash(name); + + for (Subscription& sub : pending_subs) + { + Outline* outline = sub.outline.get(); + if (outline->hash != type_hash) + continue; + + for (uint32 i = 0; i < type->get_field_count(); ++i) + { + auto [field_name, field] = type->get_field_info(i); + uint32 field_hash = Hash(field_name); + + for (Outline::FieldBase* f = outline->fields(); f->hash; ++f) + { + if (f->hash != field_hash) + continue; + + f->type_info = uint8(field.get_type_info()); + f->offset = int16(field.get_offset()); + f->set = 1; + f->index = i; + break; + } + } + + uint32 uid = type->get_uid(); + if (uid >= dispatchers.size()) + { + uint32 new_size = (uid + 16) & ~15; + dispatchers.resize(new_size); + } + + std::swap(sub, pending_subs.back()); + dispatchers[uid] = std::move(pending_subs.back()); + pending_subs.pop_back(); + break; + } +} + +//------------------------------------------------------------------------------ +void Dispatcher::on_parcel(const EventParcel& parcel) +{ + if (!pending_subs.empty()) + for (const Type* type : parcel.new_types) + on_new_type(type); + + if (dispatchers.empty()) + return; + + for (const Event& event : parcel.events) + { + uint32 uid = event.uid; + if (uid >= dispatchers.size()) + continue; + + const Subscription& sub = dispatchers[uid]; + Outline* outline = sub.outline.get(); + if (outline == nullptr) + continue; + + outline->event = &event; + auto* analyzer = (Analyzer*)(sub.analyzer); + (analyzer->*(sub.sink))(*outline); + } +} + +//------------------------------------------------------------------------------ +void Dispatcher::run(DataSource& data_source) +{ + Allocator allocator; + + Preamble preamble(data_source, allocator); + Transport transport = preamble.get_transport(); + Protocol protocol = preamble.get_protocol(); + +#if 0 + Scheduler scheduler({ + .concurrency = 3 + }); + + struct State { + Bundle bundle; + Packet packets[128]; + EventParcel parcel; + }; + State states[3]; + + Task transport_task; + Task protocol_task; + Task analysis_task; + + auto analysis_entry = [&, index=uint32(0)] () mutable { + auto& parcel = states[index].parcel; + on_parcel(parcel); + }; + + auto protocol_entry = [&, index=uint32(0)] () mutable { + auto& parcel = states[index].parcel; + auto& bundle = states[index].bundle; + parcel.reset(); + protocol.read(parcel, bundle); + }; + + auto transport_entry = [&, index=uint32(0)] () mutable { + auto& bundle = states[index].bundle; + auto& packets = states[index].packets; + bundle = transport.read_packets(packets); + }; + + while (true) + { + transport_task = scheduler.create("transport", transport_entry); + protocol_task = scheduler.create("protocol", protocol_entry); + analysis_task = scheduler.create("analysis", analysis_entry); + + scheduler.start_after(protocol_task, transport_task); + scheduler.start_after(analysis_task, protocol_task); + scheduler.submit(transport_task); + + scheduler.wait(analysis_task); + }; + +#else + Packet packets[128]; + EventParcel parcel; + while (Bundle bundle = transport.read_packets(packets)) + { + parcel.reset(); + protocol.read(parcel, bundle); + on_parcel(parcel); + } +#endif // 0 +} |