aboutsummaryrefslogtreecommitdiff
path: root/thirdparty/tourist/analysis/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'thirdparty/tourist/analysis/src/dispatcher.cpp')
-rw-r--r--thirdparty/tourist/analysis/src/dispatcher.cpp148
1 files changed, 148 insertions, 0 deletions
diff --git a/thirdparty/tourist/analysis/src/dispatcher.cpp b/thirdparty/tourist/analysis/src/dispatcher.cpp
new file mode 100644
index 000000000..e7ea4a37e
--- /dev/null
+++ b/thirdparty/tourist/analysis/src/dispatcher.cpp
@@ -0,0 +1,148 @@
+#include <analysis/dispatcher.h>
+#include <foundation/scheduler.h>
+#include <trace/trace.h>
+
+//------------------------------------------------------------------------------
+void Dispatcher::add_analyzer(Analyzer& analyzer)
+{
+ analyzer.subscribe(pending_subs);
+}
+
+//------------------------------------------------------------------------------
+void Dispatcher::on_new_type(const Type* type)
+{
+ auto [group, name] = type->get_name();
+ uint32 type_hash = Hash(group) * Hash(name);
+
+ for (Subscription& sub : pending_subs)
+ {
+ Outline* outline = sub.outline.get();
+ if (outline->hash != type_hash)
+ continue;
+
+ for (uint32 i = 0; i < type->get_field_count(); ++i)
+ {
+ auto [field_name, field] = type->get_field_info(i);
+ uint32 field_hash = Hash(field_name);
+
+ for (Outline::FieldBase* f = outline->fields(); f->hash; ++f)
+ {
+ if (f->hash != field_hash)
+ continue;
+
+ f->type_info = uint8(field.get_type_info());
+ f->offset = int16(field.get_offset());
+ f->set = 1;
+ f->index = i;
+ break;
+ }
+ }
+
+ uint32 uid = type->get_uid();
+ if (uid >= dispatchers.size())
+ {
+ uint32 new_size = (uid + 16) & ~15;
+ dispatchers.resize(new_size);
+ }
+
+ std::swap(sub, pending_subs.back());
+ dispatchers[uid] = std::move(pending_subs.back());
+ pending_subs.pop_back();
+ break;
+ }
+}
+
+//------------------------------------------------------------------------------
+void Dispatcher::on_parcel(const EventParcel& parcel)
+{
+ if (!pending_subs.empty())
+ for (const Type* type : parcel.new_types)
+ on_new_type(type);
+
+ if (dispatchers.empty())
+ return;
+
+ for (const Event& event : parcel.events)
+ {
+ uint32 uid = event.uid;
+ if (uid >= dispatchers.size())
+ continue;
+
+ const Subscription& sub = dispatchers[uid];
+ Outline* outline = sub.outline.get();
+ if (outline == nullptr)
+ continue;
+
+ outline->event = &event;
+ auto* analyzer = (Analyzer*)(sub.analyzer);
+ (analyzer->*(sub.sink))(*outline);
+ }
+}
+
+//------------------------------------------------------------------------------
+void Dispatcher::run(DataSource& data_source)
+{
+ Allocator allocator;
+
+ Preamble preamble(data_source, allocator);
+ Transport transport = preamble.get_transport();
+ Protocol protocol = preamble.get_protocol();
+
+#if 0
+ Scheduler scheduler({
+ .concurrency = 3
+ });
+
+ struct State {
+ Bundle bundle;
+ Packet packets[128];
+ EventParcel parcel;
+ };
+ State states[3];
+
+ Task transport_task;
+ Task protocol_task;
+ Task analysis_task;
+
+ auto analysis_entry = [&, index=uint32(0)] () mutable {
+ auto& parcel = states[index].parcel;
+ on_parcel(parcel);
+ };
+
+ auto protocol_entry = [&, index=uint32(0)] () mutable {
+ auto& parcel = states[index].parcel;
+ auto& bundle = states[index].bundle;
+ parcel.reset();
+ protocol.read(parcel, bundle);
+ };
+
+ auto transport_entry = [&, index=uint32(0)] () mutable {
+ auto& bundle = states[index].bundle;
+ auto& packets = states[index].packets;
+ bundle = transport.read_packets(packets);
+ };
+
+ while (true)
+ {
+ transport_task = scheduler.create("transport", transport_entry);
+ protocol_task = scheduler.create("protocol", protocol_entry);
+ analysis_task = scheduler.create("analysis", analysis_entry);
+
+ scheduler.start_after(protocol_task, transport_task);
+ scheduler.start_after(analysis_task, protocol_task);
+ scheduler.submit(transport_task);
+
+ scheduler.wait(analysis_task);
+ };
+
+#else
+ Packet packets[128];
+ EventParcel parcel;
+ while (Bundle bundle = transport.read_packets(packets))
+ {
+ parcel.reset();
+ protocol.read(parcel, bundle);
+ on_parcel(parcel);
+ }
+#endif // 0
+}