aboutsummaryrefslogtreecommitdiff
path: root/crates/whirl_server/src/distributor.rs
diff options
context:
space:
mode:
authorFuwn <[email protected]>2021-05-20 17:05:59 +0000
committerFuwn <[email protected]>2021-05-20 17:05:59 +0000
commit9dbc613765de8ab7dfa8e1374cf6661dcfd56bc8 (patch)
tree8cfff6a23bb72db2660e68c63a8cf9d0539a061f /crates/whirl_server/src/distributor.rs
parentfeat(readme): add sqlfluff as a dev dep (diff)
downloadwhirl-9dbc613765de8ab7dfa8e1374cf6661dcfd56bc8.tar.xz
whirl-9dbc613765de8ab7dfa8e1374cf6661dcfd56bc8.zip
refactor(global): move crates around, stricter module isolation
Diffstat (limited to 'crates/whirl_server/src/distributor.rs')
-rw-r--r--crates/whirl_server/src/distributor.rs148
1 files changed, 148 insertions, 0 deletions
diff --git a/crates/whirl_server/src/distributor.rs b/crates/whirl_server/src/distributor.rs
new file mode 100644
index 0000000..22b698b
--- /dev/null
+++ b/crates/whirl_server/src/distributor.rs
@@ -0,0 +1,148 @@
+// Copyleft (ɔ) 2021-2021 The Whirlsplash Collective
+// SPDX-License-Identifier: GPL-3.0-only
+
+//! The distributor functions as bare-minimal
+//! [AutoServer](http://dev.worlds.net/private/GammaDocs/WorldServer.html#AutoServer).
+//!
+//! It intercepts a client and distributes it to a
+//! [RoomServer](http://dev.worlds.net/private/GammaDocs/WorldServer.html#RoomServer).
+//!
+//! This is not meant to be a high performant section of code as the distributor
+//! is only meant to handle the initial and brief session initialization of the
+//! client.
+
+use std::{error::Error, net::SocketAddr, sync::Arc};
+
+use tokio::{io::AsyncWriteExt, net::TcpStream, sync::Mutex};
+use tokio_stream::StreamExt;
+use tokio_util::codec::{BytesCodec, Decoder};
+use whirl_config::Config;
+
+use crate::{
+ cmd::{
+ commands::{
+ action::create_action,
+ buddy_list::BuddyList,
+ property::{
+ create::{create_property_request_as_distributor, create_property_update_as_distributor},
+ parse::find_property_in_property_list,
+ },
+ redirect_id::RedirectId,
+ room_id_request::RoomIdRequest,
+ text::Text,
+ },
+ constants::*,
+ extendable::{Creatable, Parsable},
+ },
+ interaction::{peer::Peer, shared::Shared},
+ net::{constants::VAR_USERNAME, property_parser::parse_network_property},
+ packet_parser::parse_commands_from_packet,
+ Server,
+};
+
+pub struct Distributor;
+#[async_trait]
+impl Server for Distributor {
+ async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ _address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ let mut room_ids = vec![];
+ let mut username = String::from("unknown");
+
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ for msg in parse_commands_from_packet(msg) {
+ match msg.get(2).unwrap().to_owned() as i32 {
+ PROPREQ => {
+ debug!("received property request from client");
+
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_as_distributor()).await?;
+ trace!("sent property update to client");
+ }
+ SESSINIT => {
+ username = (&*find_property_in_property_list(
+ &parse_network_property(msg[3..].to_vec()),
+ VAR_USERNAME,
+ ).value).to_string();
+
+ debug!("received session initialization from {}", username);
+
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_as_distributor()).await?;
+ trace!("sent property request to {}", username);
+ }
+ PROPSET => {
+ debug!("received property set from {}", username);
+
+ peer.bytes.get_mut()
+ .write_all(&Text {
+ sender: Config::get().whirlsplash.worldsmaster_username,
+ content: Config::get().distributor.worldsmaster_greeting,
+ }.create()).await?;
+ peer.bytes.get_mut()
+ .write_all(&create_action()).await?;
+ trace!("sent text to {}", username);
+ }
+ BUDDYLISTUPDATE => {
+ let buddy = BuddyList::parse(msg.to_vec());
+ debug!("received buddy list update from {}: {}", username, buddy.buddy);
+ peer.bytes.get_mut().write_all(&buddy.clone().create()).await?;
+ trace!("sent buddy list notify to {}: {}", username, buddy.buddy);
+ }
+ ROOMIDRQ => {
+ let room = RoomIdRequest::parse(msg.to_vec());
+ debug!("received room id request from {}: {}", username, &room.room_name);
+
+ let room_id;
+ if !room_ids.contains(&room.room_name) {
+ room_ids.push((&*room.room_name).to_string());
+ room_id = room_ids.iter().position(|r| r == &room.room_name).unwrap();
+ trace!("inserted room: {}", room.room_name);
+ } else {
+ let position = room_ids.iter().position(|r| r == &room.room_name).unwrap();
+ trace!("found room: {}", room.room_name);
+ room_id = position;
+ }
+
+ peer.bytes.get_mut().write_all(&RedirectId {
+ room_name: (&*room.room_name).to_string(),
+ room_number: room_id as i8,
+ }.create()).await?;
+ trace!("sent redirect id to {}: {}", username, room.room_name);
+ }
+ SESSEXIT => {
+ debug!("received session exit from {}", username); break;
+ }
+ _ => (),
+ }
+ }
+ }
+ Some(Err(e)) => {
+ error!("error while processing message (s): {}", e); break;
+ }
+ None => break,
+ }
+ }
+ }
+
+ // Deregister client
+ debug!("de-registering client");
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ }
+ debug!("de-registered client");
+
+ Ok(())
+ }
+}