diff options
| author | Fuwn <[email protected]> | 2021-04-23 18:27:51 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-04-23 18:27:51 +0000 |
| commit | 70e6a577025651639c745f029fbeb36922438939 (patch) | |
| tree | 354756327afab7744392752db12f421a0040e4f3 /src/server/auto/server.rs | |
| parent | Merge branch 'tokio-re' of https://github.com/Whirlsplash/whirl into tokio-re (diff) | |
| download | whirl-70e6a577025651639c745f029fbeb36922438939.tar.xz whirl-70e6a577025651639c745f029fbeb36922438939.zip | |
major: :star:
Diffstat (limited to 'src/server/auto/server.rs')
| -rw-r--r-- | src/server/auto/server.rs | 481 |
1 files changed, 316 insertions, 165 deletions
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index b8c23e1..164b271 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -1,168 +1,319 @@ -use std::error::Error; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; -use crate::server::auto::cmd::property::{ - create_property_update_command, - create_property_request_command +use std::{ + collections::HashMap, + error::Error, + io::{ErrorKind, Read, Write}, + ops::Deref, + str::from_utf8, }; -use tokio_util::codec::{BytesCodec, Decoder}; -use tokio_stream::StreamExt; -use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; -use std::str::from_utf8; -use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::auto::cmd::room::create_room_id_redirect_command; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use crate::server::peer::Peer; -use std::net::SocketAddr; -use crate::server::auto::cmd::session::parse_session_initialization_command; -use crate::server::parser::get_commands_from_buffer; -use crate::server::cmd::property::parse_property_set_command; -use crate::config::get_config; - -pub struct AutoServer; + +use bytes::BytesMut; +use mio::{ + event::Event, + net::{TcpListener, TcpStream}, + Events, + Interest, + Poll, + Registry, + Token, +}; + +use crate::{ + config::get_config, + server::{ + auto::cmd::{ + property::{create_property_request_command, create_property_update_command}, + room::create_room_id_redirect_command, + session::parse_session_initialization_command, + }, + cmd::{ + action::create_action_command, + buddy_list::create_buddy_list_notify_command, + property::parse_property_set_command, + text::{create_text_command, create_text_command_with_action}, + }, + parser::get_commands_from_buffer, + }, +}; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} impl AutoServer { - pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { - let listener = TcpListener::bind(addr).await?; - debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); - let state = Arc::new(Mutex::new(Shared::new())); - let mut counter = 0; - - loop { - let (stream, address) = listener.accept().await?; - counter += 1; - let state = Arc::clone(&state); - - tokio::spawn(async move { - if let Err(e) = AutoServer::handle( - state, - stream, - address, - counter - ).await { - error!("an error occurred: {}", e); - } - }); - } - } - - pub async fn handle( - state: Arc<Mutex<Shared>>, - stream: TcpStream, - address: SocketAddr, - count: usize, - ) -> Result<(), Box<dyn Error>> { - let bytes = BytesCodec::new().framed(stream); - let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; - debug!("registered peer with address '{}' as '{}'", address, count); - let mut room_ids: Vec<String> = Vec::new(); - let mut username: String = String::new(); - - loop { - tokio::select! { - Some(msg) = peer.rx.recv() => { - // debug!("received bytes from peer: {:?}", &msg); - peer.bytes.get_mut().write_all(&msg).await?; - } - result = peer.bytes.next() => match result { - Some(Ok(msg)) => { - // let msg: BytesMut = msg; - for msg in get_commands_from_buffer(msg) { - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg.clone()).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - let avatar = parse_property_set_command(msg.clone()); - debug!("received property set command from client: {}", avatar); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &get_config()?.worldsmaster_greeting - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client: {}", received_buddy); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client: {} == {}", room_name, room_id); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") - } - _ => (), - } - } - } - Some(Err(e)) => { - error!("error while processing messages: {}", e); break; - } - None => break, - } - } - } - - { // De-register client - state.lock().await.peers.remove(&count.to_string()); - debug!("removed peer: {}", count) - } - - Ok(()) - } + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut unique_token = Token(SERVER.0 + 1); + + poll + .registry() + .register(&mut server, SERVER, Interest::READABLE)?; + + debug!( + "AutoServer now listening on {}", + server.local_addr().unwrap() + ); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + SERVER => { + loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + self.connections.insert(token, stream); + + debug!( + "registered peer with address '{}' as '{}'", + address, token.0 + ); + } + } + token => { + let done = self.process(poll.registry(), event, token)?; + if done { + self.connections.remove(&token); + } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(mut self, cmd: &[u8]) -> () { + self + .connections + .iter_mut() + .for_each(|c| c.1.write_all(cmd).unwrap()); + } + + fn process( + &mut self, + _registry: &Registry, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + let stream = self.connections.get_mut(&token).unwrap(); + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle(&mut received_data[..bytes_read], token); + } + if connection_closed { + println!("de-registered peer with token '{}'", token.0); + return Ok(true); + } + } + + Ok(false) + } + + fn handle(&mut self, data: &[u8], token: Token) -> () { + // trace!("i am client: {:?}", self.clients.get(&token)); + // debug!("{:?}", self.connections); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { + // PROPREQ + debug!("received property request command from client 'null'"); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_update_command()) + .unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { + // SESSINIT + let local_username = parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_request_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + local_username + ); + } + 15 => { + // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_text_command( + "WORLDSMASTER", + &get_config().unwrap().worldsmaster_greeting, + )) + .unwrap(); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_action_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { + // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd + .get(4..cmd.get(0).unwrap().to_owned() as usize - 1) + .unwrap(), + ) + .unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + let buddies = vec![ + "dosfox", + "Fallen_Angel", + "Internet_Man", + "Nexialist", + "SirGemini", + "SirGrandpa", + "Wirlaburla", + ]; + if buddies.contains(&received_buddy) { + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + } + 20 => { + // ROOMIDRQ + let room_name = + from_utf8(cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .unwrap(); + } + 14 => { + // TEXT + let text = + from_utf8(cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, + format!("auto {}", text), + ); + self.connections.iter_mut().for_each(|t| { + t.1 + .write_all(&create_text_command(&username, text)) + .unwrap() + }); + debug!("broadcasted text command to clients"); + } + 7 => { + // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } } |