aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFuwn <[email protected]>2021-03-25 22:20:21 +0000
committerFuwn <[email protected]>2021-03-25 22:20:21 +0000
commit5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc (patch)
treef39b7bccc486298b1b7f4945cad37b4839817b73 /src
parentfeature: Byte utilities (diff)
downloadwhirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.tar.xz
whirl-5fc28bfb2851441893ef2ad5f72e0feb8a0a22cc.zip
major: Publish work-in-progress
Diffstat (limited to 'src')
-rw-r--r--src/db/mod.rs1
-rw-r--r--src/db/routines.rs62
-rw-r--r--src/lib.rs6
-rw-r--r--src/main.rs44
-rw-r--r--src/server/auto/cmd/mod.rs1
-rw-r--r--src/server/auto/cmd/property.rs36
-rw-r--r--src/server/auto/cmd/room.rs7
-rw-r--r--src/server/auto/cmd/session.rs18
-rw-r--r--src/server/auto/constants.rs13
-rw-r--r--src/server/auto/mod.rs3
-rw-r--r--src/server/auto/server.rs480
-rw-r--r--src/server/cmd/buddy_list.rs2
-rw-r--r--src/server/cmd/mod.rs2
-rw-r--r--src/server/cmd/session.rs6
-rw-r--r--src/server/mod.rs11
-rw-r--r--src/server/peer.rs26
-rw-r--r--src/server/room/cmd/mod.rs1
-rw-r--r--src/server/room/cmd/session.rs18
-rw-r--r--src/server/room/mod.rs1
-rw-r--r--src/server/room/server.rs351
-rw-r--r--src/server/shared.rs35
-rw-r--r--src/server/structures.rs (renamed from src/server/room/structures.rs)0
-rw-r--r--src/server/utils.rs13
-rw-r--r--src/utils/db.rs11
-rw-r--r--src/utils/mod.rs1
-rw-r--r--src/utils/web.rs7
26 files changed, 528 insertions, 628 deletions
diff --git a/src/db/mod.rs b/src/db/mod.rs
index 461c354..5ec9ffb 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -1,2 +1 @@
-// mod routines;
mod tables;
diff --git a/src/db/routines.rs b/src/db/routines.rs
deleted file mode 100644
index 884944d..0000000
--- a/src/db/routines.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use rusqlite::{params, Connection, Result};
-use crate::db::tables::SerialNumbers;
-
-#[repr(i32)] #[derive(Debug, PartialEq)]
-enum AccountStatus {
- AccountInactive = 0,
- AccountActive = 1,
-}
-
-fn modify_account_status(username: &str, status: AccountStatus) -> Result<()> {
- let connection = Connection::open("worlds.db")?;
-
- // language=SQLite
- connection.execute(
- "UPDATE user_registration \
- SET account_status = (?1) \
- where username = '(?2)'",
- params![status as i32, username]
- )?;
-
- Ok(())
-}
-
-fn delete_account(username: &str) -> Result<()> {
- let connection = Connection::open("worlds.db")?;
-
- // Get serial_number from `username`'s row.
- // language=SQLite
- connection.query_row(
- "SELECT * FROM serial_numbers WHERE username = '(?1)'",
- params![username],
- |row| row.get(0)
- );
-
- let mut row = connection.prepare(
- "SELECT * \
- FROM serial_numbers \
- WHERE username = '(?1)';"
- )?;
- let row_iter = row.query_map(params![username], |row| {
- Ok(SerialNumbers {
- serial_number: row.get(0)?,
- user_name: row.get(1)?,
- serial_status: row.get(2)?,
- })
- })?;
-
- // Reset serial number so it can be reused.
- // language=SQLite
- connection.execute(
- "UPDATE serial_numbers \
- SET username = 'none', serial_status = 0 \
- WHERE serial_number = '(?1)'",
- params![row_iter.]
- )?;
-
- Ok(())
-}
-
-fn set_account_host() { }
-
-fn modify_account_vip() { }
diff --git a/src/lib.rs b/src/lib.rs
index cf812d0..ebb37ce 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,9 +1,9 @@
-#![feature(type_ascription)]
-#![feature(hash_set_entry)]
+#![feature(type_ascription, hash_set_entry, type_name_of_val)]
+#![warn(rust_2018_idioms)]
#[macro_use]
extern crate log;
pub mod db;
pub mod server;
-pub mod utils;
+pub mod utils; \ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 92a35d0..35c839d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,32 +1,18 @@
-#[macro_use]
-extern crate log;
+use whirl::server::auto::server::AutoServer;
+use std::error::Error;
+use whirl::server::room::server::RoomServer;
-use mio::net::TcpListener;
-use std::thread;
-use whirl::server;
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ dotenv::dotenv().ok();
+ pretty_env_logger::init();
-fn main() {
- dotenv::dotenv().ok(); // Adds ability to use environment variables
- pretty_env_logger::init(); // Adds pretty logging
-
- let mut threads = vec![];
- threads.push(thread::spawn(move || {
- debug!("spawned AutoServer thread");
- server::auto::server::AutoServer::new(
- TcpListener::bind(
- &"0.0.0.0:6650".parse().unwrap()
- ).unwrap()
- );
- }));
- threads.push(thread::spawn(move || {
- debug!("spawned RoomServer thread");
- server::room::server::RoomServer::new(
- TcpListener::bind(
- &"0.0.0.0:5673".parse().unwrap()
- ).unwrap()
- );
- }));
- for thread in threads {
- let _ = thread.join(); // Handle Result by dissolving it.
+ loop {
+ tokio::spawn(async move {
+ let _ = AutoServer::new("0.0.0.0:6650").await;
+ });
+ tokio::spawn(async move {
+ let _ = RoomServer::new("0.0.0.0:5673").await;
+ });
}
-}
+} \ No newline at end of file
diff --git a/src/server/auto/cmd/mod.rs b/src/server/auto/cmd/mod.rs
index 5430662..d2345c6 100644
--- a/src/server/auto/cmd/mod.rs
+++ b/src/server/auto/cmd/mod.rs
@@ -1,2 +1,3 @@
pub mod property;
pub mod room;
+pub mod session;
diff --git a/src/server/auto/cmd/property.rs b/src/server/auto/cmd/property.rs
index 01d4f43..b8d0085 100644
--- a/src/server/auto/cmd/property.rs
+++ b/src/server/auto/cmd/property.rs
@@ -1,38 +1,4 @@
-// struct NetToProperty {
-// _prop_id: i32,
-// _flags: i32,
-// _access: i32,
-// _string_value: String,
-// _bin_value: Vec<i32>,
-// }
-// impl NetToProperty {
-// fn parse_net_data() -> Self {
-// NetToProperty {
-// _prop_id: 0,
-// _flags: 0,
-// _access: 0,
-// _string_value: "".to_string(),
-// _bin_value: vec![]
-// }
-// }
-// }
-
-// TODO: Decode received data and send back a valid response.
-pub fn create_property_update_command() -> [u8; 147] { // Vec<u8>
- // let mut property = Vec::with_capacity(2);
- // property.push(0x01); // ?
- // property.push(0x10); // Command type
- //
- // // Meaningful Data
- // property.push(); // Property ID
- // property.push(); // Flags
- // property.push(); // Access
- //
- // // Insert data length as first byte.
- // property.insert(0, property.len() as u8 + 1); // ^
- //
- // property // Return created array
-
+pub fn create_property_update_command() -> [u8; 147] {
[
0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F,
0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F,
diff --git a/src/server/auto/cmd/room.rs b/src/server/auto/cmd/room.rs
index ff839f3..78498fe 100644
--- a/src/server/auto/cmd/room.rs
+++ b/src/server/auto/cmd/room.rs
@@ -18,6 +18,13 @@ pub fn create_room_id_redirect_command(room_name: &str, room_id: usize) -> Vec<u
room_id_redirect.push(0x00);
// Port
+ // for byte in convert_u16_to_two_u8s_be(0x1629).iter() {
+ // room_id_redirect.push(*byte);
+ // }
+ // Port
+ // for byte in convert_u16_to_two_u8s_be(5673_i32 as u16).iter() {
+ // room_id_redirect.push(*byte);
+ // }
room_id_redirect.push(0x16);
room_id_redirect.push(0x29);
diff --git a/src/server/auto/cmd/session.rs b/src/server/auto/cmd/session.rs
new file mode 100644
index 0000000..8e8fbb7
--- /dev/null
+++ b/src/server/auto/cmd/session.rs
@@ -0,0 +1,18 @@
+use crate::server::cmd::session::SessionInitializationCommand;
+use bytes::BytesMut;
+use std::str::from_utf8;
+
+pub fn parse_session_initialization_command(
+ command: BytesMut
+) -> SessionInitializationCommand {
+ SessionInitializationCommand {
+ // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize,
+ // client: "".to_string(),
+ username: from_utf8(
+ command.get(
+ 21..(20 + command.get(20).unwrap().to_owned() as usize + 1)
+ ).unwrap()
+ ).unwrap().to_string(),
+ // password: "".to_string()
+ }
+}
diff --git a/src/server/auto/constants.rs b/src/server/auto/constants.rs
deleted file mode 100644
index 10c9439..0000000
--- a/src/server/auto/constants.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use std::collections::HashMap;
-
-pub static ROOM_IDS: phf::Map<&'static str, &'static i32> = phf::phf_map! {
-"ChatElevator<dimension-1>" => &4,
-"IconViewRoom1Enter<dimension-1>" => &3,
-"ChatHall<dimension-1>" => &9,
-"IconViewRoom1<dimension-1>" => &12,
-"IconViewRoom1g<dimension-1>" => &27230,
-"ReceptionView1<dimension-1>" => &6,
-"ReceptionView2<dimension-1>" => &5,
-"Reception<dimension-1>" => &1,
-"staircase1<dimension-1>" => &11,
-};
diff --git a/src/server/auto/mod.rs b/src/server/auto/mod.rs
index 741d763..0a6f342 100644
--- a/src/server/auto/mod.rs
+++ b/src/server/auto/mod.rs
@@ -1,3 +1,2 @@
-pub mod cmd;
-// pub mod constants;
pub mod server;
+pub mod cmd;
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs
index 381852f..c05ed5e 100644
--- a/src/server/auto/server.rs
+++ b/src/server/auto/server.rs
@@ -1,266 +1,250 @@
-use mio::net::{TcpListener, TcpStream};
-use std::io::{Read, Write};
-use mio::{Poll, Token, Ready, PollOpt, Events};
-use std::collections::{HashMap, HashSet};
+use std::error::Error;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::io::AsyncWriteExt;
+use crate::server::auto::cmd::property::{
+ create_property_update_command,
+ create_property_request_command
+};
+use tokio_util::codec::{BytesCodec, Decoder};
+use tokio_stream::StreamExt;
+use bytes::BytesMut;
+use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
-use crate::server::cmd::text::{create_text_command, create_text_command_with_action};
-use super::cmd::property::{create_property_update_command, create_property_request_command};
-use super::cmd::room::create_room_id_redirect_command;
-use rand::Rng;
-use crate::server::utils::broadcast_to_all_clients;
-
-// pub struct ClientSocket {
-// tcp_stream: TcpStream,
-// username: String,
-// }
+use crate::server::auto::cmd::room::create_room_id_redirect_command;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use crate::server::peer::Peer;
+use std::net::SocketAddr;
+use crate::server::auto::cmd::session::parse_session_initialization_command;
pub struct AutoServer;
impl AutoServer {
- pub fn new(listener: TcpListener) {
- let poll = Poll::new().unwrap();
- poll.register(
- &listener,
- Token(0),
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
-
- let mut counter: usize = 0;
- let mut sockets: HashMap<Token, TcpStream> = HashMap::new();
- let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
- let mut buffer = [0 as u8; 1024];
- // let mut room_ids: HashMap<&str, i32> = HashMap::new();
- let mut room_ids: HashSet<String> = HashSet::new();
+ pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> {
+ let listener = TcpListener::bind(addr).await?;
+ debug!("AutoServer now listening on {}", listener.local_addr().unwrap());
+ let state = Arc::new(Mutex::new(Shared::new()));
+ let mut counter = 0;
- let mut events = Events::with_capacity(1024);
loop {
- poll.poll(&mut events, None).unwrap();
- for event in &events {
- match event.token() {
- Token(0) => {
- loop {
- match listener.accept() {
- Ok((socket, address)) => {
- counter += 1;
- let token = Token(counter);
-
- poll.register(
- &socket,
- token,
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
+ let (stream, address) = listener.accept().await?;
+ counter += 1;
+ let state = Arc::clone(&state);
+
+ tokio::spawn(async move {
+ if let Err(e) = AutoServer::handle(
+ state,
+ stream,
+ address,
+ counter
+ ).await {
+ error!("an error occurred: {}", e);
+ }
+ });
+ }
+ }
- info!(
- "registered client with ip '{}' as token '{}'",
- address.ip(), token.0
- );
+ pub async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ debug!("registered peer with address '{}' as '{}'", address, count);
+ let mut room_ids: Vec<String> = Vec::new();
+ let mut username: String = String::new();
+
+ // while let Some(msg) = peer.bytes.next().await {
+ // match msg {
+ // Ok(bytes) => match bytes.get(2).unwrap() {
+ // 10 => { // PROPREQ
+ // debug!("received property request command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_update_command()).await?;
+ //
+ // debug!("sent property update command to client");
+ // }
+ // 6 => { // SESSINIT
+ // username = parse_session_initialization_command(bytes).username;
+ // debug!(
+ // "received session initialization command from client: {}",
+ // username
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_property_request_command()).await?;
+ // debug!("sent session initialization command to client");
+ // }
+ // 15 => { // PROPSET
+ // debug!("received property set command from client");
+ // peer.bytes.get_mut()
+ // .write_all(&create_text_command_with_action(
+ // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ // )).await?;
+ // debug!("sent worldsmaster greeting to client");
+ // }
+ // 29 => { // BUDDYLISTUPDATE
+ // let received_buddy = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ // ).unwrap();
+ // debug!(
+ // "received buddy list update command from client: {}",
+ // received_buddy
+ // );
+ // peer.bytes.get_mut()
+ // .write_all(&create_buddy_list_notify_command(received_buddy))
+ // .await?;
+ // debug!("sent buddy list notify command to client");
+ // }
+ // 20 => { // ROOMIDRQ
+ // let room_name = from_utf8(
+ // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received room id request command from client: {}", room_name);
+ // let room_id;
+ // if !room_ids.contains(&room_name.to_string()) {
+ // room_ids.push(room_name.to_string());
+ // room_id = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("inserted room '{}' as '{}'", room_name, room_id);
+ // } else {
+ // let position = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("found room '{}' as '{}'", room_name, position);
+ // room_id = position;
+ // }
+ // trace!("room name: {}, room id: {}", room_name, room_id);
+ // trace!("{:?}", room_ids);
+ // peer.bytes.get_mut()
+ // .write_all(&create_room_id_redirect_command(room_name, room_id))
+ // .await?;
+ // debug!("sent redirect id command to client");
+ // }
+ // 14 => {
+ // let text = from_utf8(
+ // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received text command from client: {}", text);
+ // let mut state = state.lock().await;
+ // state.broadcast(&create_text_command(&username, text)).await;
+ // debug!("broadcasted text command from client");
+ // }
+ // 7 => { // SESSEXIT
+ // debug!("received session exit command from client")
+ // }
+ // _ => (),
+ // }
+ // Err(err) => error!("stream closed with error: {:?}", err),
+ // }
+ // }
- sockets.insert(token, socket);
- requests.insert(token, Vec::with_capacity(192));
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => {
- error!("unexpected error: {}", e);
- poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap();
- break;
- }
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ // debug!("received bytes from peer: {:?}", &msg);
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ let msg: BytesMut = msg;
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
+
+ debug!("sent property update command to client");
}
- }
- },
- token if event.readiness().is_readable() => {
- loop {
- let read = sockets.get_mut(&token).unwrap().read(&mut buffer);
- match read {
- Ok(0) => { sockets.remove(&token); break; }
- Ok(n) => {
- let req = requests.get_mut(&token).unwrap();
- for b in &buffer[0..n] {
- req.push(*b);
- }
-
- // First byte means how long data section of the packet is.
- // Second byte is to be determined.
- // Third byte is the request type.
-
- // Match packet type by descriptor; **third** byte.
- match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed.
- // PROPREQ
- 10 => {
- debug!(
- "received property request command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_update_command()).unwrap();
- debug!(
- "sent property update command to client with token '{}'",
- token.0
- );
- }
- // SESSINIT
- 6 => {
- debug!(
- "received session initialization command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_request_command()).unwrap();
- debug!(
- "sent session initialization command to client with token '{}'",
- token.0
- );
- }
- // PROPSET
- 15 => {
- debug!(
- "received property set command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_text_command_with_action(
- "WORLDSMASTER",
- "Welcome to Whirlsplash!"
- )).unwrap();
- debug!(
- "sent worldsmaster greeting to client with token '{}'",
- token.0
- );
- },
- // BUDDYLISTUPDATE
- 29 => {
- debug!(
- "received buddy list update command from client with token '{}'",
- token.0
- );
-
- let received_buddy = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize - 1]
- ).unwrap();
- debug!("received buddy: {}", received_buddy);
-
- sockets.get_mut(&token).unwrap()
- .write_all(&create_buddy_list_notify_command(received_buddy))
- .unwrap();
- debug!(
- "sent buddy notify update command to client with token '{}'",
- token.0
- );
- }
- // ROOMIDRQ
- 20 => {
- debug!(
- "received room id request command from client with token '{}'",
- token.0
- );
-
- let room_name = from_utf8(
- &buffer[4..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- let mut room_id = 0;
- if !room_ids.contains(room_name) {
- room_ids.insert(room_name.to_string());
- room_id = room_ids.iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("inserted room '{}' as '{}'", room_name, room_id);
- } else {
- let pos = room_ids
- .iter()
- .position(|i| i == room_name)
- .unwrap();
- trace!("found room '{}' as '{}'", room_name, pos);
- room_id = pos;
- }
- trace!("room name: {}, room id: {}", room_name, room_id);
- trace!("{:?}", room_ids);
-
- // Passing `0` as `room_id` parameter as currently there is
- // no way to find out a room's ID based on it's name.
- sockets.get_mut(&token).unwrap()
- .write_all(&create_room_id_redirect_command(room_name, room_id))
- .unwrap();
- debug!("sent redirect id command to client with token '{}'", token.0)
- }
- // TEXT
- 14 => {
- debug!("received text command from client with token '{}'", token.0);
-
- // TODO: Make this into a command!
- let message = from_utf8(
- &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- trace!("message: {}", message);
-
- // Using User as a placeholder. Ideally, this would print out the username of
- // the one who sent it.
- broadcast_to_all_clients(
- &sockets,
- &create_text_command(
- // Random integer is added to the end of "User", just a development
- // proof-of-concept. Since at this stage usernames aren't exactly kept,
- // we can identify message senders as their connection token; `token.0`.
- &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()),
- message
- )
- );
- debug!(
- "broadcasted text command from client with token '{}'",
- token.0
- );
- }
- // SESSEXIT
- 7 => {
- debug!(
- "received session termination command from client with token '{}'",
- token.0
- );
- poll.deregister(sockets.get(&token).unwrap()).unwrap();
- debug!("de-registered client with token '{}'", token.0);
- }
- // Anything else, do nothing.
- _ => ()
- }
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ debug!("received property set command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client");
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
}
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => { error!("unexpected error: {}", e); break; }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client");
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
}
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
-
- // Unimplemented
- // let ready = requests.get(&token).unwrap()
- // .windows(4)
- // .find(|window| is_double_crnl(*window))
- // .is_some();
- //
- // if ready {
- // let socket = sockets.get(&token).unwrap();
- // poll.reregister(
- // socket,
- // token,
- // Ready::writable(),
- // PollOpt::edge() | PollOpt::oneshot()).unwrap();
- // }
- },
- // Unimplemented
- // token if event.readiness().is_writable() => {
- // println!("writeable");
- // requests.get_mut(&token).unwrap().clear();
- // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap();
- //
- // // Re-use existing connection ("keep-alive") - switch back to reading
- // poll.reregister(
- // sockets.get(&token).unwrap(),
- // token,
- // Ready::readable(),
- // PollOpt::edge()).unwrap();
- // },
- _ => ()
+ }
+ Some(Err(e)) => {
+ error!("error while processing messages: {}", e); break;
+ }
+ None => break,
}
}
}
+
+ // De-register client
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ debug!("removed peer: {}", count)
+ }
+
+ Ok(())
}
}
diff --git a/src/server/cmd/buddy_list.rs b/src/server/cmd/buddy_list.rs
index 1423808..cbddc19 100644
--- a/src/server/cmd/buddy_list.rs
+++ b/src/server/cmd/buddy_list.rs
@@ -1,5 +1,3 @@
-/// In the future, this will take a `Vec` of buddies and dynamically
-/// create a response packet based on the amount of buddies given.
pub fn create_buddy_list_notify_command(buddy: &str) -> Vec<u8> {
let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len());
buddy_list_notify.push(0x01); // ?
diff --git a/src/server/cmd/mod.rs b/src/server/cmd/mod.rs
index c211484..196a7a5 100644
--- a/src/server/cmd/mod.rs
+++ b/src/server/cmd/mod.rs
@@ -1,3 +1,3 @@
pub mod buddy_list;
-// pub mod property;
+pub mod session;
pub mod text;
diff --git a/src/server/cmd/session.rs b/src/server/cmd/session.rs
new file mode 100644
index 0000000..9d48bcc
--- /dev/null
+++ b/src/server/cmd/session.rs
@@ -0,0 +1,6 @@
+pub struct SessionInitializationCommand {
+ // pub protocol: usize,
+ // pub client: String,
+ pub username: String,
+ // pub password: String,
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index cb241f2..3698d84 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -1,4 +1,11 @@
+use tokio::sync::mpsc;
+use bytes::BytesMut;
+
pub mod auto;
-pub mod cmd;
+mod cmd;
pub mod room;
-pub mod utils;
+mod shared;
+mod peer;
+
+type Tx = mpsc::UnboundedSender<BytesMut>;
+type Rx = mpsc::UnboundedReceiver<BytesMut>;
diff --git a/src/server/peer.rs b/src/server/peer.rs
new file mode 100644
index 0000000..e92a3db
--- /dev/null
+++ b/src/server/peer.rs
@@ -0,0 +1,26 @@
+use tokio_util::codec::{Framed, BytesCodec};
+use tokio::net::TcpStream;
+use crate::server::Rx;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use tokio::sync::mpsc;
+
+pub struct Peer {
+ pub bytes: Framed<TcpStream, BytesCodec>,
+ // pub(crate) rx: Rx,
+ pub rx: Rx,
+}
+impl Peer {
+ pub async fn new(
+ state: Arc<Mutex<Shared>>,
+ bytes: Framed<TcpStream, BytesCodec>,
+ username: String,
+ ) -> std::io::Result<Peer> {
+ // let address = bytes.get_ref().peer_addr()?;
+ let (tx, rx) = mpsc::unbounded_channel();
+ // state.lock().await.peers.insert(address, tx);
+ state.lock().await.peers.insert(username, tx);
+ Ok(Peer { bytes, rx })
+ }
+}
diff --git a/src/server/room/cmd/mod.rs b/src/server/room/cmd/mod.rs
index f348cd0..1d123c0 100644
--- a/src/server/room/cmd/mod.rs
+++ b/src/server/room/cmd/mod.rs
@@ -1 +1,2 @@
pub mod property;
+pub mod session;
diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs
new file mode 100644
index 0000000..f04c078
--- /dev/null
+++ b/src/server/room/cmd/session.rs
@@ -0,0 +1,18 @@
+use crate::server::cmd::session::SessionInitializationCommand;
+use bytes::BytesMut;
+use std::str::from_utf8;
+
+pub fn parse_session_initialization_command(
+ command: BytesMut
+) -> SessionInitializationCommand {
+ SessionInitializationCommand {
+ // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize,
+ // client: "".to_string(),
+ username: from_utf8(
+ command.get(
+ 25..(24 + command.get(24).unwrap().to_owned() as usize + 1)
+ ).unwrap()
+ ).unwrap().to_string(),
+ // password: "".to_string()
+ }
+}
diff --git a/src/server/room/mod.rs b/src/server/room/mod.rs
index f581dfe..24606ea 100644
--- a/src/server/room/mod.rs
+++ b/src/server/room/mod.rs
@@ -1,3 +1,2 @@
pub mod cmd;
pub mod server;
-mod structures;
diff --git a/src/server/room/server.rs b/src/server/room/server.rs
index c005e40..8781868 100644
--- a/src/server/room/server.rs
+++ b/src/server/room/server.rs
@@ -1,222 +1,167 @@
-use mio::net::{TcpListener, TcpStream};
-use std::io::{Read, Write};
-use mio::{Poll, Token, Ready, PollOpt, Events};
-use std::collections::HashMap;
+use std::error::Error;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::io::AsyncWriteExt;
+use crate::server::room::cmd::property::{
+ create_property_update_command,
+ create_property_request_command
+};
+use tokio_util::codec::{BytesCodec, Decoder};
+use tokio_stream::StreamExt;
+use bytes::BytesMut;
+use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
-use crate::server::cmd::text::create_text_command;
-// use crate::cmd::property::{create_property_update_command, create_property_request_command};
-use crate::server::room::cmd::property::{create_property_update_command, create_property_request_command};
-use rand::Rng;
-use crate::server::utils::broadcast_to_all_clients;
+use crate::server::auto::cmd::room::create_room_id_redirect_command;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::server::shared::Shared;
+use crate::server::peer::Peer;
+use std::net::SocketAddr;
+use crate::server::room::cmd::session::parse_session_initialization_command;
pub struct RoomServer;
impl RoomServer {
- pub fn new(listener: TcpListener) {
- let poll = Poll::new().unwrap();
- poll.register(
- &listener,
- Token(0),
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
+ pub async fn new(addr: &'static str) -> Result<(), Box<dyn Error>> {
+ let listener = TcpListener::bind(addr).await?;
+ debug!("RoomServer now listening on {}", listener.local_addr().unwrap());
+ let state = Arc::new(Mutex::new(Shared::new()));
+ let mut counter = 0;
- let mut counter: usize = 0;
- let mut sockets: HashMap<Token, TcpStream> = HashMap::new();
- let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
- let mut buffer = [0 as u8; 1024];
-
- let mut events = Events::with_capacity(1024);
loop {
- poll.poll(&mut events, None).unwrap();
- for event in &events {
- match event.token() {
- Token(0) => {
- loop {
- match listener.accept() {
- Ok((socket, address)) => {
- counter += 1;
- let token = Token(counter);
-
- poll.register(
- &socket,
- token,
- Ready::readable(),
- PollOpt::edge()
- ).unwrap();
-
- info!(
- "registered client with ip '{}' as token '{}'",
- address.ip(), token.0
- );
-
- sockets.insert(token, socket);
- requests.insert(token, Vec::with_capacity(192));
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => {
- error!("unexpected error: {}", e);
- poll.deregister(sockets.get(&Token(counter)).unwrap()).unwrap();
- break;
- }
- }
- }
- },
- token if event.readiness().is_readable() => {
- loop {
- let read = sockets.get_mut(&token).unwrap().read(&mut buffer);
- match read {
- Ok(0) => { sockets.remove(&token); break; }
- Ok(n) => {
- let req = requests.get_mut(&token).unwrap();
- for b in &buffer[0..n] {
- req.push(*b);
- }
+ let (stream, address) = listener.accept().await?;
+ counter += 1;
+ let state = Arc::clone(&state);
- // First byte means how long data section of the packet is.
- // Second byte is to be determined.
- // Third byte is the request type.
+ tokio::spawn(async move {
+ if let Err(e) = RoomServer::handle(
+ state,
+ stream,
+ address,
+ counter
+ ).await {
+ error!("an error occurred: {}", e);
+ }
+ });
+ }
+ }
- // Match packet type by descriptor; **third** byte.
- match &buffer.get(2).unwrap() { // Third byte is 2 because Rust is zero-indexed.
- // PROPREQ
- 10 => {
- debug!(
- "received property request command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_update_command()).unwrap();
- debug!(
- "sent property update from client with token '{}'",
- token.0
- );
- }
- // SESSINIT
- 6 => {
- debug!(
- "received session initialization command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_property_request_command()).unwrap();
- debug!(
- "sent session initialization command from client with token '{}'",
- token.0
- );
- }
- // PROPSET
- 15 => debug!(
- "received property set command from client with token '{}'",
- token.0
- ),
- // BUDDYLISTUPDATE
- 29 => {
- debug!(
- "received buddy list update command from client with token '{}'",
- token.0
- );
- sockets.get_mut(&token).unwrap()
- .write_all(&create_buddy_list_notify_command("Wirlaburla"))
- .unwrap();
- debug!(
- "sent buddy notify update command from client with token '{}'",
- token.0
- );
- }
- // ROOMIDRQ
- 20 => debug!(
- "received room id request command from client with token '{}'",
- token.0
- ),
- // TEXT
- 14 => {
- debug!(
- "received text command from client with token '{}'",
- token.0
- );
+ pub async fn handle(
+ state: Arc<Mutex<Shared>>,
+ stream: TcpStream,
+ address: SocketAddr,
+ count: usize,
+ ) -> Result<(), Box<dyn Error>> {
+ let bytes = BytesCodec::new().framed(stream);
+ let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?;
+ debug!("registered peer with address '{}' as '{}'", address, count);
+ let mut room_ids: Vec<String> = Vec::new();
+ let mut username: String = String::new();
- // TODO: Make this into a command!
- let message = from_utf8(
- &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize]
- ).unwrap();
- trace!("message: {}", message);
+ loop {
+ tokio::select! {
+ Some(msg) = peer.rx.recv() => {
+ // debug!("received bytes from peer: {:?}", &msg);
+ peer.bytes.get_mut().write_all(&msg).await?;
+ }
+ result = peer.bytes.next() => match result {
+ Some(Ok(msg)) => {
+ // let mut state = state.lock().await;
+ // state.broadcast("", &msg).await;
+ let msg: BytesMut = msg;
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
- // Using User as a placeholder. Ideally, this would print out the username of
- // the one who sent it.
- broadcast_to_all_clients(
- &sockets,
- &create_text_command(
- // Random integer is added to the end of "User", just a development
- // proof-of-concept. Since at this stage usernames aren't exactly kept,
- // we can identify message senders as their connection token; `token.0`.
- &format!("User{}", rand::thread_rng().gen_range(1..150).to_string()),
- message
- )
- );
- debug!(
- "broadcasted text command from client with token '{}'",
- token.0
- );
- }
- // SESSEXIT
- 7 => {
- debug!(
- "received session termination command from client with token '{}'",
- token.0
- );
- poll.deregister(sockets.get(&token).unwrap()).unwrap();
- debug!("de-registered client with token '{}'", token.0);
- }
- // SUBSCRIB
- 16 => {
- debug!(
- "received room subscription command from client with token '{}'",
- token.0
- );
- }
- // Anything else, do nothing.
- _ => ()
- }
+ debug!("sent property update command to client");
+ }
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ debug!("received property set command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client");
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
}
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock =>
- break,
- Err(e) => { error!("unexpected error: {}", e); break; }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client");
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
}
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
-
- // Unimplemented
- // let ready = requests.get(&token).unwrap()
- // .windows(4)
- // .find(|window| is_double_crnl(*window))
- // .is_some();
- //
- // if ready {
- // let socket = sockets.get(&token).unwrap();
- // poll.reregister(
- // socket,
- // token,
- // Ready::writable(),
- // PollOpt::edge() | PollOpt::oneshot()).unwrap();
- // }
- },
- // Unimplemented
- // token if event.readiness().is_writable() => {
- // println!("writeable");
- // requests.get_mut(&token).unwrap().clear();
- // sockets.get_mut(&token).unwrap().write_all("test".as_bytes()).unwrap();
- //
- // // Re-use existing connection ("keep-alive") - switch back to reading
- // poll.reregister(
- // sockets.get(&token).unwrap(),
- // token,
- // Ready::readable(),
- // PollOpt::edge()).unwrap();
- // },
- _ => ()
+ }
+ Some(Err(e)) => {
+ error!("error while processing messages: {}", e); break;
+ }
+ None => break,
}
}
}
+
+ // De-register client
+ {
+ state.lock().await.peers.remove(&count.to_string());
+ debug!("removed peer: {}", count)
+ }
+
+ Ok(())
}
}
diff --git a/src/server/shared.rs b/src/server/shared.rs
new file mode 100644
index 0000000..d7ff6c9
--- /dev/null
+++ b/src/server/shared.rs
@@ -0,0 +1,35 @@
+use std::collections::HashMap;
+// use std::net::SocketAddr;
+use bytes::BytesMut;
+use crate::server::Tx;
+
+pub struct Shared {
+ pub peers: HashMap<String, Tx>,
+}
+impl Shared {
+ pub fn new() -> Self {
+ Shared {
+ peers: HashMap::new(),
+ }
+ }
+
+ pub async fn broadcast(&mut self, /* sender: &str, */ message: &[u8]) {
+ // debug!("peer sent message: {:?}", message);
+ // debug!("peer count: {}", self.peers.len());
+ // debug!("peers: {:?}", self.peers);
+ for peer in self.peers.iter_mut() {
+ // debug!("peer: {:?}", peer);
+ // TODO:
+ // thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value'
+ peer.1.send(BytesMut::from(message)).unwrap();
+ }
+ }
+
+ // pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
+ // for peer in self.peers.iter_mut() {
+ // if *peer.0 != sender {
+ // let _ = peer.1.send(message.into());
+ // }
+ // }
+ // }
+}
diff --git a/src/server/room/structures.rs b/src/server/structures.rs
index 25fcf00..25fcf00 100644
--- a/src/server/room/structures.rs
+++ b/src/server/structures.rs
diff --git a/src/server/utils.rs b/src/server/utils.rs
deleted file mode 100644
index 000b788..0000000
--- a/src/server/utils.rs
+++ /dev/null
@@ -1,13 +0,0 @@
-use std::collections::HashMap;
-use mio::Token;
-use mio::net::TcpStream;
-use std::io::Write;
-
-pub fn broadcast_to_all_clients(
- sockets: &HashMap<Token, TcpStream>,
- message: &[u8]
-) -> () {
- for mut socket in sockets {
- socket.1.write_all(message).unwrap();
- }
-}
diff --git a/src/utils/db.rs b/src/utils/db.rs
index adc11d3..e6e1946 100644
--- a/src/utils/db.rs
+++ b/src/utils/db.rs
@@ -1,15 +1,16 @@
-use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
-use std::env;
+use sqlx::sqlite::SqlitePoolOptions;
+use std::error::Error;
+use sqlx::SqlitePool;
-pub async fn get_pool() -> Result<SqlitePool, Box<dyn std::error::Error>> {
+pub async fn get_pool() -> Result<SqlitePool, Box<dyn Error>> {
let pool = SqlitePoolOptions::new()
.max_connections(20)
- .connect(&env::var("DATABASE_URL")?)
+ .connect(&std::env::var("DATABASE_URL")?)
.await?;
debug!(
"connected to database at url '{}'",
- &env::var("DATABASE_URL")?
+ &std::env::var("DATABASE_URL")?
);
Ok(pool)
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 66f6db0..4e23340 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,3 +1,2 @@
pub mod byte;
pub mod db;
-pub mod web;
diff --git a/src/utils/web.rs b/src/utils/web.rs
deleted file mode 100644
index a834648..0000000
--- a/src/utils/web.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-pub fn _is_double_crnl(window: &[u8]) -> bool {
- window.len() >= 4
- && (window[0] == '\r' as u8)
- && (window[1] == '\n' as u8)
- && (window[2] == '\r' as u8)
- && (window[3] == '\n' as u8)
-}