diff options
| author | Fuwn <[email protected]> | 2021-03-25 22:20:21 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-03-25 22:20:21 +0000 |
| commit | 5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc (patch) | |
| tree | f39b7bccc486298b1b7f4945cad37b4839817b73 /src/server/auto/server.rs | |
| parent | feature: Byte utilities (diff) | |
| download | whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.tar.xz whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.zip | |
major: Publish work-in-progress
Diffstat (limited to 'src/server/auto/server.rs')
| -rw-r--r-- | src/server/auto/server.rs | 480 |
1 files changed, 232 insertions, 248 deletions
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index 381852f..c05ed5e 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -1,266 +1,250 @@ -use mio::net::{TcpListener, TcpStream}; -use std::io::{Read, Write}; -use mio::{Poll, Token, Ready, PollOpt, Events}; -use std::collections::{HashMap, HashSet}; +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use bytes::BytesMut; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::cmd::text::{create_text_command, create_text_command_with_action}; -use super::cmd::property::{create_property_update_command, create_property_request_command}; -use super::cmd::room::create_room_id_redirect_command; -use rand::Rng; -use crate::server::utils::broadcast_to_all_clients; - -// pub struct ClientSocket { -// tcp_stream: TcpStream, -// username: String, -// } +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::auto::cmd::session::parse_session_initialization_command; pub struct AutoServer; impl AutoServer { - pub fn new(listener: TcpListener) { - let poll = Poll::new().unwrap(); - poll.register( - &listener, - Token(0), - Ready::readable(), - PollOpt::edge() - ).unwrap(); - - let mut counter: usize = 0; - let mut sockets: HashMap<Token, TcpStream> = HashMap::new(); - let mut requests: HashMap<Token, Vec<u8>> = HashMap::new(); - let mut buffer = [0 as u8; 1024]; - // let mut room_ids: HashMap<&str, i32> = HashMap::new(); - let mut room_ids: HashSet<String> = HashSet::new(); + pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; - let mut events = Events::with_capacity(1024); loop { - poll.poll(&mut events, None).unwrap(); - for event in &events { - match event.token() { - Token(0) => { - loop { - match listener.accept() { - Ok((socket, address)) => { - counter += 1; - let token = Token(counter); - - poll.register( - &socket, - token, - Ready::readable(), - PollOpt::edge() - ).unwrap(); + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = AutoServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } - info!( - "registered client with ip '{}' as token '{}'", - address.ip(), token.0 - ); + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + // while let Some(msg) = peer.bytes.next().await { + // match msg { + // Ok(bytes) => match bytes.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client"); + // peer.bytes.get_mut() + // .write_all(&create_property_update_command()).await?; + // + // debug!("sent property update command to client"); + // } + // 6 => { // SESSINIT + // username = parse_session_initialization_command(bytes).username; + // debug!( + // "received session initialization command from client: {}", + // username + // ); + // peer.bytes.get_mut() + // .write_all(&create_property_request_command()).await?; + // debug!("sent session initialization command to client"); + // } + // 15 => { // PROPSET + // debug!("received property set command from client"); + // peer.bytes.get_mut() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + // )).await?; + // debug!("sent worldsmaster greeting to client"); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client: {}", + // received_buddy + // ); + // peer.bytes.get_mut() + // .write_all(&create_buddy_list_notify_command(received_buddy)) + // .await?; + // debug!("sent buddy list notify command to client"); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!("received room id request command from client: {}", room_name); + // let room_id; + // if !room_ids.contains(&room_name.to_string()) { + // room_ids.push(room_name.to_string()); + // room_id = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", room_ids); + // peer.bytes.get_mut() + // .write_all(&create_room_id_redirect_command(room_name, room_id)) + // .await?; + // debug!("sent redirect id command to client"); + // } + // 14 => { + // let text = from_utf8( + // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!("received text command from client: {}", text); + // let mut state = state.lock().await; + // state.broadcast(&create_text_command(&username, text)).await; + // debug!("broadcasted text command from client"); + // } + // 7 => { // SESSEXIT + // debug!("received session exit command from client") + // } + // _ => (), + // } + // Err(err) => error!("stream closed with error: {:?}", err), + // } + // } - sockets.insert(token, socket); - requests.insert(token, Vec::with_capacity(192)); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { - error!("unexpected error: {}", e); - poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap(); - break; - } + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + let msg: BytesMut = msg; + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + + debug!("sent property update command to client"); } - } - }, - token if event.readiness().is_readable() => { - loop { - let read = sockets.get_mut(&token).unwrap().read(&mut buffer); - match read { - Ok(0) => { sockets.remove(&token); break; } - Ok(n) => { - let req = requests.get_mut(&token).unwrap(); - for b in &buffer[0..n] { - req.push(*b); - } - - // First byte means how long data section of the packet is. - // Second byte is to be determined. - // Third byte is the request type. - - // Match packet type by descriptor; **third** byte. - match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed. - // PROPREQ - 10 => { - debug!( - "received property request command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_update_command()).unwrap(); - debug!( - "sent property update command to client with token '{}'", - token.0 - ); - } - // SESSINIT - 6 => { - debug!( - "received session initialization command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_request_command()).unwrap(); - debug!( - "sent session initialization command to client with token '{}'", - token.0 - ); - } - // PROPSET - 15 => { - debug!( - "received property set command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", - "Welcome to Whirlsplash!" - )).unwrap(); - debug!( - "sent worldsmaster greeting to client with token '{}'", - token.0 - ); - }, - // BUDDYLISTUPDATE - 29 => { - debug!( - "received buddy list update command from client with token '{}'", - token.0 - ); - - let received_buddy = from_utf8( - &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize - 1] - ).unwrap(); - debug!("received buddy: {}", received_buddy); - - sockets.get_mut(&token).unwrap() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .unwrap(); - debug!( - "sent buddy notify update command to client with token '{}'", - token.0 - ); - } - // ROOMIDRQ - 20 => { - debug!( - "received room id request command from client with token '{}'", - token.0 - ); - - let room_name = from_utf8( - &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - let mut room_id = 0; - if !room_ids.contains(room_name) { - room_ids.insert(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == room_name) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let pos = room_ids - .iter() - .position(|i| i == room_name) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, pos); - room_id = pos; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - - // Passing `0` as `room_id` parameter as currently there is - // no way to find out a room's ID based on it's name. - sockets.get_mut(&token).unwrap() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .unwrap(); - debug!("sent redirect id command to client with token '{}'", token.0) - } - // TEXT - 14 => { - debug!("received text command from client with token '{}'", token.0); - - // TODO: Make this into a command! - let message = from_utf8( - &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - trace!("message: {}", message); - - // Using User as a placeholder. Ideally, this would print out the username of - // the one who sent it. - broadcast_to_all_clients( - &sockets, - &create_text_command( - // Random integer is added to the end of "User", just a development - // proof-of-concept. Since at this stage usernames aren't exactly kept, - // we can identify message senders as their connection token; `token.0`. - &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()), - message - ) - ); - debug!( - "broadcasted text command from client with token '{}'", - token.0 - ); - } - // SESSEXIT - 7 => { - debug!( - "received session termination command from client with token '{}'", - token.0 - ); - poll.deregister(sockets.get(&token).unwrap()).unwrap(); - debug!("de-registered client with token '{}'", token.0); - } - // Anything else, do nothing. - _ => () - } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + debug!("received property set command from client"); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client"); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { error!("unexpected error: {}", e); break; } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client"); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - - // Unimplemented - // let ready = requests.get(&token).unwrap() - // .windows(4) - // .find(|window| is_double_crnl(*window)) - // .is_some(); - // - // if ready { - // let socket = sockets.get(&token).unwrap(); - // poll.reregister( - // socket, - // token, - // Ready::writable(), - // PollOpt::edge() | PollOpt::oneshot()).unwrap(); - // } - }, - // Unimplemented - // token if event.readiness().is_writable() => { - // println!("writeable"); - // requests.get_mut(&token).unwrap().clear(); - // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap(); - // - // // Re-use existing connection ("keep-alive") - switch back to reading - // poll.reregister( - // sockets.get(&token).unwrap(), - // token, - // Ready::readable(), - // PollOpt::edge()).unwrap(); - // }, - _ => () + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, } } } + + // De-register client + { + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) } } |