diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/db/mod.rs | 1 | ||||
| -rw-r--r-- | src/db/routines.rs | 62 | ||||
| -rw-r--r-- | src/lib.rs | 6 | ||||
| -rw-r--r-- | src/main.rs | 44 | ||||
| -rw-r--r-- | src/server/auto/cmd/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/auto/cmd/property.rs | 36 | ||||
| -rw-r--r-- | src/server/auto/cmd/room.rs | 7 | ||||
| -rw-r--r-- | src/server/auto/cmd/session.rs | 18 | ||||
| -rw-r--r-- | src/server/auto/constants.rs | 13 | ||||
| -rw-r--r-- | src/server/auto/mod.rs | 3 | ||||
| -rw-r--r-- | src/server/auto/server.rs | 480 | ||||
| -rw-r--r-- | src/server/cmd/buddy_list.rs | 2 | ||||
| -rw-r--r-- | src/server/cmd/mod.rs | 2 | ||||
| -rw-r--r-- | src/server/cmd/session.rs | 6 | ||||
| -rw-r--r-- | src/server/mod.rs | 11 | ||||
| -rw-r--r-- | src/server/peer.rs | 26 | ||||
| -rw-r--r-- | src/server/room/cmd/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/room/cmd/session.rs | 18 | ||||
| -rw-r--r-- | src/server/room/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/room/server.rs | 351 | ||||
| -rw-r--r-- | src/server/shared.rs | 35 | ||||
| -rw-r--r-- | src/server/structures.rs (renamed from src/server/room/structures.rs) | 0 | ||||
| -rw-r--r-- | src/server/utils.rs | 13 | ||||
| -rw-r--r-- | src/utils/db.rs | 11 | ||||
| -rw-r--r-- | src/utils/mod.rs | 1 | ||||
| -rw-r--r-- | src/utils/web.rs | 7 |
26 files changed, 528 insertions, 628 deletions
diff --git a/src/db/mod.rs b/src/db/mod.rs index 461c354..5ec9ffb 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,2 +1 @@ -// mod routines; mod tables; diff --git a/src/db/routines.rs b/src/db/routines.rs deleted file mode 100644 index 884944d..0000000 --- a/src/db/routines.rs +++ /dev/null @@ -1,62 +0,0 @@ -use rusqlite::{params, Connection, Result}; -use crate::db::tables::SerialNumbers; - -#[repr(i32)] #[derive(Debug, PartialEq)] -enum AccountStatus { - AccountInactive = 0, - AccountActive = 1, -} - -fn modify_account_status(username: &str, status: AccountStatus) -> Result<()> { - let connection = Connection::open("worlds.db")?; - - // language=SQLite - connection.execute( - "UPDATE user_registration \ - SET account_status = (?1) \ - where username = '(?2)'", - params![status as i32, username] - )?; - - Ok(()) -} - -fn delete_account(username: &str) -> Result<()> { - let connection = Connection::open("worlds.db")?; - - // Get serial_number from `username`'s row. - // language=SQLite - connection.query_row( - "SELECT * FROM serial_numbers WHERE username = '(?1)'", - params![username], - |row| row.get(0) - ); - - let mut row = connection.prepare( - "SELECT * \ - FROM serial_numbers \ - WHERE username = '(?1)';" - )?; - let row_iter = row.query_map(params![username], |row| { - Ok(SerialNumbers { - serial_number: row.get(0)?, - user_name: row.get(1)?, - serial_status: row.get(2)?, - }) - })?; - - // Reset serial number so it can be reused. - // language=SQLite - connection.execute( - "UPDATE serial_numbers \ - SET username = 'none', serial_status = 0 \ - WHERE serial_number = '(?1)'", - params![row_iter.] - )?; - - Ok(()) -} - -fn set_account_host() { } - -fn modify_account_vip() { } @@ -1,9 +1,9 @@ -#![feature(type_ascription)] -#![feature(hash_set_entry)] +#![feature(type_ascription, hash_set_entry, type_name_of_val)] +#![warn(rust_2018_idioms)] #[macro_use] extern crate log; pub mod db; pub mod server; -pub mod utils; +pub mod utils;
\ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 92a35d0..35c839d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,32 +1,18 @@ -#[macro_use] -extern crate log; +use whirl::server::auto::server::AutoServer; +use std::error::Error; +use whirl::server::room::server::RoomServer; -use mio::net::TcpListener; -use std::thread; -use whirl::server; +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + dotenv::dotenv().ok(); + pretty_env_logger::init(); -fn main() { - dotenv::dotenv().ok(); // Adds ability to use environment variables - pretty_env_logger::init(); // Adds pretty logging - - let mut threads = vec![]; - threads.push(thread::spawn(move || { - debug!("spawned AutoServer thread"); - server::auto::server::AutoServer::new( - TcpListener::bind( - &"0.0.0.0:6650".parse().unwrap() - ).unwrap() - ); - })); - threads.push(thread::spawn(move || { - debug!("spawned RoomServer thread"); - server::room::server::RoomServer::new( - TcpListener::bind( - &"0.0.0.0:5673".parse().unwrap() - ).unwrap() - ); - })); - for thread in threads { - let _ = thread.join(); // Handle Result by dissolving it. + loop { + tokio::spawn(async move { + let _ = AutoServer::new("0.0.0.0:6650").await; + }); + tokio::spawn(async move { + let _ = RoomServer::new("0.0.0.0:5673").await; + }); } -} +}
\ No newline at end of file diff --git a/src/server/auto/cmd/mod.rs b/src/server/auto/cmd/mod.rs index 5430662..d2345c6 100644 --- a/src/server/auto/cmd/mod.rs +++ b/src/server/auto/cmd/mod.rs @@ -1,2 +1,3 @@ pub mod property; pub mod room; +pub mod session; diff --git a/src/server/auto/cmd/property.rs b/src/server/auto/cmd/property.rs index 01d4f43..b8d0085 100644 --- a/src/server/auto/cmd/property.rs +++ b/src/server/auto/cmd/property.rs @@ -1,38 +1,4 @@ -// struct NetToProperty { -// _prop_id: i32, -// _flags: i32, -// _access: i32, -// _string_value: String, -// _bin_value: Vec<i32>, -// } -// impl NetToProperty { -// fn parse_net_data() -> Self { -// NetToProperty { -// _prop_id: 0, -// _flags: 0, -// _access: 0, -// _string_value: "".to_string(), -// _bin_value: vec![] -// } -// } -// } - -// TODO: Decode received data and send back a valid response. -pub fn create_property_update_command() -> [u8; 147] { // Vec<u8> - // let mut property = Vec::with_capacity(2); - // property.push(0x01); // ? - // property.push(0x10); // Command type - // - // // Meaningful Data - // property.push(); // Property ID - // property.push(); // Flags - // property.push(); // Access - // - // // Insert data length as first byte. - // property.insert(0, property.len() as u8 + 1); // ^ - // - // property // Return created array - +pub fn create_property_update_command() -> [u8; 147] { [ 0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F, diff --git a/src/server/auto/cmd/room.rs b/src/server/auto/cmd/room.rs index ff839f3..78498fe 100644 --- a/src/server/auto/cmd/room.rs +++ b/src/server/auto/cmd/room.rs @@ -18,6 +18,13 @@ pub fn create_room_id_redirect_command(room_name: &str, room_id: usize) -> Vec<u room_id_redirect.push(0x00); // Port + // for byte in convert_u16_to_two_u8s_be(0x1629).iter() { + // room_id_redirect.push(*byte); + // } + // Port + // for byte in convert_u16_to_two_u8s_be(5673_i32 as u16).iter() { + // room_id_redirect.push(*byte); + // } room_id_redirect.push(0x16); room_id_redirect.push(0x29); diff --git a/src/server/auto/cmd/session.rs b/src/server/auto/cmd/session.rs new file mode 100644 index 0000000..8e8fbb7 --- /dev/null +++ b/src/server/auto/cmd/session.rs @@ -0,0 +1,18 @@ +use crate::server::cmd::session::SessionInitializationCommand; +use bytes::BytesMut; +use std::str::from_utf8; + +pub fn parse_session_initialization_command( + command: BytesMut +) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command.get( + 21..(20 + command.get(20).unwrap().to_owned() as usize + 1) + ).unwrap() + ).unwrap().to_string(), + // password: "".to_string() + } +} diff --git a/src/server/auto/constants.rs b/src/server/auto/constants.rs deleted file mode 100644 index 10c9439..0000000 --- a/src/server/auto/constants.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::collections::HashMap; - -pub static ROOM_IDS: phf::Map<&'static str, &'static i32> = phf::phf_map! { -"ChatElevator<dimension-1>" => &4, -"IconViewRoom1Enter<dimension-1>" => &3, -"ChatHall<dimension-1>" => &9, -"IconViewRoom1<dimension-1>" => &12, -"IconViewRoom1g<dimension-1>" => &27230, -"ReceptionView1<dimension-1>" => &6, -"ReceptionView2<dimension-1>" => &5, -"Reception<dimension-1>" => &1, -"staircase1<dimension-1>" => &11, -}; diff --git a/src/server/auto/mod.rs b/src/server/auto/mod.rs index 741d763..0a6f342 100644 --- a/src/server/auto/mod.rs +++ b/src/server/auto/mod.rs @@ -1,3 +1,2 @@ -pub mod cmd; -// pub mod constants; pub mod server; +pub mod cmd; diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index 381852f..c05ed5e 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -1,266 +1,250 @@ -use mio::net::{TcpListener, TcpStream}; -use std::io::{Read, Write}; -use mio::{Poll, Token, Ready, PollOpt, Events}; -use std::collections::{HashMap, HashSet}; +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use bytes::BytesMut; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::cmd::text::{create_text_command, create_text_command_with_action}; -use super::cmd::property::{create_property_update_command, create_property_request_command}; -use super::cmd::room::create_room_id_redirect_command; -use rand::Rng; -use crate::server::utils::broadcast_to_all_clients; - -// pub struct ClientSocket { -// tcp_stream: TcpStream, -// username: String, -// } +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::auto::cmd::session::parse_session_initialization_command; pub struct AutoServer; impl AutoServer { - pub fn new(listener: TcpListener) { - let poll = Poll::new().unwrap(); - poll.register( - &listener, - Token(0), - Ready::readable(), - PollOpt::edge() - ).unwrap(); - - let mut counter: usize = 0; - let mut sockets: HashMap<Token, TcpStream> = HashMap::new(); - let mut requests: HashMap<Token, Vec<u8>> = HashMap::new(); - let mut buffer = [0 as u8; 1024]; - // let mut room_ids: HashMap<&str, i32> = HashMap::new(); - let mut room_ids: HashSet<String> = HashSet::new(); + pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; - let mut events = Events::with_capacity(1024); loop { - poll.poll(&mut events, None).unwrap(); - for event in &events { - match event.token() { - Token(0) => { - loop { - match listener.accept() { - Ok((socket, address)) => { - counter += 1; - let token = Token(counter); - - poll.register( - &socket, - token, - Ready::readable(), - PollOpt::edge() - ).unwrap(); + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = AutoServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } - info!( - "registered client with ip '{}' as token '{}'", - address.ip(), token.0 - ); + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + // while let Some(msg) = peer.bytes.next().await { + // match msg { + // Ok(bytes) => match bytes.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client"); + // peer.bytes.get_mut() + // .write_all(&create_property_update_command()).await?; + // + // debug!("sent property update command to client"); + // } + // 6 => { // SESSINIT + // username = parse_session_initialization_command(bytes).username; + // debug!( + // "received session initialization command from client: {}", + // username + // ); + // peer.bytes.get_mut() + // .write_all(&create_property_request_command()).await?; + // debug!("sent session initialization command to client"); + // } + // 15 => { // PROPSET + // debug!("received property set command from client"); + // peer.bytes.get_mut() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + // )).await?; + // debug!("sent worldsmaster greeting to client"); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client: {}", + // received_buddy + // ); + // peer.bytes.get_mut() + // .write_all(&create_buddy_list_notify_command(received_buddy)) + // .await?; + // debug!("sent buddy list notify command to client"); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!("received room id request command from client: {}", room_name); + // let room_id; + // if !room_ids.contains(&room_name.to_string()) { + // room_ids.push(room_name.to_string()); + // room_id = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", room_ids); + // peer.bytes.get_mut() + // .write_all(&create_room_id_redirect_command(room_name, room_id)) + // .await?; + // debug!("sent redirect id command to client"); + // } + // 14 => { + // let text = from_utf8( + // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!("received text command from client: {}", text); + // let mut state = state.lock().await; + // state.broadcast(&create_text_command(&username, text)).await; + // debug!("broadcasted text command from client"); + // } + // 7 => { // SESSEXIT + // debug!("received session exit command from client") + // } + // _ => (), + // } + // Err(err) => error!("stream closed with error: {:?}", err), + // } + // } - sockets.insert(token, socket); - requests.insert(token, Vec::with_capacity(192)); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { - error!("unexpected error: {}", e); - poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap(); - break; - } + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + let msg: BytesMut = msg; + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + + debug!("sent property update command to client"); } - } - }, - token if event.readiness().is_readable() => { - loop { - let read = sockets.get_mut(&token).unwrap().read(&mut buffer); - match read { - Ok(0) => { sockets.remove(&token); break; } - Ok(n) => { - let req = requests.get_mut(&token).unwrap(); - for b in &buffer[0..n] { - req.push(*b); - } - - // First byte means how long data section of the packet is. - // Second byte is to be determined. - // Third byte is the request type. - - // Match packet type by descriptor; **third** byte. - match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed. - // PROPREQ - 10 => { - debug!( - "received property request command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_update_command()).unwrap(); - debug!( - "sent property update command to client with token '{}'", - token.0 - ); - } - // SESSINIT - 6 => { - debug!( - "received session initialization command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_request_command()).unwrap(); - debug!( - "sent session initialization command to client with token '{}'", - token.0 - ); - } - // PROPSET - 15 => { - debug!( - "received property set command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", - "Welcome to Whirlsplash!" - )).unwrap(); - debug!( - "sent worldsmaster greeting to client with token '{}'", - token.0 - ); - }, - // BUDDYLISTUPDATE - 29 => { - debug!( - "received buddy list update command from client with token '{}'", - token.0 - ); - - let received_buddy = from_utf8( - &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize - 1] - ).unwrap(); - debug!("received buddy: {}", received_buddy); - - sockets.get_mut(&token).unwrap() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .unwrap(); - debug!( - "sent buddy notify update command to client with token '{}'", - token.0 - ); - } - // ROOMIDRQ - 20 => { - debug!( - "received room id request command from client with token '{}'", - token.0 - ); - - let room_name = from_utf8( - &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - let mut room_id = 0; - if !room_ids.contains(room_name) { - room_ids.insert(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == room_name) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let pos = room_ids - .iter() - .position(|i| i == room_name) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, pos); - room_id = pos; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - - // Passing `0` as `room_id` parameter as currently there is - // no way to find out a room's ID based on it's name. - sockets.get_mut(&token).unwrap() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .unwrap(); - debug!("sent redirect id command to client with token '{}'", token.0) - } - // TEXT - 14 => { - debug!("received text command from client with token '{}'", token.0); - - // TODO: Make this into a command! - let message = from_utf8( - &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - trace!("message: {}", message); - - // Using User as a placeholder. Ideally, this would print out the username of - // the one who sent it. - broadcast_to_all_clients( - &sockets, - &create_text_command( - // Random integer is added to the end of "User", just a development - // proof-of-concept. Since at this stage usernames aren't exactly kept, - // we can identify message senders as their connection token; `token.0`. - &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()), - message - ) - ); - debug!( - "broadcasted text command from client with token '{}'", - token.0 - ); - } - // SESSEXIT - 7 => { - debug!( - "received session termination command from client with token '{}'", - token.0 - ); - poll.deregister(sockets.get(&token).unwrap()).unwrap(); - debug!("de-registered client with token '{}'", token.0); - } - // Anything else, do nothing. - _ => () - } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + debug!("received property set command from client"); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client"); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { error!("unexpected error: {}", e); break; } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client"); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - - // Unimplemented - // let ready = requests.get(&token).unwrap() - // .windows(4) - // .find(|window| is_double_crnl(*window)) - // .is_some(); - // - // if ready { - // let socket = sockets.get(&token).unwrap(); - // poll.reregister( - // socket, - // token, - // Ready::writable(), - // PollOpt::edge() | PollOpt::oneshot()).unwrap(); - // } - }, - // Unimplemented - // token if event.readiness().is_writable() => { - // println!("writeable"); - // requests.get_mut(&token).unwrap().clear(); - // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap(); - // - // // Re-use existing connection ("keep-alive") - switch back to reading - // poll.reregister( - // sockets.get(&token).unwrap(), - // token, - // Ready::readable(), - // PollOpt::edge()).unwrap(); - // }, - _ => () + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, } } } + + // De-register client + { + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) } } diff --git a/src/server/cmd/buddy_list.rs b/src/server/cmd/buddy_list.rs index 1423808..cbddc19 100644 --- a/src/server/cmd/buddy_list.rs +++ b/src/server/cmd/buddy_list.rs @@ -1,5 +1,3 @@ -/// In the future, this will take a `Vec` of buddies and dynamically -/// create a response packet based on the amount of buddies given. pub fn create_buddy_list_notify_command(buddy: &str) -> Vec<u8> { let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len()); buddy_list_notify.push(0x01); // ? diff --git a/src/server/cmd/mod.rs b/src/server/cmd/mod.rs index c211484..196a7a5 100644 --- a/src/server/cmd/mod.rs +++ b/src/server/cmd/mod.rs @@ -1,3 +1,3 @@ pub mod buddy_list; -// pub mod property; +pub mod session; pub mod text; diff --git a/src/server/cmd/session.rs b/src/server/cmd/session.rs new file mode 100644 index 0000000..9d48bcc --- /dev/null +++ b/src/server/cmd/session.rs @@ -0,0 +1,6 @@ +pub struct SessionInitializationCommand { + // pub protocol: usize, + // pub client: String, + pub username: String, + // pub password: String, +} diff --git a/src/server/mod.rs b/src/server/mod.rs index cb241f2..3698d84 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,4 +1,11 @@ +use tokio::sync::mpsc; +use bytes::BytesMut; + pub mod auto; -pub mod cmd; +mod cmd; pub mod room; -pub mod utils; +mod shared; +mod peer; + +type Tx = mpsc::UnboundedSender<BytesMut>; +type Rx = mpsc::UnboundedReceiver<BytesMut>; diff --git a/src/server/peer.rs b/src/server/peer.rs new file mode 100644 index 0000000..e92a3db --- /dev/null +++ b/src/server/peer.rs @@ -0,0 +1,26 @@ +use tokio_util::codec::{Framed, BytesCodec}; +use tokio::net::TcpStream; +use crate::server::Rx; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use tokio::sync::mpsc; + +pub struct Peer { + pub bytes: Framed<TcpStream, BytesCodec>, + // pub(crate) rx: Rx, + pub rx: Rx, +} +impl Peer { + pub async fn new( + state: Arc<Mutex<Shared>>, + bytes: Framed<TcpStream, BytesCodec>, + username: String, + ) -> std::io::Result<Peer> { + // let address = bytes.get_ref().peer_addr()?; + let (tx, rx) = mpsc::unbounded_channel(); + // state.lock().await.peers.insert(address, tx); + state.lock().await.peers.insert(username, tx); + Ok(Peer { bytes, rx }) + } +} diff --git a/src/server/room/cmd/mod.rs b/src/server/room/cmd/mod.rs index f348cd0..1d123c0 100644 --- a/src/server/room/cmd/mod.rs +++ b/src/server/room/cmd/mod.rs @@ -1 +1,2 @@ pub mod property; +pub mod session; diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs new file mode 100644 index 0000000..f04c078 --- /dev/null +++ b/src/server/room/cmd/session.rs @@ -0,0 +1,18 @@ +use crate::server::cmd::session::SessionInitializationCommand; +use bytes::BytesMut; +use std::str::from_utf8; + +pub fn parse_session_initialization_command( + command: BytesMut +) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command.get( + 25..(24 + command.get(24).unwrap().to_owned() as usize + 1) + ).unwrap() + ).unwrap().to_string(), + // password: "".to_string() + } +} diff --git a/src/server/room/mod.rs b/src/server/room/mod.rs index f581dfe..24606ea 100644 --- a/src/server/room/mod.rs +++ b/src/server/room/mod.rs @@ -1,3 +1,2 @@ pub mod cmd; pub mod server; -mod structures; diff --git a/src/server/room/server.rs b/src/server/room/server.rs index c005e40..8781868 100644 --- a/src/server/room/server.rs +++ b/src/server/room/server.rs @@ -1,222 +1,167 @@ -use mio::net::{TcpListener, TcpStream}; -use std::io::{Read, Write}; -use mio::{Poll, Token, Ready, PollOpt, Events}; -use std::collections::HashMap; +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::room::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use bytes::BytesMut; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::cmd::text::create_text_command; -// use crate::cmd::property::{create_property_update_command, create_property_request_command}; -use crate::server::room::cmd::property::{create_property_update_command, create_property_request_command}; -use rand::Rng; -use crate::server::utils::broadcast_to_all_clients; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::room::cmd::session::parse_session_initialization_command; pub struct RoomServer; impl RoomServer { - pub fn new(listener: TcpListener) { - let poll = Poll::new().unwrap(); - poll.register( - &listener, - Token(0), - Ready::readable(), - PollOpt::edge() - ).unwrap(); + pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; - let mut counter: usize = 0; - let mut sockets: HashMap<Token, TcpStream> = HashMap::new(); - let mut requests: HashMap<Token, Vec<u8>> = HashMap::new(); - let mut buffer = [0 as u8; 1024]; - - let mut events = Events::with_capacity(1024); loop { - poll.poll(&mut events, None).unwrap(); - for event in &events { - match event.token() { - Token(0) => { - loop { - match listener.accept() { - Ok((socket, address)) => { - counter += 1; - let token = Token(counter); - - poll.register( - &socket, - token, - Ready::readable(), - PollOpt::edge() - ).unwrap(); - - info!( - "registered client with ip '{}' as token '{}'", - address.ip(), token.0 - ); - - sockets.insert(token, socket); - requests.insert(token, Vec::with_capacity(192)); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { - error!("unexpected error: {}", e); - poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap(); - break; - } - } - } - }, - token if event.readiness().is_readable() => { - loop { - let read = sockets.get_mut(&token).unwrap().read(&mut buffer); - match read { - Ok(0) => { sockets.remove(&token); break; } - Ok(n) => { - let req = requests.get_mut(&token).unwrap(); - for b in &buffer[0..n] { - req.push(*b); - } + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); - // First byte means how long data section of the packet is. - // Second byte is to be determined. - // Third byte is the request type. + tokio::spawn(async move { + if let Err(e) = RoomServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } - // Match packet type by descriptor; **third** byte. - match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed. - // PROPREQ - 10 => { - debug!( - "received property request command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_update_command()).unwrap(); - debug!( - "sent property update from client with token '{}'", - token.0 - ); - } - // SESSINIT - 6 => { - debug!( - "received session initialization command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_request_command()).unwrap(); - debug!( - "sent session initialization command from client with token '{}'", - token.0 - ); - } - // PROPSET - 15 => debug!( - "received property set command from client with token '{}'", - token.0 - ), - // BUDDYLISTUPDATE - 29 => { - debug!( - "received buddy list update command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_buddy_list_notify_command("Wirlaburla")) - .unwrap(); - debug!( - "sent buddy notify update command from client with token '{}'", - token.0 - ); - } - // ROOMIDRQ - 20 => debug!( - "received room id request command from client with token '{}'", - token.0 - ), - // TEXT - 14 => { - debug!( - "received text command from client with token '{}'", - token.0 - ); + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); - // TODO: Make this into a command! - let message = from_utf8( - &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - trace!("message: {}", message); + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let mut state = state.lock().await; + // state.broadcast("", &msg).await; + let msg: BytesMut = msg; + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; - // Using User as a placeholder. Ideally, this would print out the username of - // the one who sent it. - broadcast_to_all_clients( - &sockets, - &create_text_command( - // Random integer is added to the end of "User", just a development - // proof-of-concept. Since at this stage usernames aren't exactly kept, - // we can identify message senders as their connection token; `token.0`. - &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()), - message - ) - ); - debug!( - "broadcasted text command from client with token '{}'", - token.0 - ); - } - // SESSEXIT - 7 => { - debug!( - "received session termination command from client with token '{}'", - token.0 - ); - poll.deregister(sockets.get(&token).unwrap()).unwrap(); - debug!("de-registered client with token '{}'", token.0); - } - // SUBSCRIB - 16 => { - debug!( - "received room subscription command from client with token '{}'", - token.0 - ); - } - // Anything else, do nothing. - _ => () - } + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + debug!("received property set command from client"); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client"); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { error!("unexpected error: {}", e); break; } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client"); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - - // Unimplemented - // let ready = requests.get(&token).unwrap() - // .windows(4) - // .find(|window| is_double_crnl(*window)) - // .is_some(); - // - // if ready { - // let socket = sockets.get(&token).unwrap(); - // poll.reregister( - // socket, - // token, - // Ready::writable(), - // PollOpt::edge() | PollOpt::oneshot()).unwrap(); - // } - }, - // Unimplemented - // token if event.readiness().is_writable() => { - // println!("writeable"); - // requests.get_mut(&token).unwrap().clear(); - // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap(); - // - // // Re-use existing connection ("keep-alive") - switch back to reading - // poll.reregister( - // sockets.get(&token).unwrap(), - // token, - // Ready::readable(), - // PollOpt::edge()).unwrap(); - // }, - _ => () + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, } } } + + // De-register client + { + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) } } diff --git a/src/server/shared.rs b/src/server/shared.rs new file mode 100644 index 0000000..d7ff6c9 --- /dev/null +++ b/src/server/shared.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; +// use std::net::SocketAddr; +use bytes::BytesMut; +use crate::server::Tx; + +pub struct Shared { + pub peers: HashMap<String, Tx>, +} +impl Shared { + pub fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } + + pub async fn broadcast(&mut self, /* sender: &str, */ message: &[u8]) { + // debug!("peer sent message: {:?}", message); + // debug!("peer count: {}", self.peers.len()); + // debug!("peers: {:?}", self.peers); + for peer in self.peers.iter_mut() { + // debug!("peer: {:?}", peer); + // TODO: + // thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value' + peer.1.send(BytesMut::from(message)).unwrap(); + } + } + + // pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) { + // for peer in self.peers.iter_mut() { + // if *peer.0 != sender { + // let _ = peer.1.send(message.into()); + // } + // } + // } +} diff --git a/src/server/room/structures.rs b/src/server/structures.rs index 25fcf00..25fcf00 100644 --- a/src/server/room/structures.rs +++ b/src/server/structures.rs diff --git a/src/server/utils.rs b/src/server/utils.rs deleted file mode 100644 index 000b788..0000000 --- a/src/server/utils.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::collections::HashMap; -use mio::Token; -use mio::net::TcpStream; -use std::io::Write; - -pub fn broadcast_to_all_clients( - sockets: &HashMap<Token, TcpStream>, - message: &[u8] -) -> () { - for mut socket in sockets { - socket.1.write_all(message).unwrap(); - } -} diff --git a/src/utils/db.rs b/src/utils/db.rs index adc11d3..e6e1946 100644 --- a/src/utils/db.rs +++ b/src/utils/db.rs @@ -1,15 +1,16 @@ -use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; -use std::env; +use sqlx::sqlite::SqlitePoolOptions; +use std::error::Error; +use sqlx::SqlitePool; -pub async fn get_pool() -> Result<SqlitePool, Box<dyn std::error::Error>> { +pub async fn get_pool() -> Result<SqlitePool, Box<dyn Error>> { let pool = SqlitePoolOptions::new() .max_connections(20) - .connect(&env::var("DATABASE_URL")?) + .connect(&std::env::var("DATABASE_URL")?) .await?; debug!( "connected to database at url '{}'", - &env::var("DATABASE_URL")? + &std::env::var("DATABASE_URL")? ); Ok(pool) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 66f6db0..4e23340 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,2 @@ pub mod byte; pub mod db; -pub mod web; diff --git a/src/utils/web.rs b/src/utils/web.rs deleted file mode 100644 index a834648..0000000 --- a/src/utils/web.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub fn _is_double_crnl(window: &[u8]) -> bool { - window.len() >= 4 - && (window[0] == '\r' as u8) - && (window[1] == '\n' as u8) - && (window[2] == '\r' as u8) - && (window[3] == '\n' as u8) -} |