diff options
| author | Fuwn <[email protected]> | 2021-04-23 18:27:51 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-04-23 18:27:51 +0000 |
| commit | 70e6a577025651639c745f029fbeb36922438939 (patch) | |
| tree | 354756327afab7744392752db12f421a0040e4f3 /src/server | |
| parent | Merge branch 'tokio-re' of https://github.com/Whirlsplash/whirl into tokio-re (diff) | |
| download | whirl-70e6a577025651639c745f029fbeb36922438939.tar.xz whirl-70e6a577025651639c745f029fbeb36922438939.zip | |
major: :star:
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/auto/____server.rs | 384 | ||||
| -rw-r--r-- | src/server/auto/___server.rs | 266 | ||||
| -rw-r--r-- | src/server/auto/__server.rs | 230 | ||||
| -rw-r--r-- | src/server/auto/_server.rs | 182 | ||||
| -rw-r--r-- | src/server/auto/cmd/property.rs | 51 | ||||
| -rw-r--r-- | src/server/auto/cmd/room.rs | 53 | ||||
| -rw-r--r-- | src/server/auto/cmd/session.rs | 50 | ||||
| -rw-r--r-- | src/server/auto/mod.rs | 2 | ||||
| -rw-r--r-- | src/server/auto/server.rs | 481 | ||||
| -rw-r--r-- | src/server/cmd/action.rs | 6 | ||||
| -rw-r--r-- | src/server/cmd/buddy_list.rs | 20 | ||||
| -rw-r--r-- | src/server/cmd/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/cmd/property.rs | 5 | ||||
| -rw-r--r-- | src/server/cmd/session.rs | 8 | ||||
| -rw-r--r-- | src/server/cmd/text.rs | 71 | ||||
| -rw-r--r-- | src/server/mod.rs | 9 | ||||
| -rw-r--r-- | src/server/parser.rs | 42 | ||||
| -rw-r--r-- | src/server/peer.rs | 26 | ||||
| -rw-r--r-- | src/server/room/__server.rs | 382 | ||||
| -rw-r--r-- | src/server/room/_server.rs | 168 | ||||
| -rw-r--r-- | src/server/room/cmd/property.rs | 74 | ||||
| -rw-r--r-- | src/server/room/cmd/session.rs | 32 | ||||
| -rw-r--r-- | src/server/room/server.rs | 580 | ||||
| -rw-r--r-- | src/server/shared.rs | 40 |
24 files changed, 2558 insertions, 605 deletions
diff --git a/src/server/auto/____server.rs b/src/server/auto/____server.rs new file mode 100644 index 0000000..586bc16 --- /dev/null +++ b/src/server/auto/____server.rs @@ -0,0 +1,384 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::{HashMap, HashSet}; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll.registry().register( + &mut listener, + Token(0), + Interest::READABLE, + )?; + + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll.registry().register( + &mut stream, + token, + Interest::READABLE, + )?; + + debug!("registered peer with address '{}' as '{}'", address, token.0); + + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll.registry().deregister( + self.connections.get_mut(&Token(counter)).unwrap(), + )?; + break; + } + } + }, + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap() + .read(&mut buffer); + match read { + Ok(0) => { self.connections.remove(&token); break; } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { req.push(*b); } + + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self.connections.get_mut(&token).unwrap() + .write_all(&create_room_id_redirect_command( + room_name, room_id, + )).unwrap(); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, text, + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => + break, + Err(err) => { error!("unexpected error: {}", err); break; } + } + } + } + _ => (), + } + } + } + } + + fn broadcast( + sockets: &HashMap<Token, TcpStream>, + cmd: &[u8], + ) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } + + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } + + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } +} diff --git a/src/server/auto/___server.rs b/src/server/auto/___server.rs new file mode 100644 index 0000000..2f95952 --- /dev/null +++ b/src/server/auto/___server.rs @@ -0,0 +1,266 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut unique_token = Token(SERVER.0 + 1); + + poll.registry().register( + &mut server, + SERVER, + Interest::READABLE + )?; + + debug!("AutoServer now listening on {}", server.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + SERVER => loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + self.connections.insert(token, stream); + + debug!("registered peer with address '{}' as '{}'", address, token.0); + }, + token => { + // let done = if let Some(stream) = self.connections.get_mut(&token) { + // self.process( + // poll.registry(), + // event, + // token, + // )? + // } else { + // false + // }; + + let done = self.process( + poll.registry(), + event, + token, + )?; + if done { self.connections.remove(&token); } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(mut self, cmd: &[u8]) -> () { + self.connections.iter_mut() + .for_each(|c| + c.1.write_all(cmd).unwrap() + ); + } + + fn process( + &mut self, + _registry: &Registry, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + let stream = self.connections.get_mut(&token).unwrap(); + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle( + &mut received_data[..bytes_read], + token, + ); + } + if connection_closed { + println!("de-registered peer with token '{}'", token.0); + return Ok(true); + } + } + + Ok(false) + } + + fn handle( + &mut self, + data: &[u8], + // stream: &mut TcpStream, + token: Token, + ) -> () { + // trace!("i am client: {:?}", self.clients.get(&token)); + // debug!("{:?}", self.connections); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self.connections.get_mut(&token).unwrap() + .write_all(&create_room_id_redirect_command( + room_name, room_id, + )).unwrap(); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, text, + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } +} diff --git a/src/server/auto/__server.rs b/src/server/auto/__server.rs new file mode 100644 index 0000000..b72b7e7 --- /dev/null +++ b/src/server/auto/__server.rs @@ -0,0 +1,230 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut connections = HashMap::new(); + let mut unique_token = Token(SERVER.0 + 1); + + poll.registry().register( + &mut server, + SERVER, + Interest::READABLE + )?; + + debug!("AutoServer now listening on {}", server.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in events.iter() { + match event.token() { + SERVER => loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + connections.insert(token, stream); + + println!("registered peer with address '{}' as '{}'", address, token.0); + }, + token => { + let done = if let Some(stream) = connections.get_mut(&token) { + self.process( + poll.registry(), + stream, + event, + token, + )? + } else { + false + }; + if done { connections.remove(&token); } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(self, cmd: &[u8]) -> () { + for mut connection in self.connections { + connection.1.write(cmd).unwrap(); + } + } + + fn process( + &mut self, + _registry: &Registry, + stream: &mut TcpStream, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle( + &mut received_data[..bytes_read], + stream, + token, + ); + } + if connection_closed { + debug!("connection closed"); + return Ok(true); + } + } + + Ok(false) + } + + fn handle( + &mut self, + data: &[u8], + stream: &mut TcpStream, + token: Token, + ) -> () { + trace!("i am client: {:?}", self.clients.get(&token)); + // let local_client = self.clients.get(&token) + // .unwrap_or(&"null".to_string()); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + stream.write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, username.clone()); + debug!( + "received session initialization command from client '{}'", + username, + ); + stream.write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar + ); + stream.write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy + ); + stream.write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), room_name, + ); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received text command from client '{}': {}", + self.clients.get(&token).unwrap(), + text, + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } +} diff --git a/src/server/auto/_server.rs b/src/server/auto/_server.rs new file mode 100644 index 0000000..8b7fa6c --- /dev/null +++ b/src/server/auto/_server.rs @@ -0,0 +1,182 @@ +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use crate::server::cmd::action::create_action_command; + +pub struct AutoServer; +impl AutoServer { + pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; + + loop { + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = AutoServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } + + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg) { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command( + "WORLDSMASTER", &get_config()?.worldsmaster_greeting + )).await?; + peer.bytes.get_mut() + .write_all(&create_action_command()).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + let buddies = vec![ + "dosfox", + "Fallen_Angel", + "Internet_Man", + "Nexialist", + "SirGemini", + "SirGrandpa", + "Wirlaburla", + ]; + if buddies.contains(&received_buddy) { + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client: {} == {}", room_name, room_id); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, + } + } + } + + { // De-register client + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) + } +} diff --git a/src/server/auto/cmd/property.rs b/src/server/auto/cmd/property.rs index b8d0085..e266757 100644 --- a/src/server/auto/cmd/property.rs +++ b/src/server/auto/cmd/property.rs @@ -1,37 +1,24 @@ pub fn create_property_update_command() -> [u8; 147] { - [ - 0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F, - 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F, - 0x6D, 0x1A, 0x80, 0x01, 0x12, 0x6D, 0x61, 0x69, 0x6C, - 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, - 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, - 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, - 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, - 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, - 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, - 0x2D, 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, - 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, - 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, - 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, - 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x31, 0x03, - 0x80, 0x01, 0x02, 0x32, 0x34, 0x01, 0x80, 0x01, 0x0C, - 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, - 0x54, 0x45, 0x52 - ]: [u8; 147] + [ + 0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, + 0x63, 0x6F, 0x6D, 0x1A, 0x80, 0x01, 0x12, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x77, 0x6F, 0x72, 0x6C, + 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, + 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, + 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, + 0x69, 0x2D, 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, + 0x77, 0x77, 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, + 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x31, 0x03, 0x80, 0x01, + 0x02, 0x32, 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, + 0x54, 0x45, 0x52, + ]: [u8; 147] } pub fn create_property_request_command() -> [u8; 61] { - [ - 0x3D, 0x01, 0x06, 0x04, 0x01, 0x30, 0x01, 0x0C, - 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, - 0x53, 0x54, 0x45, 0x52, 0x03, 0x02, 0x32, 0x34, - 0x0F, 0x01, 0x31, 0x0A, 0x10, - - // VAR_SERIAL - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - - 0x16, 0x01, 0x30, 0x05, 0x0B, 0x64, 0x69, 0x6D, - 0x65, 0x6E, 0x73, 0x69, 0x6F, 0x6E, 0x2D, 0x31 - ]: [u8; 61] + [ + 0x3D, 0x01, 0x06, 0x04, 0x01, 0x30, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, + 0x53, 0x54, 0x45, 0x52, 0x03, 0x02, 0x32, 0x34, 0x0F, 0x01, 0x31, 0x0A, 0x10, + // VAR_SERIAL: DWLV000000000000 + 0x44, 0x57, 0x4c, 0x56, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x16, 0x01, 0x30, 0x05, 0x0B, 0x64, 0x69, 0x6D, 0x65, 0x6E, 0x73, 0x69, 0x6F, 0x6E, 0x2D, 0x31, + ]: [u8; 61] } diff --git a/src/server/auto/cmd/room.rs b/src/server/auto/cmd/room.rs index 16c27b8..79786ef 100644 --- a/src/server/auto/cmd/room.rs +++ b/src/server/auto/cmd/room.rs @@ -1,36 +1,33 @@ pub fn create_room_id_redirect_command(room_name: &str, room_id: usize) -> Vec<u8> { - let mut room_id_redirect = vec![ - 0x01, // ? - 0x1A, // Command type - ]; + let mut room_id_redirect = vec![ + 0x01, // ? + 0x1A, // Command type + ]; - // room_id_redirect.push(room_id_redirect.len() as u8 + 1); // Data length - room_id_redirect.push(room_name.len() as u8); // UTF/ room name length - for i in room_name.bytes() { room_id_redirect.push(i); } // Push `room_name` - // for i in "<dimension-1>".bytes() { room_id_redirect.push(i); } // Push room number + // room_id_redirect.push(room_id_redirect.len() as u8 + 1); // Data length + room_id_redirect.push(room_name.len() as u8); // UTF/ room name length + for i in room_name.bytes() { + room_id_redirect.push(i); + } // Push `room_name` + // for i in "<dimension-1>".bytes() { room_id_redirect.push(i); } // Push room + // number - // Room number - room_id_redirect.push(0x00); - room_id_redirect.push(room_id as u8); + // Room number + room_id_redirect.push(0x00); + room_id_redirect.push(room_id as u8); - // IP - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); + // IP + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); - // Port - // for byte in convert_u16_to_two_u8s_be(0x1629).iter() { - // room_id_redirect.push(*byte); - // } - // Port - // for byte in convert_u16_to_two_u8s_be(5673_i32 as u16).iter() { - // room_id_redirect.push(*byte); - // } - room_id_redirect.push(0x16); - room_id_redirect.push(0x29); + // Port + for byte in 5673_u16.to_be_bytes().iter() { + room_id_redirect.push(*byte); + } - room_id_redirect.insert(0, room_id_redirect.len() as u8 + 1); // Data length + room_id_redirect.insert(0, room_id_redirect.len() as u8 + 1); // Data length - room_id_redirect + room_id_redirect } diff --git a/src/server/auto/cmd/session.rs b/src/server/auto/cmd/session.rs index 5592886..19969dc 100644 --- a/src/server/auto/cmd/session.rs +++ b/src/server/auto/cmd/session.rs @@ -1,34 +1,36 @@ -use crate::server::cmd::session::SessionInitializationCommand; -use bytes::BytesMut; use std::str::from_utf8; +use bytes::BytesMut; + +use crate::server::cmd::session::SessionInitializationCommand; + struct _SessionInitializationCommandServer { - pub error: usize, - pub app_name: String, - pub protocol: usize, - pub server_type: usize, - pub serial: String, - pub private: usize, - pub channel: String, + pub error: usize, + pub app_name: String, + pub protocol: usize, + pub server_type: usize, + pub serial: String, + pub private: usize, + pub channel: String, } -pub fn parse_session_initialization_command( - command: BytesMut -) -> SessionInitializationCommand { - SessionInitializationCommand { - // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, - // client: "".to_string(), - username: from_utf8( - command.get( - 21..(20 + command.get(20).unwrap().to_owned() as usize + 1) - ).unwrap() - ).unwrap().to_string(), - // password: "".to_string() - } +pub fn parse_session_initialization_command(command: BytesMut) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command + .get(21..(20 + command.get(20).unwrap().to_owned() as usize + 1)) + .unwrap(), + ) + .unwrap() + .to_string(), + // password: "".to_string() + } } -// pub fn create_session_initialization_command() -> SessionInitializationCommandServer { -// SessionInitializationCommandServer { +// pub fn create_session_initialization_command() -> +// SessionInitializationCommandServer { SessionInitializationCommandServer { // // } // } diff --git a/src/server/auto/mod.rs b/src/server/auto/mod.rs index 0a6f342..24606ea 100644 --- a/src/server/auto/mod.rs +++ b/src/server/auto/mod.rs @@ -1,2 +1,2 @@ -pub mod server; pub mod cmd; +pub mod server; diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index b8c23e1..164b271 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -1,168 +1,319 @@ -use std::error::Error; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; -use crate::server::auto::cmd::property::{ - create_property_update_command, - create_property_request_command +use std::{ + collections::HashMap, + error::Error, + io::{ErrorKind, Read, Write}, + ops::Deref, + str::from_utf8, }; -use tokio_util::codec::{BytesCodec, Decoder}; -use tokio_stream::StreamExt; -use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; -use std::str::from_utf8; -use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::auto::cmd::room::create_room_id_redirect_command; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use crate::server::peer::Peer; -use std::net::SocketAddr; -use crate::server::auto::cmd::session::parse_session_initialization_command; -use crate::server::parser::get_commands_from_buffer; -use crate::server::cmd::property::parse_property_set_command; -use crate::config::get_config; - -pub struct AutoServer; + +use bytes::BytesMut; +use mio::{ + event::Event, + net::{TcpListener, TcpStream}, + Events, + Interest, + Poll, + Registry, + Token, +}; + +use crate::{ + config::get_config, + server::{ + auto::cmd::{ + property::{create_property_request_command, create_property_update_command}, + room::create_room_id_redirect_command, + session::parse_session_initialization_command, + }, + cmd::{ + action::create_action_command, + buddy_list::create_buddy_list_notify_command, + property::parse_property_set_command, + text::{create_text_command, create_text_command_with_action}, + }, + parser::get_commands_from_buffer, + }, +}; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} impl AutoServer { - pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { - let listener = TcpListener::bind(addr).await?; - debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); - let state = Arc::new(Mutex::new(Shared::new())); - let mut counter = 0; - - loop { - let (stream, address) = listener.accept().await?; - counter += 1; - let state = Arc::clone(&state); - - tokio::spawn(async move { - if let Err(e) = AutoServer::handle( - state, - stream, - address, - counter - ).await { - error!("an error occurred: {}", e); - } - }); - } - } - - pub async fn handle( - state: Arc<Mutex<Shared>>, - stream: TcpStream, - address: SocketAddr, - count: usize, - ) -> Result<(), Box<dyn Error>> { - let bytes = BytesCodec::new().framed(stream); - let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; - debug!("registered peer with address '{}' as '{}'", address, count); - let mut room_ids: Vec<String> = Vec::new(); - let mut username: String = String::new(); - - loop { - tokio::select! { - Some(msg) = peer.rx.recv() => { - // debug!("received bytes from peer: {:?}", &msg); - peer.bytes.get_mut().write_all(&msg).await?; - } - result = peer.bytes.next() => match result { - Some(Ok(msg)) => { - // let msg: BytesMut = msg; - for msg in get_commands_from_buffer(msg) { - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg.clone()).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - let avatar = parse_property_set_command(msg.clone()); - debug!("received property set command from client: {}", avatar); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &get_config()?.worldsmaster_greeting - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client: {}", received_buddy); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client: {} == {}", room_name, room_id); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") - } - _ => (), - } - } - } - Some(Err(e)) => { - error!("error while processing messages: {}", e); break; - } - None => break, - } - } - } - - { // De-register client - state.lock().await.peers.remove(&count.to_string()); - debug!("removed peer: {}", count) - } - - Ok(()) - } + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut unique_token = Token(SERVER.0 + 1); + + poll + .registry() + .register(&mut server, SERVER, Interest::READABLE)?; + + debug!( + "AutoServer now listening on {}", + server.local_addr().unwrap() + ); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + SERVER => { + loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + self.connections.insert(token, stream); + + debug!( + "registered peer with address '{}' as '{}'", + address, token.0 + ); + } + } + token => { + let done = self.process(poll.registry(), event, token)?; + if done { + self.connections.remove(&token); + } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(mut self, cmd: &[u8]) -> () { + self + .connections + .iter_mut() + .for_each(|c| c.1.write_all(cmd).unwrap()); + } + + fn process( + &mut self, + _registry: &Registry, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + let stream = self.connections.get_mut(&token).unwrap(); + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle(&mut received_data[..bytes_read], token); + } + if connection_closed { + println!("de-registered peer with token '{}'", token.0); + return Ok(true); + } + } + + Ok(false) + } + + fn handle(&mut self, data: &[u8], token: Token) -> () { + // trace!("i am client: {:?}", self.clients.get(&token)); + // debug!("{:?}", self.connections); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { + // PROPREQ + debug!("received property request command from client 'null'"); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_update_command()) + .unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { + // SESSINIT + let local_username = parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_request_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + local_username + ); + } + 15 => { + // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_text_command( + "WORLDSMASTER", + &get_config().unwrap().worldsmaster_greeting, + )) + .unwrap(); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_action_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { + // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd + .get(4..cmd.get(0).unwrap().to_owned() as usize - 1) + .unwrap(), + ) + .unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + let buddies = vec![ + "dosfox", + "Fallen_Angel", + "Internet_Man", + "Nexialist", + "SirGemini", + "SirGrandpa", + "Wirlaburla", + ]; + if buddies.contains(&received_buddy) { + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + } + 20 => { + // ROOMIDRQ + let room_name = + from_utf8(cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .unwrap(); + } + 14 => { + // TEXT + let text = + from_utf8(cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, + format!("auto {}", text), + ); + self.connections.iter_mut().for_each(|t| { + t.1 + .write_all(&create_text_command(&username, text)) + .unwrap() + }); + debug!("broadcasted text command to clients"); + } + 7 => { + // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } } diff --git a/src/server/cmd/action.rs b/src/server/cmd/action.rs new file mode 100644 index 0000000..1cf9086 --- /dev/null +++ b/src/server/cmd/action.rs @@ -0,0 +1,6 @@ +pub fn create_action_command() -> [u8; 18] { + [ + 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, + 0x76, 0x3e, + ] +} diff --git a/src/server/cmd/buddy_list.rs b/src/server/cmd/buddy_list.rs index cbddc19..ba938f5 100644 --- a/src/server/cmd/buddy_list.rs +++ b/src/server/cmd/buddy_list.rs @@ -1,12 +1,14 @@ pub fn create_buddy_list_notify_command(buddy: &str) -> Vec<u8> { - let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len()); - buddy_list_notify.push(0x01); // ? - buddy_list_notify.push(0x1E); // BUDDYLISTNOTIFY - buddy_list_notify.push(buddy.len() as u8); // Buddy name length - for i in buddy.bytes() { buddy_list_notify.push(i); } // Buddy name - buddy_list_notify.push(0x01); // Is buddy logged on? - // Insert data length as first byte. - buddy_list_notify.insert(0, buddy_list_notify.len() as u8 + 1); // ^ + let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len()); + buddy_list_notify.push(0x01); // ? + buddy_list_notify.push(0x1E); // BUDDYLISTNOTIFY + buddy_list_notify.push(buddy.len() as u8); // Buddy name length + for i in buddy.bytes() { + buddy_list_notify.push(i); + } // Buddy name + buddy_list_notify.push(0x01); // Is buddy logged on? + // Insert data length as first byte. + buddy_list_notify.insert(0, buddy_list_notify.len() as u8 + 1); // ^ - buddy_list_notify // Return created array + buddy_list_notify // Return created array } diff --git a/src/server/cmd/mod.rs b/src/server/cmd/mod.rs index 5e5bcd9..6db7826 100644 --- a/src/server/cmd/mod.rs +++ b/src/server/cmd/mod.rs @@ -1,3 +1,4 @@ +pub mod action; pub mod buddy_list; pub mod property; pub mod session; diff --git a/src/server/cmd/property.rs b/src/server/cmd/property.rs index eaa4cc1..1530f58 100644 --- a/src/server/cmd/property.rs +++ b/src/server/cmd/property.rs @@ -1,6 +1,7 @@ -use bytes::BytesMut; use std::str::from_utf8; +use bytes::BytesMut; + pub fn parse_property_set_command(command: BytesMut) -> String { - from_utf8(command.get(8..).unwrap()).unwrap().to_string() + from_utf8(command.get(8..).unwrap()).unwrap().to_string() } diff --git a/src/server/cmd/session.rs b/src/server/cmd/session.rs index 9d48bcc..97efc8f 100644 --- a/src/server/cmd/session.rs +++ b/src/server/cmd/session.rs @@ -1,6 +1,6 @@ pub struct SessionInitializationCommand { - // pub protocol: usize, - // pub client: String, - pub username: String, - // pub password: String, + // pub protocol: usize, + // pub client: String, + pub username: String, + // pub password: String, } diff --git a/src/server/cmd/text.rs b/src/server/cmd/text.rs index a0ada32..cc306db 100644 --- a/src/server/cmd/text.rs +++ b/src/server/cmd/text.rs @@ -1,46 +1,55 @@ pub fn create_text_command(user: &str, message: &str) -> Vec<u8> { - let mut text = Vec::with_capacity(6 + user.len() + message.len()); - text.push(0x01); // ? - text.push(0x0E); // Command type - text.push(0x00); // Assumed to be a divider. - text.push(user.len() as u8); // 'user' length - for i in user.bytes() { text.push(i); } // Pushing 'user' - text.push(message.len() as u8); // 'message' length - for i in message.bytes() { text.push(i); } // Pushing `message` - text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. + let mut text = Vec::with_capacity(6 + user.len() + message.len()); + text.push(0x01); // ? + text.push(0x0E); // Command type + text.push(0x00); // Assumed to be a divider. + text.push(user.len() as u8); // 'user' length + for i in user.bytes() { + text.push(i); + } // Pushing 'user' + text.push(message.len() as u8); // 'message' length + for i in message.bytes() { + text.push(i); + } // Pushing `message` + text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. - text // Return created array + text // Return created array } // TODO: Get this working! -// pub fn get_message_from_text_command(buffer: &'static [u8; 1024]) -> &'static str { -// from_utf8( +// pub fn get_message_from_text_command(buffer: &'static [u8; 1024]) -> &'static +// str { from_utf8( // &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] // ).unwrap() // } pub fn create_text_command_with_action( - user: &str, - message: &str, - // action: &str // Not accepting input until I figure out how actions work. + user: &str, + message: &str, + // action: &str // Not accepting input until I figure out how actions work. ) -> Vec<u8> { - let mut text = Vec::with_capacity(6 + user.len() + message.len()); - text.push(0x01); // ? - text.push(0x0E); // Command type - text.push(0x00); // Assumed to be a divider. - text.push(user.len() as u8); // 'user' length - for i in user.bytes() { text.push(i); } // Pushing 'user' - text.push(message.len() as u8); // 'message' length - for i in message.bytes() { text.push(i); } // Pushing `message` + let mut text = Vec::with_capacity(6 + user.len() + message.len()); + text.push(0x01); // ? + text.push(0x0E); // Command type + text.push(0x00); // Assumed to be a divider. + text.push(user.len() as u8); // 'user' length + for i in user.bytes() { + text.push(i); + } // Pushing 'user' + text.push(message.len() as u8); // 'message' length + for i in message.bytes() { + text.push(i); + } // Pushing `message` - let action: [u8; 18] = [ - 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, - 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, - 0x76, 0x3e - ]; - for i in action.iter() { text.push(*i); } + let action: [u8; 18] = [ + 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, + 0x76, 0x3e, + ]; + for i in action.iter() { + text.push(*i); + } - text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. + text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. - text // Return created array + text // Return created array } diff --git a/src/server/mod.rs b/src/server/mod.rs index a6341a2..6f2a839 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,12 +1,7 @@ -use tokio::sync::mpsc; use bytes::BytesMut; +use tokio::sync::mpsc; pub mod auto; mod cmd; -pub mod room; mod parser; -mod peer; -mod shared; - -type Tx = mpsc::UnboundedSender<BytesMut>; -type Rx = mpsc::UnboundedReceiver<BytesMut>; +pub mod room; diff --git a/src/server/parser.rs b/src/server/parser.rs index acedbd3..f856650 100644 --- a/src/server/parser.rs +++ b/src/server/parser.rs @@ -8,30 +8,28 @@ use bytes::BytesMut; /// 3. Remove command from `buffer`. /// 4. Iterate and do this for all commands within `buffer`. pub fn get_commands_from_buffer(mut buffer: BytesMut) -> Vec<BytesMut> { - let mut commands: Vec<BytesMut> = Vec::new(); - // debug!("initial buffer: {:?}, length: {}", buffer, buffer.len()); + let mut commands: Vec<BytesMut> = Vec::new(); + trace!("initial buffer: {:?}, length: {}", buffer, buffer.len()); - let data_length = buffer.get(0).unwrap().to_owned() as usize; - if buffer.len() > data_length { - loop { - // debug!("loop: {:?}, length: {}", buffer, buffer.len()); - let command_length = buffer.get(0).unwrap().to_owned() as usize; - commands.push( - BytesMut::from(buffer.get(0..command_length).unwrap()) - ); + let data_length = buffer.get(0).unwrap().to_owned() as usize; + if buffer.len() > data_length { + loop { + trace!("loop: {:?}, length: {}", buffer, buffer.len()); + let command_length = buffer.get(0).unwrap().to_owned() as usize; + commands.push(BytesMut::from(buffer.get(0..command_length).unwrap())); - // Remove command from buffer - buffer = buffer.split_off(command_length); + // Remove command from buffer + buffer = buffer.split_off(command_length); - // Check if any more commands are present - if buffer.is_empty() { break; } - } - } else { - // There will always be at least one command, push it. - commands.push( - BytesMut::from(buffer.get(0..data_length).unwrap()) - ); - } + // Check if any more commands are present + if buffer.is_empty() { + break; + } + } + } else { + // There will always be at least one command, push it. + commands.push(BytesMut::from(buffer.get(0..data_length).unwrap())); + } - commands // Return command (s) + commands // Return command (s) } diff --git a/src/server/peer.rs b/src/server/peer.rs deleted file mode 100644 index e92a3db..0000000 --- a/src/server/peer.rs +++ /dev/null @@ -1,26 +0,0 @@ -use tokio_util::codec::{Framed, BytesCodec}; -use tokio::net::TcpStream; -use crate::server::Rx; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use tokio::sync::mpsc; - -pub struct Peer { - pub bytes: Framed<TcpStream, BytesCodec>, - // pub(crate) rx: Rx, - pub rx: Rx, -} -impl Peer { - pub async fn new( - state: Arc<Mutex<Shared>>, - bytes: Framed<TcpStream, BytesCodec>, - username: String, - ) -> std::io::Result<Peer> { - // let address = bytes.get_ref().peer_addr()?; - let (tx, rx) = mpsc::unbounded_channel(); - // state.lock().await.peers.insert(address, tx); - state.lock().await.peers.insert(username, tx); - Ok(Peer { bytes, rx }) - } -} diff --git a/src/server/room/__server.rs b/src/server/room/__server.rs new file mode 100644 index 0000000..0923e1b --- /dev/null +++ b/src/server/room/__server.rs @@ -0,0 +1,382 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::room::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::{HashMap, HashSet}; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +pub struct RoomServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl RoomServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll.registry().register( + &mut listener, + Token(0), + Interest::READABLE, + )?; + + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll.registry().register( + &mut stream, + token, + Interest::READABLE, + )?; + + debug!("registered peer with address '{}' as '{}'", address, token.0); + + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll.registry().deregister( + self.connections.get_mut(&Token(counter)).unwrap(), + )?; + break; + } + } + }, + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap() + .read(&mut buffer); + match read { + Ok(0) => { self.connections.remove(&token); break; } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { req.push(*b); } + + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, format!("room: {}", text), + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => + break, + Err(err) => { error!("unexpected error: {}", err); break; } + } + } + } + _ => (), + } + } + } + } + + fn broadcast( + sockets: &HashMap<Token, TcpStream>, + cmd: &[u8], + ) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } + + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } + + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } +} diff --git a/src/server/room/_server.rs b/src/server/room/_server.rs new file mode 100644 index 0000000..e08c3d7 --- /dev/null +++ b/src/server/room/_server.rs @@ -0,0 +1,168 @@ +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::room::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::room::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; + + +pub struct RoomServer; +impl RoomServer { + pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; + + loop { + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = RoomServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } + + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg) { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client: {} == {}", room_name, room_id); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, + } + } + } + + { // De-register client + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) + } +} diff --git a/src/server/room/cmd/property.rs b/src/server/room/cmd/property.rs index b75efad..3135d0a 100644 --- a/src/server/room/cmd/property.rs +++ b/src/server/room/cmd/property.rs @@ -1,47 +1,37 @@ -pub fn create_property_update_command() -> [u8; 161] { // Vec<u8> - // let mut property = Vec::with_capacity(2); - // property.push(0x01); // ? - // property.push(0x10); // Command type - // - // // Meaningful Data - // property.push(); // Property ID - // property.push(); // Flags - // property.push(); // Access - // - // // Insert data length as first byte. - // property.insert(0, property.len() as u8 + 1); // ^ - // - // property // Return created array +pub fn create_property_update_command() -> [u8; 161] { + // Vec<u8> + // let mut property = Vec::with_capacity(2); + // property.push(0x01); // ? + // property.push(0x10); // Command type + // + // // Meaningful Data + // property.push(); // Property ID + // property.push(); // Flags + // property.push(); // Access + // + // // Insert data length as first byte. + // property.insert(0, property.len() as u8 + 1); // ^ + // + // property // Return created array - [ - 0xA1, 0xFF, 0x10, 0x08, 0x80, 0x01, 0x07, 0x31, - 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x1B, 0x80, - 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, - 0x33, 0x64, 0x2E, 0x63, 0x6F, 0x6D, 0x1A, 0x80, - 0x01, 0x15, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x75, - 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, - 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, - 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, 0x70, 0x3A, - 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, - 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, 0x75, 0x73, - 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, - 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, 0x2D, - 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, - 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, - 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, - 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, - 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, - 0x01, 0x01, 0x33, 0x03, 0x80, 0x01, 0x02, 0x32, - 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, - 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, 0x54, 0x45, - 0x52 - ]: [u8; 161] + [ + 0xA1, 0xFF, 0x10, 0x08, 0x80, 0x01, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x1B, 0x80, + 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F, 0x6D, 0x1A, 0x80, + 0x01, 0x15, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, + 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, 0x70, 0x3A, + 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, 0x75, 0x73, + 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, 0x2D, + 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, + 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, + 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x33, 0x03, 0x80, 0x01, 0x02, 0x32, + 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, 0x54, 0x45, + 0x52, + ]: [u8; 161] } pub fn create_property_request_command() -> [u8; 22] { - [ - 0x16, 0x01, 0x06, 0x04, 0x01, 0x30, 0x0f, 0x01, - 0x33, 0x08, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, - 0x30, 0x30, 0x03, 0x02, 0x32, 0x34 - ]: [u8; 22] + [ + 0x16, 0x01, 0x06, 0x04, 0x01, 0x30, 0x0f, 0x01, 0x33, 0x08, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x03, 0x02, 0x32, 0x34, + ]: [u8; 22] } diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs index f04c078..dee3931 100644 --- a/src/server/room/cmd/session.rs +++ b/src/server/room/cmd/session.rs @@ -1,18 +1,20 @@ -use crate::server::cmd::session::SessionInitializationCommand; -use bytes::BytesMut; use std::str::from_utf8; -pub fn parse_session_initialization_command( - command: BytesMut -) -> SessionInitializationCommand { - SessionInitializationCommand { - // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, - // client: "".to_string(), - username: from_utf8( - command.get( - 25..(24 + command.get(24).unwrap().to_owned() as usize + 1) - ).unwrap() - ).unwrap().to_string(), - // password: "".to_string() - } +use bytes::BytesMut; + +use crate::server::cmd::session::SessionInitializationCommand; + +pub fn parse_session_initialization_command(command: BytesMut) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command + .get(25..(24 + command.get(24).unwrap().to_owned() as usize + 1)) + .unwrap(), + ) + .unwrap() + .to_string(), + // password: "".to_string() + } } diff --git a/src/server/room/server.rs b/src/server/room/server.rs index e08c3d7..63a13f6 100644 --- a/src/server/room/server.rs +++ b/src/server/room/server.rs @@ -1,168 +1,434 @@ -use std::error::Error; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; -use crate::server::room::cmd::property::{ - create_property_update_command, - create_property_request_command +use std::{ + collections::{HashMap, HashSet}, + error::Error, + io::{ErrorKind, Read, Write}, + str::from_utf8, }; -use tokio_util::codec::{BytesCodec, Decoder}; -use tokio_stream::StreamExt; -use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; -use std::str::from_utf8; -use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::auto::cmd::room::create_room_id_redirect_command; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use crate::server::peer::Peer; -use std::net::SocketAddr; -use crate::server::room::cmd::session::parse_session_initialization_command; -use crate::server::parser::get_commands_from_buffer; -use crate::server::cmd::property::parse_property_set_command; +use bytes::BytesMut; +use mio::{ + event::Event, + net::{TcpListener, TcpStream}, + Events, + Interest, + Poll, + Registry, + Token, +}; + +use crate::{ + config::get_config, + server::{ + auto::cmd::{ + property::{create_property_request_command, create_property_update_command}, + room::create_room_id_redirect_command, + }, + cmd::{ + buddy_list::create_buddy_list_notify_command, + property::parse_property_set_command, + text::{create_text_command, create_text_command_with_action}, + }, + parser::get_commands_from_buffer, + room::cmd::session::parse_session_initialization_command, + }, +}; -pub struct RoomServer; +const SERVER: Token = Token(0); + +pub struct RoomServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} impl RoomServer { - pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { - let listener = TcpListener::bind(addr).await?; - debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); - let state = Arc::new(Mutex::new(Shared::new())); - let mut counter = 0; + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll + .registry() + .register(&mut listener, Token(0), Interest::READABLE)?; + + debug!( + "RoomServer now listening on {}", + listener.local_addr().unwrap() + ); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => { + loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll + .registry() + .register(&mut stream, token, Interest::READABLE)?; - loop { - let (stream, address) = listener.accept().await?; - counter += 1; - let state = Arc::clone(&state); + debug!( + "registered peer with address '{}' as '{}'", + address, token.0 + ); - tokio::spawn(async move { - if let Err(e) = RoomServer::handle( - state, - stream, - address, - counter - ).await { - error!("an error occurred: {}", e); - } - }); - } - } + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll + .registry() + .deregister(self.connections.get_mut(&Token(counter)).unwrap())?; + break; + } + } + } + } + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap().read(&mut buffer); + match read { + Ok(0) => { + self.connections.remove(&token); + break; + } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { + req.push(*b); + } - pub async fn handle( - state: Arc<Mutex<Shared>>, - stream: TcpStream, - address: SocketAddr, - count: usize, - ) -> Result<(), Box<dyn Error>> { - let bytes = BytesCodec::new().framed(stream); - let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; - debug!("registered peer with address '{}' as '{}'", address, count); - let mut room_ids: Vec<String> = Vec::new(); - let mut username: String = String::new(); + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { + // PROPREQ + debug!("received property request command from client 'null'"); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_update_command()) + .unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { + // SESSINIT + let local_username = parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_request_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + local_username + ); + } + 15 => { + // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", + &get_config().unwrap().worldsmaster_greeting, + )) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { + // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd + .get(4..cmd.get(0).unwrap().to_owned() as usize - 1) + .unwrap(), + ) + .unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + 14 => { + // TEXT + let text = + from_utf8(cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap()) + .unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, + format!("room: {}", text), + ); + self.connections.iter_mut().for_each(|t| { + t.1 + .write_all(&create_text_command(&username, text)) + .unwrap() + }); + debug!("broadcasted text command to clients"); + } + 7 => { + // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + break; + } + } + } + } + _ => (), + } + } + } + } - loop { - tokio::select! { - Some(msg) = peer.rx.recv() => { - // debug!("received bytes from peer: {:?}", &msg); - peer.bytes.get_mut().write_all(&msg).await?; - } - result = peer.bytes.next() => match result { - Some(Ok(msg)) => { - // let msg: BytesMut = msg; - for msg in get_commands_from_buffer(msg) { - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg.clone()).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - let avatar = parse_property_set_command(msg.clone()); - debug!("received property set command from client: {}", avatar); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client: {}", received_buddy); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client: {} == {}", room_name, room_id); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") - } - _ => (), - } - } - } - Some(Err(e)) => { - error!("error while processing messages: {}", e); break; - } - None => break, - } - } - } + fn broadcast(sockets: &HashMap<Token, TcpStream>, cmd: &[u8]) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } - { // De-register client - state.lock().await.peers.remove(&count.to_string()); - debug!("removed peer: {}", count) - } + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } - Ok(()) - } + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } } diff --git a/src/server/shared.rs b/src/server/shared.rs deleted file mode 100644 index 9e01d3d..0000000 --- a/src/server/shared.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::collections::HashMap; -// use std::net::SocketAddr; -use bytes::BytesMut; -use crate::server::Tx; - -pub struct Shared { - pub peers: HashMap<String, Tx>, -} -impl Shared { - pub fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } - - pub async fn broadcast(&mut self, /* sender: &str, */ message: &[u8]) { - // debug!("peer sent message: {:?}", message); - // debug!("peer count: {}", self.peers.len()); - // debug!("peers: {:?}", self.peers); - for peer in self.peers.iter_mut() { - // debug!("peer: {:?}", peer); - // TODO: - // thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value' - peer.1.send(BytesMut::from(message)).unwrap(); - } - } - - // pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) { - // for peer in self.peers.iter_mut() { - // if *peer.0 != sender { - // let _ = peer.1.send(message.into()); - // } - // } - // } -} -impl Default for Shared { - fn default() -> Self { - Self::new() - } -} |