diff options
| author | Fuwn <[email protected]> | 2021-03-25 22:20:21 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-03-25 22:20:21 +0000 |
| commit | 5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc (patch) | |
| tree | f39b7bccc486298b1b7f4945cad37b4839817b73 /src/server/room | |
| parent | feature: Byte utilities (diff) | |
| download | whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.tar.xz whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.zip | |
major: Publish work-in-progress
Diffstat (limited to 'src/server/room')
| -rw-r--r-- | src/server/room/cmd/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/room/cmd/session.rs | 18 | ||||
| -rw-r--r-- | src/server/room/mod.rs | 1 | ||||
| -rw-r--r-- | src/server/room/server.rs | 351 | ||||
| -rw-r--r-- | src/server/room/structures.rs | 15 |
5 files changed, 167 insertions, 219 deletions
diff --git a/src/server/room/cmd/mod.rs b/src/server/room/cmd/mod.rs index f348cd0..1d123c0 100644 --- a/src/server/room/cmd/mod.rs +++ b/src/server/room/cmd/mod.rs @@ -1 +1,2 @@ pub mod property; +pub mod session; diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs new file mode 100644 index 0000000..f04c078 --- /dev/null +++ b/src/server/room/cmd/session.rs @@ -0,0 +1,18 @@ +use crate::server::cmd::session::SessionInitializationCommand; +use bytes::BytesMut; +use std::str::from_utf8; + +pub fn parse_session_initialization_command( + command: BytesMut +) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command.get( + 25..(24 + command.get(24).unwrap().to_owned() as usize + 1) + ).unwrap() + ).unwrap().to_string(), + // password: "".to_string() + } +} diff --git a/src/server/room/mod.rs b/src/server/room/mod.rs index f581dfe..24606ea 100644 --- a/src/server/room/mod.rs +++ b/src/server/room/mod.rs @@ -1,3 +1,2 @@ pub mod cmd; pub mod server; -mod structures; diff --git a/src/server/room/server.rs b/src/server/room/server.rs index c005e40..8781868 100644 --- a/src/server/room/server.rs +++ b/src/server/room/server.rs @@ -1,222 +1,167 @@ -use mio::net::{TcpListener, TcpStream}; -use std::io::{Read, Write}; -use mio::{Poll, Token, Ready, PollOpt, Events}; -use std::collections::HashMap; +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::room::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use bytes::BytesMut; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::cmd::text::create_text_command; -// use crate::cmd::property::{create_property_update_command, create_property_request_command}; -use crate::server::room::cmd::property::{create_property_update_command, create_property_request_command}; -use rand::Rng; -use crate::server::utils::broadcast_to_all_clients; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::room::cmd::session::parse_session_initialization_command; pub struct RoomServer; impl RoomServer { - pub fn new(listener: TcpListener) { - let poll = Poll::new().unwrap(); - poll.register( - &listener, - Token(0), - Ready::readable(), - PollOpt::edge() - ).unwrap(); + pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; - let mut counter: usize = 0; - let mut sockets: HashMap<Token, TcpStream> = HashMap::new(); - let mut requests: HashMap<Token, Vec<u8>> = HashMap::new(); - let mut buffer = [0 as u8; 1024]; - - let mut events = Events::with_capacity(1024); loop { - poll.poll(&mut events, None).unwrap(); - for event in &events { - match event.token() { - Token(0) => { - loop { - match listener.accept() { - Ok((socket, address)) => { - counter += 1; - let token = Token(counter); - - poll.register( - &socket, - token, - Ready::readable(), - PollOpt::edge() - ).unwrap(); - - info!( - "registered client with ip '{}' as token '{}'", - address.ip(), token.0 - ); - - sockets.insert(token, socket); - requests.insert(token, Vec::with_capacity(192)); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { - error!("unexpected error: {}", e); - poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap(); - break; - } - } - } - }, - token if event.readiness().is_readable() => { - loop { - let read = sockets.get_mut(&token).unwrap().read(&mut buffer); - match read { - Ok(0) => { sockets.remove(&token); break; } - Ok(n) => { - let req = requests.get_mut(&token).unwrap(); - for b in &buffer[0..n] { - req.push(*b); - } + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); - // First byte means how long data section of the packet is. - // Second byte is to be determined. - // Third byte is the request type. + tokio::spawn(async move { + if let Err(e) = RoomServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } - // Match packet type by descriptor; **third** byte. - match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed. - // PROPREQ - 10 => { - debug!( - "received property request command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_update_command()).unwrap(); - debug!( - "sent property update from client with token '{}'", - token.0 - ); - } - // SESSINIT - 6 => { - debug!( - "received session initialization command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_property_request_command()).unwrap(); - debug!( - "sent session initialization command from client with token '{}'", - token.0 - ); - } - // PROPSET - 15 => debug!( - "received property set command from client with token '{}'", - token.0 - ), - // BUDDYLISTUPDATE - 29 => { - debug!( - "received buddy list update command from client with token '{}'", - token.0 - ); - sockets.get_mut(&token).unwrap() - .write_all(&create_buddy_list_notify_command("Wirlaburla")) - .unwrap(); - debug!( - "sent buddy notify update command from client with token '{}'", - token.0 - ); - } - // ROOMIDRQ - 20 => debug!( - "received room id request command from client with token '{}'", - token.0 - ), - // TEXT - 14 => { - debug!( - "received text command from client with token '{}'", - token.0 - ); + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); - // TODO: Make this into a command! - let message = from_utf8( - &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] - ).unwrap(); - trace!("message: {}", message); + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let mut state = state.lock().await; + // state.broadcast("", &msg).await; + let msg: BytesMut = msg; + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; - // Using User as a placeholder. Ideally, this would print out the username of - // the one who sent it. - broadcast_to_all_clients( - &sockets, - &create_text_command( - // Random integer is added to the end of "User", just a development - // proof-of-concept. Since at this stage usernames aren't exactly kept, - // we can identify message senders as their connection token; `token.0`. - &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()), - message - ) - ); - debug!( - "broadcasted text command from client with token '{}'", - token.0 - ); - } - // SESSEXIT - 7 => { - debug!( - "received session termination command from client with token '{}'", - token.0 - ); - poll.deregister(sockets.get(&token).unwrap()).unwrap(); - debug!("de-registered client with token '{}'", token.0); - } - // SUBSCRIB - 16 => { - debug!( - "received room subscription command from client with token '{}'", - token.0 - ); - } - // Anything else, do nothing. - _ => () - } + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + debug!("received property set command from client"); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client"); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => - break, - Err(e) => { error!("unexpected error: {}", e); break; } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client"); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - - // Unimplemented - // let ready = requests.get(&token).unwrap() - // .windows(4) - // .find(|window| is_double_crnl(*window)) - // .is_some(); - // - // if ready { - // let socket = sockets.get(&token).unwrap(); - // poll.reregister( - // socket, - // token, - // Ready::writable(), - // PollOpt::edge() | PollOpt::oneshot()).unwrap(); - // } - }, - // Unimplemented - // token if event.readiness().is_writable() => { - // println!("writeable"); - // requests.get_mut(&token).unwrap().clear(); - // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap(); - // - // // Re-use existing connection ("keep-alive") - switch back to reading - // poll.reregister( - // sockets.get(&token).unwrap(), - // token, - // Ready::readable(), - // PollOpt::edge()).unwrap(); - // }, - _ => () + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, } } } + + // De-register client + { + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) } } diff --git a/src/server/room/structures.rs b/src/server/room/structures.rs deleted file mode 100644 index 25fcf00..0000000 --- a/src/server/room/structures.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Unused struct, purely for reference. -// struct RoomSubscribeInfo { -// pub x: f64, -// pub y: f64, -// pub z: f64, -// pub d: f64, -// } - -pub struct Room { - distance: i16, - x: i16, - y: i16, - z: i16, - room_number: i16, -} |