aboutsummaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/auto/cmd/mod.rs1
-rw-r--r--src/server/auto/cmd/property.rs36
-rw-r--r--src/server/auto/cmd/room.rs7
-rw-r--r--src/server/auto/cmd/session.rs18
-rw-r--r--src/server/auto/constants.rs13
-rw-r--r--src/server/auto/mod.rs3
-rw-r--r--src/server/auto/server.rs480
-rw-r--r--src/server/cmd/buddy_list.rs2
-rw-r--r--src/server/cmd/mod.rs2
-rw-r--r--src/server/cmd/session.rs6
-rw-r--r--src/server/mod.rs11
-rw-r--r--src/server/peer.rs26
-rw-r--r--src/server/room/cmd/mod.rs1
-rw-r--r--src/server/room/cmd/session.rs18
-rw-r--r--src/server/room/mod.rs1
-rw-r--r--src/server/room/server.rs351
-rw-r--r--src/server/shared.rs35
-rw-r--r--src/server/structures.rs (renamed from src/server/room/structures.rs)0
-rw-r--r--src/server/utils.rs13
19 files changed, 504 insertions, 520 deletions
diff --git a/src/server/auto/cmd/mod.rs b/src/server/auto/cmd/mod.rs
index 5430662..d2345c6 100644
--- a/src/server/auto/cmd/mod.rs
+++ b/src/server/auto/cmd/mod.rs
@@ -1,2 +1,3 @@
pub mod property;
pub mod room;
+pub mod session;
diff --git a/src/server/auto/cmd/property.rs b/src/server/auto/cmd/property.rs
index 01d4f43..b8d0085 100644
--- a/src/server/auto/cmd/property.rs
+++ b/src/server/auto/cmd/property.rs
@@ -1,38 +1,4 @@
-// struct NetToProperty {
-// _prop_id: i32,
-// _flags: i32,
-// _access: i32,
-// _string_value: String,
-// _bin_value: Vec<i32>,
-// }
-// impl NetToProperty {
-// fn parse_net_data() -> Self {
-// NetToProperty {
-// _prop_id: 0,
-// _flags: 0,
-// _access: 0,
-// _string_value: "".to_string(),
-// _bin_value: vec![]
-// }
-// }
-// }
-
-// TODO: Decode received data and send back a valid response.
-pub fn create_property_update_command() -> [u8; 147] { // Vec<u8>
- // let mut property = Vec::with_capacity(2);
- // property.push(0x01); // ?
- // property.push(0x10); // Command type
- //
- // // Meaningful Data
- // property.push(); // Property ID
- // property.push(); // Flags
- // property.push(); // Access
- //
- // // Insert data length as first byte.
- // property.insert(0, property.len() as u8 + 1); // ^
- //
- // property // Return created array
-
+pub fn create_property_update_command() -> [u8; 147] {
[
0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F,
0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F,
diff --git a/src/server/auto/cmd/room.rs b/src/server/auto/cmd/room.rs
index ff839f3..78498fe 100644
--- a/src/server/auto/cmd/room.rs
+++ b/src/server/auto/cmd/room.rs
@@ -18,6 +18,13 @@ pub fn create_room_id_redirect_command(room_name: &str, room_id: usize) -> Vec<u
room_id_redirect.push(0x00);
// Port
+ // for byte in convert_u16_to_two_u8s_be(0x1629).iter() {
+ // room_id_redirect.push(*byte);
+ // }
+ // Port
+ // for byte in convert_u16_to_two_u8s_be(5673_i32 as u16).iter() {
+ // room_id_redirect.push(*byte);
+ // }
room_id_redirect.push(0x16);
room_id_redirect.push(0x29);
diff --git a/src/server/auto/cmd/session.rs b/src/server/auto/cmd/session.rs
new file mode 100644
index 0000000..8e8fbb7
--- /dev/null
+++ b/src/server/auto/cmd/session.rs
@@ -0,0 +1,18 @@
+use crate::server::cmd::session::SessionInitializationCommand;
+use bytes::BytesMut;
+use std::str::from_utf8;
+
+pub fn parse_session_initialization_command(
+ command: BytesMut
+) -> SessionInitializationCommand {
+ SessionInitializationCommand {
+ // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize,
+ // client: "".to_string(),
+ username: from_utf8(
+ command.get(
+ 21..(20 + command.get(20).unwrap().to_owned() as usize + 1)
+ ).unwrap()
+ ).unwrap().to_string(),
+ // password: "".to_string()
+ }
+}
diff --git a/src/server/auto/constants.rs b/src/server/auto/constants.rs
deleted file mode 100644
index 10c9439..0000000
--- a/src/server/auto/constants.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use std::collections::HashMap;
-
-pub static ROOM_IDS: phf::Map<&'static str, &'static i32> = phf::phf_map! {
-"ChatElevator<dimension-1>" => &4,
-"IconViewRoom1Enter<dimension-1>" => &3,
-"ChatHall<dimension-1>" => &9,
-"IconViewRoom1<dimension-1>" => &12,
-"IconViewRoom1g<dimension-1>" => &27230,
-"ReceptionView1<dimension-1>" => &6,
-"ReceptionView2<dimension-1>" => &5,
-"Reception<dimension-1>" => &1,
-"staircase1<dimension-1>" => &11,
-};
diff --git a/src/server/auto/mod.rs b/src/server/auto/mod.rs
index 741d763..0a6f342 100644
--- a/src/server/auto/mod.rs
+++ b/src/server/auto/mod.rs
@@ -1,3 +1,2 @@
-pub mod cmd;
-// pub mod constants;
pub mod server;
+pub mod cmd;
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs
index 381852f..c05ed5e 100644
--- a/src/server/auto/server.rs
+++ b/src/server/auto/server.rs
@@ -1,266 +1,250 @@
-use mio::net::{TcpListener, TcpStream};
-use std::io::{Read, Write};
-use mio::{Poll, Token, Ready, PollOpt, Events};
-use std::collections::{HashMap, HashSet};
+use std::error::Error;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::io::AsyncWriteExt;
+use crate::server::auto::cmd::property::{
+ create_property_update_command,
+ create_property_request_command
+};
+use tokio_util::codec::{BytesCodec, Decoder};
+use tokio_stream::StreamExt;
+use bytes::BytesMut;
+use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
-use crate::server::cmd::text::{create_text_command, create_text_command_with_action};
-use super::cmd::property::{create_property_update_command, create_property_request_command};
-use super::cmd::room::create_room_id_redirect_command;
-use rand::Rng;
-use crate::server::utils::broadcast_to_all_clients;
-
-// pub struct ClientSocket {
-// tcp_stream: TcpStream,
-// username: String,
-// }
+use crate::server::auto::cmd::room::create_room_id_redirect_command;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use crate::server::peer::Peer;
+use std::net::SocketAddr;
+use crate::server::auto::cmd::session::parse_session_initialization_command;
pub struct AutoServer;
impl AutoServer {
- pub fn new(listener: TcpListener) {
- let poll = Poll::new().unwrap();
- poll.register(
- &listener,
- Token(0),
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
-
- let mut counter: usize = 0;
- let mut sockets: HashMap<Token, TcpStream> = HashMap::new();
- let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
- let mut buffer = [0 as u8; 1024];
- // let mut room_ids: HashMap<&str, i32> = HashMap::new();
- let mut room_ids: HashSet<String> = HashSet::new();
+ pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> {
+ let listener = TcpListener::bind(addr).await?;
+ debug!("AutoServer now listening on {}", listener.local_addr().unwrap());
+ let state = Arc::new(Mutex::new(Shared::new()));
+ let mut counter = 0;
- let mut events = Events::with_capacity(1024);
loop {
- poll.poll(&mut events, None).unwrap();
- for event in &events {
- match event.token() {
- Token(0) => {
- loop {
- match listener.accept() {
- Ok((socket, address)) => {
- counter += 1;
- let token = Token(counter);
-
- poll.register(
- &socket,
- token,
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
+ let (stream, address) = listener.accept().await?;
+ counter += 1;
+ let state = Arc::clone(&state);
+
+ tokio::spawn(async move {
+ if let Err(e) = AutoServer::handle(
+ state,
+ stream,
+ address,
+ counter
+ ).await {
+ error!("an error occurred: {}", e);
+ }
+ });
+ }
+ }
- info!(
- "registered client with ip '{}' as token '{}'",
- address.ip(), token.0
- );
+ pub async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ debug!("registered peer with address '{}' as '{}'", address, count);
+ let mut room_ids: Vec<String> = Vec::new();
+ let mut username: String = String::new();
+
+ // while let Some(msg) = peer.bytes.next().await {
+ // match msg {
+ // Ok(bytes) => match bytes.get(2).unwrap() {
+ // 10 => { // PROPREQ
+ // debug!("received property request command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_update_command()).await?;
+ //
+ // debug!("sent property update command to client");
+ // }
+ // 6 => { // SESSINIT
+ // username = parse_session_initialization_command(bytes).username;
+ // debug!(
+ // "received session initialization command from client: {}",
+ // username
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_request_command()).await?;
+ // debug!("sent session initialization command to client");
+ // }
+ // 15 => { // PROPSET
+ // debug!("received property set command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_text_command_with_action(
+ // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ // )).await?;
+ // debug!("sent worldsmaster greeting to client");
+ // }
+ // 29 => { // BUDDYLISTUPDATE
+ // let received_buddy = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ // ).unwrap();
+ // debug!(
+ // "received buddy list update command from client: {}",
+ // received_buddy
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_buddy_list_notify_command(received_buddy))
+ // .await?;
+ // debug!("sent buddy list notify command to client");
+ // }
+ // 20 => { // ROOMIDRQ
+ // let room_name = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received room id request command from client: {}", room_name);
+ // let room_id;
+ // if !room_ids.contains(&room_name.to_string()) {
+ // room_ids.push(room_name.to_string());
+ // room_id = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("inserted room '{}' as '{}'", room_name, room_id);
+ // } else {
+ // let position = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("found room '{}' as '{}'", room_name, position);
+ // room_id = position;
+ // }
+ // trace!("room name: {}, room id: {}", room_name, room_id);
+ // trace!("{:?}", room_ids);
+ // peer.bytes.get_mut()
+ // .write_all(&create_room_id_redirect_command(room_name, room_id))
+ // .await?;
+ // debug!("sent redirect id command to client");
+ // }
+ // 14 => {
+ // let text = from_utf8(
+ // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received text command from client: {}", text);
+ // let mut state = state.lock().await;
+ // state.broadcast(&create_text_command(&username, text)).await;
+ // debug!("broadcasted text command from client");
+ // }
+ // 7 => { // SESSEXIT
+ // debug!("received session exit command from client")
+ // }
+ // _ => (),
+ // }
+ // Err(err) => error!("stream closed with error: {:?}", err),
+ // }
+ // }
- sockets.insert(token, socket);
- requests.insert(token, Vec::with_capacity(192));
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => {
- error!("unexpected error: {}", e);
- poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap();
- break;
- }
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ // debug!("received bytes from peer: {:?}", &msg);
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ let msg: BytesMut = msg;
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
+
+ debug!("sent property update command to client");
}
- }
- },
- token if event.readiness().is_readable() => {
- loop {
- let read = sockets.get_mut(&token).unwrap().read(&mut buffer);
- match read {
- Ok(0) => { sockets.remove(&token); break; }
- Ok(n) => {
- let req = requests.get_mut(&token).unwrap();
- for b in &buffer[0..n] {
- req.push(*b);
- }
-
- // First byte means how long data section of the packet is.
- // Second byte is to be determined.
- // Third byte is the request type.
-
- // Match packet type by descriptor; **third** byte.
- match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed.
- // PROPREQ
- 10 => {
- debug!(
- "received property request command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_update_command()).unwrap();
- debug!(
- "sent property update command to client with token '{}'",
- token.0
- );
- }
- // SESSINIT
- 6 => {
- debug!(
- "received session initialization command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_request_command()).unwrap();
- debug!(
- "sent session initialization command to client with token '{}'",
- token.0
- );
- }
- // PROPSET
- 15 => {
- debug!(
- "received property set command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_text_command_with_action(
- "WORLDSMASTER",
- "Welcome to Whirlsplash!"
- )).unwrap();
- debug!(
- "sent worldsmaster greeting to client with token '{}'",
- token.0
- );
- },
- // BUDDYLISTUPDATE
- 29 => {
- debug!(
- "received buddy list update command from client with token '{}'",
- token.0
- );
-
- let received_buddy = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize - 1]
- ).unwrap();
- debug!("received buddy: {}", received_buddy);
-
- sockets.get_mut(&token).unwrap()
- .write_all(&create_buddy_list_notify_command(received_buddy))
- .unwrap();
- debug!(
- "sent buddy notify update command to client with token '{}'",
- token.0
- );
- }
- // ROOMIDRQ
- 20 => {
- debug!(
- "received room id request command from client with token '{}'",
- token.0
- );
-
- let room_name = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- let mut room_id = 0;
- if !room_ids.contains(room_name) {
- room_ids.insert(room_name.to_string());
- room_id = room_ids.iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("inserted room '{}' as '{}'", room_name, room_id);
- } else {
- let pos = room_ids
- .iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("found room '{}' as '{}'", room_name, pos);
- room_id = pos;
- }
- trace!("room name: {}, room id: {}", room_name, room_id);
- trace!("{:?}", room_ids);
-
- // Passing `0` as `room_id` parameter as currently there is
- // no way to find out a room's ID based on it's name.
- sockets.get_mut(&token).unwrap()
- .write_all(&create_room_id_redirect_command(room_name, room_id))
- .unwrap();
- debug!("sent redirect id command to client with token '{}'", token.0)
- }
- // TEXT
- 14 => {
- debug!("received text command from client with token '{}'", token.0);
-
- // TODO: Make this into a command!
- let message = from_utf8(
- &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- trace!("message: {}", message);
-
- // Using User as a placeholder. Ideally, this would print out the username of
- // the one who sent it.
- broadcast_to_all_clients(
- &sockets,
- &create_text_command(
- // Random integer is added to the end of "User", just a development
- // proof-of-concept. Since at this stage usernames aren't exactly kept,
- // we can identify message senders as their connection token; `token.0`.
- &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()),
- message
- )
- );
- debug!(
- "broadcasted text command from client with token '{}'",
- token.0
- );
- }
- // SESSEXIT
- 7 => {
- debug!(
- "received session termination command from client with token '{}'",
- token.0
- );
- poll.deregister(sockets.get(&token).unwrap()).unwrap();
- debug!("de-registered client with token '{}'", token.0);
- }
- // Anything else, do nothing.
- _ => ()
- }
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ debug!("received property set command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client");
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
}
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => { error!("unexpected error: {}", e); break; }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client");
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
}
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
-
- // Unimplemented
- // let ready = requests.get(&token).unwrap()
- // .windows(4)
- // .find(|window| is_double_crnl(*window))
- // .is_some();
- //
- // if ready {
- // let socket = sockets.get(&token).unwrap();
- // poll.reregister(
- // socket,
- // token,
- // Ready::writable(),
- // PollOpt::edge() | PollOpt::oneshot()).unwrap();
- // }
- },
- // Unimplemented
- // token if event.readiness().is_writable() => {
- // println!("writeable");
- // requests.get_mut(&token).unwrap().clear();
- // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap();
- //
- // // Re-use existing connection ("keep-alive") - switch back to reading
- // poll.reregister(
- // sockets.get(&token).unwrap(),
- // token,
- // Ready::readable(),
- // PollOpt::edge()).unwrap();
- // },
- _ => ()
+ }
+ Some(Err(e)) => {
+ error!("error while processing messages: {}", e); break;
+ }
+ None => break,
}
}
}
+
+ // De-register client
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ debug!("removed peer: {}", count)
+ }
+
+ Ok(())
}
}
diff --git a/src/server/cmd/buddy_list.rs b/src/server/cmd/buddy_list.rs
index 1423808..cbddc19 100644
--- a/src/server/cmd/buddy_list.rs
+++ b/src/server/cmd/buddy_list.rs
@@ -1,5 +1,3 @@
-/// In the future, this will take a `Vec` of buddies and dynamically
-/// create a response packet based on the amount of buddies given.
pub fn create_buddy_list_notify_command(buddy: &str) -> Vec<u8> {
let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len());
buddy_list_notify.push(0x01); // ?
diff --git a/src/server/cmd/mod.rs b/src/server/cmd/mod.rs
index c211484..196a7a5 100644
--- a/src/server/cmd/mod.rs
+++ b/src/server/cmd/mod.rs
@@ -1,3 +1,3 @@
pub mod buddy_list;
-// pub mod property;
+pub mod session;
pub mod text;
diff --git a/src/server/cmd/session.rs b/src/server/cmd/session.rs
new file mode 100644
index 0000000..9d48bcc
--- /dev/null
+++ b/src/server/cmd/session.rs
@@ -0,0 +1,6 @@
+pub struct SessionInitializationCommand {
+ // pub protocol: usize,
+ // pub client: String,
+ pub username: String,
+ // pub password: String,
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index cb241f2..3698d84 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -1,4 +1,11 @@
+use tokio::sync::mpsc;
+use bytes::BytesMut;
+
pub mod auto;
-pub mod cmd;
+mod cmd;
pub mod room;
-pub mod utils;
+mod shared;
+mod peer;
+
+type Tx = mpsc::UnboundedSender<BytesMut>;
+type Rx = mpsc::UnboundedReceiver<BytesMut>;
diff --git a/src/server/peer.rs b/src/server/peer.rs
new file mode 100644
index 0000000..e92a3db
--- /dev/null
+++ b/src/server/peer.rs
@@ -0,0 +1,26 @@
+use tokio_util::codec::{Framed, BytesCodec};
+use tokio::net::TcpStream;
+use crate::server::Rx;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use tokio::sync::mpsc;
+
+pub struct Peer {
+ pub bytes: Framed<TcpStream, BytesCodec>,
+ // pub(crate) rx: Rx,
+ pub rx: Rx,
+}
+impl Peer {
+ pub async fn new(
+ state: Arc<Mutex<Shared>>,
+ bytes: Framed<TcpStream, BytesCodec>,
+ username: String,
+ ) -> std::io::Result<Peer> {
+ // let address = bytes.get_ref().peer_addr()?;
+ let (tx, rx) = mpsc::unbounded_channel();
+ // state.lock().await.peers.insert(address, tx);
+ state.lock().await.peers.insert(username, tx);
+ Ok(Peer { bytes, rx })
+ }
+}
diff --git a/src/server/room/cmd/mod.rs b/src/server/room/cmd/mod.rs
index f348cd0..1d123c0 100644
--- a/src/server/room/cmd/mod.rs
+++ b/src/server/room/cmd/mod.rs
@@ -1 +1,2 @@
pub mod property;
+pub mod session;
diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs
new file mode 100644
index 0000000..f04c078
--- /dev/null
+++ b/src/server/room/cmd/session.rs
@@ -0,0 +1,18 @@
+use crate::server::cmd::session::SessionInitializationCommand;
+use bytes::BytesMut;
+use std::str::from_utf8;
+
+pub fn parse_session_initialization_command(
+ command: BytesMut
+) -> SessionInitializationCommand {
+ SessionInitializationCommand {
+ // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize,
+ // client: "".to_string(),
+ username: from_utf8(
+ command.get(
+ 25..(24 + command.get(24).unwrap().to_owned() as usize + 1)
+ ).unwrap()
+ ).unwrap().to_string(),
+ // password: "".to_string()
+ }
+}
diff --git a/src/server/room/mod.rs b/src/server/room/mod.rs
index f581dfe..24606ea 100644
--- a/src/server/room/mod.rs
+++ b/src/server/room/mod.rs
@@ -1,3 +1,2 @@
pub mod cmd;
pub mod server;
-mod structures;
diff --git a/src/server/room/server.rs b/src/server/room/server.rs
index c005e40..8781868 100644
--- a/src/server/room/server.rs
+++ b/src/server/room/server.rs
@@ -1,222 +1,167 @@
-use mio::net::{TcpListener, TcpStream};
-use std::io::{Read, Write};
-use mio::{Poll, Token, Ready, PollOpt, Events};
-use std::collections::HashMap;
+use std::error::Error;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::io::AsyncWriteExt;
+use crate::server::room::cmd::property::{
+ create_property_update_command,
+ create_property_request_command
+};
+use tokio_util::codec::{BytesCodec, Decoder};
+use tokio_stream::StreamExt;
+use bytes::BytesMut;
+use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
-use crate::server::cmd::text::create_text_command;
-// use crate::cmd::property::{create_property_update_command, create_property_request_command};
-use crate::server::room::cmd::property::{create_property_update_command, create_property_request_command};
-use rand::Rng;
-use crate::server::utils::broadcast_to_all_clients;
+use crate::server::auto::cmd::room::create_room_id_redirect_command;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use crate::server::peer::Peer;
+use std::net::SocketAddr;
+use crate::server::room::cmd::session::parse_session_initialization_command;
pub struct RoomServer;
impl RoomServer {
- pub fn new(listener: TcpListener) {
- let poll = Poll::new().unwrap();
- poll.register(
- &listener,
- Token(0),
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
+ pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> {
+ let listener = TcpListener::bind(addr).await?;
+ debug!("RoomServer now listening on {}", listener.local_addr().unwrap());
+ let state = Arc::new(Mutex::new(Shared::new()));
+ let mut counter = 0;
- let mut counter: usize = 0;
- let mut sockets: HashMap<Token, TcpStream> = HashMap::new();
- let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
- let mut buffer = [0 as u8; 1024];
-
- let mut events = Events::with_capacity(1024);
loop {
- poll.poll(&mut events, None).unwrap();
- for event in &events {
- match event.token() {
- Token(0) => {
- loop {
- match listener.accept() {
- Ok((socket, address)) => {
- counter += 1;
- let token = Token(counter);
-
- poll.register(
- &socket,
- token,
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
-
- info!(
- "registered client with ip '{}' as token '{}'",
- address.ip(), token.0
- );
-
- sockets.insert(token, socket);
- requests.insert(token, Vec::with_capacity(192));
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => {
- error!("unexpected error: {}", e);
- poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap();
- break;
- }
- }
- }
- },
- token if event.readiness().is_readable() => {
- loop {
- let read = sockets.get_mut(&token).unwrap().read(&mut buffer);
- match read {
- Ok(0) => { sockets.remove(&token); break; }
- Ok(n) => {
- let req = requests.get_mut(&token).unwrap();
- for b in &buffer[0..n] {
- req.push(*b);
- }
+ let (stream, address) = listener.accept().await?;
+ counter += 1;
+ let state = Arc::clone(&state);
- // First byte means how long data section of the packet is.
- // Second byte is to be determined.
- // Third byte is the request type.
+ tokio::spawn(async move {
+ if let Err(e) = RoomServer::handle(
+ state,
+ stream,
+ address,
+ counter
+ ).await {
+ error!("an error occurred: {}", e);
+ }
+ });
+ }
+ }
- // Match packet type by descriptor; **third** byte.
- match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed.
- // PROPREQ
- 10 => {
- debug!(
- "received property request command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_update_command()).unwrap();
- debug!(
- "sent property update from client with token '{}'",
- token.0
- );
- }
- // SESSINIT
- 6 => {
- debug!(
- "received session initialization command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_request_command()).unwrap();
- debug!(
- "sent session initialization command from client with token '{}'",
- token.0
- );
- }
- // PROPSET
- 15 => debug!(
- "received property set command from client with token '{}'",
- token.0
- ),
- // BUDDYLISTUPDATE
- 29 => {
- debug!(
- "received buddy list update command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_buddy_list_notify_command("Wirlaburla"))
- .unwrap();
- debug!(
- "sent buddy notify update command from client with token '{}'",
- token.0
- );
- }
- // ROOMIDRQ
- 20 => debug!(
- "received room id request command from client with token '{}'",
- token.0
- ),
- // TEXT
- 14 => {
- debug!(
- "received text command from client with token '{}'",
- token.0
- );
+ pub async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ debug!("registered peer with address '{}' as '{}'", address, count);
+ let mut room_ids: Vec<String> = Vec::new();
+ let mut username: String = String::new();
- // TODO: Make this into a command!
- let message = from_utf8(
- &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- trace!("message: {}", message);
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ // debug!("received bytes from peer: {:?}", &msg);
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ // let mut state = state.lock().await;
+ // state.broadcast("", &msg).await;
+ let msg: BytesMut = msg;
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
- // Using User as a placeholder. Ideally, this would print out the username of
- // the one who sent it.
- broadcast_to_all_clients(
- &sockets,
- &create_text_command(
- // Random integer is added to the end of "User", just a development
- // proof-of-concept. Since at this stage usernames aren't exactly kept,
- // we can identify message senders as their connection token; `token.0`.
- &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()),
- message
- )
- );
- debug!(
- "broadcasted text command from client with token '{}'",
- token.0
- );
- }
- // SESSEXIT
- 7 => {
- debug!(
- "received session termination command from client with token '{}'",
- token.0
- );
- poll.deregister(sockets.get(&token).unwrap()).unwrap();
- debug!("de-registered client with token '{}'", token.0);
- }
- // SUBSCRIB
- 16 => {
- debug!(
- "received room subscription command from client with token '{}'",
- token.0
- );
- }
- // Anything else, do nothing.
- _ => ()
- }
+ debug!("sent property update command to client");
+ }
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ debug!("received property set command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client");
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
}
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => { error!("unexpected error: {}", e); break; }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client");
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
}
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
-
- // Unimplemented
- // let ready = requests.get(&token).unwrap()
- // .windows(4)
- // .find(|window| is_double_crnl(*window))
- // .is_some();
- //
- // if ready {
- // let socket = sockets.get(&token).unwrap();
- // poll.reregister(
- // socket,
- // token,
- // Ready::writable(),
- // PollOpt::edge() | PollOpt::oneshot()).unwrap();
- // }
- },
- // Unimplemented
- // token if event.readiness().is_writable() => {
- // println!("writeable");
- // requests.get_mut(&token).unwrap().clear();
- // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap();
- //
- // // Re-use existing connection ("keep-alive") - switch back to reading
- // poll.reregister(
- // sockets.get(&token).unwrap(),
- // token,
- // Ready::readable(),
- // PollOpt::edge()).unwrap();
- // },
- _ => ()
+ }
+ Some(Err(e)) => {
+ error!("error while processing messages: {}", e); break;
+ }
+ None => break,
}
}
}
+
+ // De-register client
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ debug!("removed peer: {}", count)
+ }
+
+ Ok(())
}
}
diff --git a/src/server/shared.rs b/src/server/shared.rs
new file mode 100644
index 0000000..d7ff6c9
--- /dev/null
+++ b/src/server/shared.rs
@@ -0,0 +1,35 @@
+use std::collections::HashMap;
+// use std::net::SocketAddr;
+use bytes::BytesMut;
+use crate::server::Tx;
+
+pub struct Shared {
+ pub peers: HashMap<String, Tx>,
+}
+impl Shared {
+ pub fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+
+ pub async fn broadcast(&mut self, /* sender: &str, */ message: &[u8]) {
+ // debug!("peer sent message: {:?}", message);
+ // debug!("peer count: {}", self.peers.len());
+ // debug!("peers: {:?}", self.peers);
+ for peer in self.peers.iter_mut() {
+ // debug!("peer: {:?}", peer);
+ // TODO:
+ // thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value'
+ peer.1.send(BytesMut::from(message)).unwrap();
+ }
+ }
+
+ // pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
+ // for peer in self.peers.iter_mut() {
+ // if *peer.0 != sender {
+ // let _ = peer.1.send(message.into());
+ // }
+ // }
+ // }
+}
diff --git a/src/server/room/structures.rs b/src/server/structures.rs
index 25fcf00..25fcf00 100644
--- a/src/server/room/structures.rs
+++ b/src/server/structures.rs
diff --git a/src/server/utils.rs b/src/server/utils.rs
deleted file mode 100644
index 000b788..0000000
--- a/src/server/utils.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use std::collections::HashMap;
-use mio::Token;
-use mio::net::TcpStream;
-use std::io::Write;
-
-pub fn broadcast_to_all_clients(
- sockets: &HashMap<Token, TcpStream>,
- message: &[u8]
-) -> () {
- for mut socket in sockets {
- socket.1.write_all(message).unwrap();
- }
-}