aboutsummaryrefslogtreecommitdiff
path: root/src/server/auto/server.rs
diff options
context:
space:
mode:
authorFuwn <[email protected]>2021-03-25 22:20:21 +0000
committerFuwn <[email protected]>2021-03-25 22:20:21 +0000
commit5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc (patch)
treef39b7bccc486298b1b7f4945cad37b4839817b73 /src/server/auto/server.rs
parentfeature: Byte utilities (diff)
downloadwhirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.tar.xz
whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.zip
major: Publish work-in-progress
Diffstat (limited to 'src/server/auto/server.rs')
-rw-r--r--src/server/auto/server.rs480
1 files changed, 232 insertions, 248 deletions
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs
index 381852f..c05ed5e 100644
--- a/src/server/auto/server.rs
+++ b/src/server/auto/server.rs
@@ -1,266 +1,250 @@
-use mio::net::{TcpListener, TcpStream};
-use std::io::{Read, Write};
-use mio::{Poll, Token, Ready, PollOpt, Events};
-use std::collections::{HashMap, HashSet};
+use std::error::Error;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::io::AsyncWriteExt;
+use crate::server::auto::cmd::property::{
+ create_property_update_command,
+ create_property_request_command
+};
+use tokio_util::codec::{BytesCodec, Decoder};
+use tokio_stream::StreamExt;
+use bytes::BytesMut;
+use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
-use crate::server::cmd::text::{create_text_command, create_text_command_with_action};
-use super::cmd::property::{create_property_update_command, create_property_request_command};
-use super::cmd::room::create_room_id_redirect_command;
-use rand::Rng;
-use crate::server::utils::broadcast_to_all_clients;
-
-// pub struct ClientSocket {
-// tcp_stream: TcpStream,
-// username: String,
-// }
+use crate::server::auto::cmd::room::create_room_id_redirect_command;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use crate::server::peer::Peer;
+use std::net::SocketAddr;
+use crate::server::auto::cmd::session::parse_session_initialization_command;
pub struct AutoServer;
impl AutoServer {
- pub fn new(listener: TcpListener) {
- let poll = Poll::new().unwrap();
- poll.register(
- &listener,
- Token(0),
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
-
- let mut counter: usize = 0;
- let mut sockets: HashMap<Token, TcpStream> = HashMap::new();
- let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
- let mut buffer = [0 as u8; 1024];
- // let mut room_ids: HashMap<&str, i32> = HashMap::new();
- let mut room_ids: HashSet<String> = HashSet::new();
+ pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> {
+ let listener = TcpListener::bind(addr).await?;
+ debug!("AutoServer now listening on {}", listener.local_addr().unwrap());
+ let state = Arc::new(Mutex::new(Shared::new()));
+ let mut counter = 0;
- let mut events = Events::with_capacity(1024);
loop {
- poll.poll(&mut events, None).unwrap();
- for event in &events {
- match event.token() {
- Token(0) => {
- loop {
- match listener.accept() {
- Ok((socket, address)) => {
- counter += 1;
- let token = Token(counter);
-
- poll.register(
- &socket,
- token,
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
+ let (stream, address) = listener.accept().await?;
+ counter += 1;
+ let state = Arc::clone(&state);
+
+ tokio::spawn(async move {
+ if let Err(e) = AutoServer::handle(
+ state,
+ stream,
+ address,
+ counter
+ ).await {
+ error!("an error occurred: {}", e);
+ }
+ });
+ }
+ }
- info!(
- "registered client with ip '{}' as token '{}'",
- address.ip(), token.0
- );
+ pub async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ debug!("registered peer with address '{}' as '{}'", address, count);
+ let mut room_ids: Vec<String> = Vec::new();
+ let mut username: String = String::new();
+
+ // while let Some(msg) = peer.bytes.next().await {
+ // match msg {
+ // Ok(bytes) => match bytes.get(2).unwrap() {
+ // 10 => { // PROPREQ
+ // debug!("received property request command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_update_command()).await?;
+ //
+ // debug!("sent property update command to client");
+ // }
+ // 6 => { // SESSINIT
+ // username = parse_session_initialization_command(bytes).username;
+ // debug!(
+ // "received session initialization command from client: {}",
+ // username
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_request_command()).await?;
+ // debug!("sent session initialization command to client");
+ // }
+ // 15 => { // PROPSET
+ // debug!("received property set command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_text_command_with_action(
+ // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ // )).await?;
+ // debug!("sent worldsmaster greeting to client");
+ // }
+ // 29 => { // BUDDYLISTUPDATE
+ // let received_buddy = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ // ).unwrap();
+ // debug!(
+ // "received buddy list update command from client: {}",
+ // received_buddy
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_buddy_list_notify_command(received_buddy))
+ // .await?;
+ // debug!("sent buddy list notify command to client");
+ // }
+ // 20 => { // ROOMIDRQ
+ // let room_name = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received room id request command from client: {}", room_name);
+ // let room_id;
+ // if !room_ids.contains(&room_name.to_string()) {
+ // room_ids.push(room_name.to_string());
+ // room_id = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("inserted room '{}' as '{}'", room_name, room_id);
+ // } else {
+ // let position = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("found room '{}' as '{}'", room_name, position);
+ // room_id = position;
+ // }
+ // trace!("room name: {}, room id: {}", room_name, room_id);
+ // trace!("{:?}", room_ids);
+ // peer.bytes.get_mut()
+ // .write_all(&create_room_id_redirect_command(room_name, room_id))
+ // .await?;
+ // debug!("sent redirect id command to client");
+ // }
+ // 14 => {
+ // let text = from_utf8(
+ // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received text command from client: {}", text);
+ // let mut state = state.lock().await;
+ // state.broadcast(&create_text_command(&username, text)).await;
+ // debug!("broadcasted text command from client");
+ // }
+ // 7 => { // SESSEXIT
+ // debug!("received session exit command from client")
+ // }
+ // _ => (),
+ // }
+ // Err(err) => error!("stream closed with error: {:?}", err),
+ // }
+ // }
- sockets.insert(token, socket);
- requests.insert(token, Vec::with_capacity(192));
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => {
- error!("unexpected error: {}", e);
- poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap();
- break;
- }
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ // debug!("received bytes from peer: {:?}", &msg);
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ let msg: BytesMut = msg;
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
+
+ debug!("sent property update command to client");
}
- }
- },
- token if event.readiness().is_readable() => {
- loop {
- let read = sockets.get_mut(&token).unwrap().read(&mut buffer);
- match read {
- Ok(0) => { sockets.remove(&token); break; }
- Ok(n) => {
- let req = requests.get_mut(&token).unwrap();
- for b in &buffer[0..n] {
- req.push(*b);
- }
-
- // First byte means how long data section of the packet is.
- // Second byte is to be determined.
- // Third byte is the request type.
-
- // Match packet type by descriptor; **third** byte.
- match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed.
- // PROPREQ
- 10 => {
- debug!(
- "received property request command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_update_command()).unwrap();
- debug!(
- "sent property update command to client with token '{}'",
- token.0
- );
- }
- // SESSINIT
- 6 => {
- debug!(
- "received session initialization command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_request_command()).unwrap();
- debug!(
- "sent session initialization command to client with token '{}'",
- token.0
- );
- }
- // PROPSET
- 15 => {
- debug!(
- "received property set command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_text_command_with_action(
- "WORLDSMASTER",
- "Welcome to Whirlsplash!"
- )).unwrap();
- debug!(
- "sent worldsmaster greeting to client with token '{}'",
- token.0
- );
- },
- // BUDDYLISTUPDATE
- 29 => {
- debug!(
- "received buddy list update command from client with token '{}'",
- token.0
- );
-
- let received_buddy = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize - 1]
- ).unwrap();
- debug!("received buddy: {}", received_buddy);
-
- sockets.get_mut(&token).unwrap()
- .write_all(&create_buddy_list_notify_command(received_buddy))
- .unwrap();
- debug!(
- "sent buddy notify update command to client with token '{}'",
- token.0
- );
- }
- // ROOMIDRQ
- 20 => {
- debug!(
- "received room id request command from client with token '{}'",
- token.0
- );
-
- let room_name = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- let mut room_id = 0;
- if !room_ids.contains(room_name) {
- room_ids.insert(room_name.to_string());
- room_id = room_ids.iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("inserted room '{}' as '{}'", room_name, room_id);
- } else {
- let pos = room_ids
- .iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("found room '{}' as '{}'", room_name, pos);
- room_id = pos;
- }
- trace!("room name: {}, room id: {}", room_name, room_id);
- trace!("{:?}", room_ids);
-
- // Passing `0` as `room_id` parameter as currently there is
- // no way to find out a room's ID based on it's name.
- sockets.get_mut(&token).unwrap()
- .write_all(&create_room_id_redirect_command(room_name, room_id))
- .unwrap();
- debug!("sent redirect id command to client with token '{}'", token.0)
- }
- // TEXT
- 14 => {
- debug!("received text command from client with token '{}'", token.0);
-
- // TODO: Make this into a command!
- let message = from_utf8(
- &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- trace!("message: {}", message);
-
- // Using User as a placeholder. Ideally, this would print out the username of
- // the one who sent it.
- broadcast_to_all_clients(
- &sockets,
- &create_text_command(
- // Random integer is added to the end of "User", just a development
- // proof-of-concept. Since at this stage usernames aren't exactly kept,
- // we can identify message senders as their connection token; `token.0`.
- &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()),
- message
- )
- );
- debug!(
- "broadcasted text command from client with token '{}'",
- token.0
- );
- }
- // SESSEXIT
- 7 => {
- debug!(
- "received session termination command from client with token '{}'",
- token.0
- );
- poll.deregister(sockets.get(&token).unwrap()).unwrap();
- debug!("de-registered client with token '{}'", token.0);
- }
- // Anything else, do nothing.
- _ => ()
- }
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ debug!("received property set command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client");
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
}
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => { error!("unexpected error: {}", e); break; }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client");
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
}
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
-
- // Unimplemented
- // let ready = requests.get(&token).unwrap()
- // .windows(4)
- // .find(|window| is_double_crnl(*window))
- // .is_some();
- //
- // if ready {
- // let socket = sockets.get(&token).unwrap();
- // poll.reregister(
- // socket,
- // token,
- // Ready::writable(),
- // PollOpt::edge() | PollOpt::oneshot()).unwrap();
- // }
- },
- // Unimplemented
- // token if event.readiness().is_writable() => {
- // println!("writeable");
- // requests.get_mut(&token).unwrap().clear();
- // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap();
- //
- // // Re-use existing connection ("keep-alive") - switch back to reading
- // poll.reregister(
- // sockets.get(&token).unwrap(),
- // token,
- // Ready::readable(),
- // PollOpt::edge()).unwrap();
- // },
- _ => ()
+ }
+ Some(Err(e)) => {
+ error!("error while processing messages: {}", e); break;
+ }
+ None => break,
}
}
}
+
+ // De-register client
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ debug!("removed peer: {}", count)
+ }
+
+ Ok(())
}
}