diff options
| author | Fuwn <[email protected]> | 2021-04-23 18:27:51 -0700 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-04-23 18:27:51 -0700 |
| commit | c58057b38550b94be623526481a31c7fc346a929 (patch) | |
| tree | 7d851d0cf761fe4261dd5563f99e18d6c974d80b /src | |
| parent | Merge branch 'tokio-re' of https://github.com/Whirlsplash/whirl into tokio-re (diff) | |
| download | whirl-c58057b38550b94be623526481a31c7fc346a929.tar.xz whirl-c58057b38550b94be623526481a31c7fc346a929.zip | |
major: :star:
Diffstat (limited to 'src')
75 files changed, 3934 insertions, 836 deletions
@@ -1,38 +1,29 @@ -use structopt::clap::{App, AppSettings, SubCommand, Arg}; +use structopt::clap::{App, AppSettings, Arg, SubCommand}; pub fn cli<'b, 'a>() -> App<'a, 'b> { - App::new(env!("CARGO_PKG_NAME")) - .about(env!("CARGO_PKG_DESCRIPTION")) - .version(env!("CARGO_PKG_VERSION")) - .author(env!("CARGO_PKG_AUTHORS")) - .settings(&[ - AppSettings::SubcommandRequiredElseHelp, - ]) - .subcommands(vec![ - SubCommand::with_name("run") - .about("Start the WorldServer"), - SubCommand::with_name("config") - .setting(AppSettings::SubcommandRequiredElseHelp) - .subcommands(vec![ - SubCommand::with_name("show"), - ]), - SubCommand::with_name("completions") - .setting(AppSettings::SubcommandRequiredElseHelp) - .about("Generate shell completions") - .subcommands(vec![ - SubCommand::with_name("powershell"), - SubCommand::with_name("bash"), - SubCommand::with_name("elvish"), - SubCommand::with_name("zsh"), - SubCommand::with_name("fish"), - ]), - ]) - .args(&[ - Arg::with_name("debug") - .short("d") - .long("debug"), - Arg::with_name("trace") - .short("t") - .long("trace"), - ]) + App::new(env!("CARGO_PKG_NAME")) + .about(env!("CARGO_PKG_DESCRIPTION")) + .version(env!("CARGO_PKG_VERSION")) + .author(env!("CARGO_PKG_AUTHORS")) + .settings(&[AppSettings::SubcommandRequiredElseHelp]) + .subcommands(vec![ + SubCommand::with_name("run").about("Start the WorldServer"), + SubCommand::with_name("config") + .setting(AppSettings::SubcommandRequiredElseHelp) + .subcommands(vec![SubCommand::with_name("show")]), + SubCommand::with_name("completions") + .setting(AppSettings::SubcommandRequiredElseHelp) + .about("Generate shell completions") + .subcommands(vec![ + SubCommand::with_name("powershell"), + SubCommand::with_name("bash"), + SubCommand::with_name("elvish"), + SubCommand::with_name("zsh"), + SubCommand::with_name("fish"), + ]), + ]) + .args(&[ + Arg::with_name("debug").short("d").long("debug"), + Arg::with_name("trace").short("t").long("trace"), + ]) } diff --git a/src/config.rs b/src/config.rs index bddf6a4..7951ff5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,28 +1,29 @@ -use serde_derive::{Serialize, Deserialize}; - +use serde_derive::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct Config { - pub worldsmaster_greeting: String, - pub auto_server_port: i32, - pub room_server_port: i32, + pub worldsmaster_greeting: String, + pub worldsmaster_username: String, + pub distributor_port: i32, + pub hub_port: i32, } impl Default for Config { - fn default() -> Self { - Config { - worldsmaster_greeting: "Welcome to Whirlsplash!".to_string(), - auto_server_port: 6650, - room_server_port: 5673, - } - } + fn default() -> Self { + Config { + worldsmaster_greeting: "Welcome to Whirlsplash!".to_string(), + worldsmaster_username: "WORLDSMASTER".to_string(), + distributor_port: 6650, + hub_port: 5673, + } + } } pub fn get_config() -> Result<Config, confy::ConfyError> { - let config: Config = confy::load_path("./whirl.toml").unwrap(); + let config: Config = confy::load_path("./.whirlrc.toml").unwrap(); - Ok(config) + Ok(config) } pub fn store_config(config: Config) -> Result<(), confy::ConfyError> { - confy::store_path("./whirl.toml", config) + confy::store_path("./.whirlrc.toml", config) } diff --git a/src/db/tables.rs b/src/db/tables.rs index eb444d9..3d1e00c 100644 --- a/src/db/tables.rs +++ b/src/db/tables.rs @@ -1,25 +1,30 @@ -//! Much of the documentation that you will see within this module is quoted from -//! http://dev.worlds.net/private/GammaDocs/WorldServer.html#RoomServer. +//! Much of the documentation that you will see within this module is quoted +//! from http://dev.worlds.net/private/GammaDocs/WorldServer.html#RoomServer. -/// The SerialNumbers table contains a record for every valid serial number. It is initialized with -/// a set of serial numbers by a WorldServer administrator. A user will register by providing a -/// valid serial number that matches an unused table entry. The serial number must be distributed -/// with the client software or in some other way, because it will be required for all -/// registrations from the client. The serial number record contains the following information: +/// The SerialNumbers table contains a record for every valid serial number. It +/// is initialized with a set of serial numbers by a WorldServer administrator. +/// A user will register by providing a valid serial number that matches an +/// unused table entry. The serial number must be distributed with the client +/// software or in some other way, because it will be required for all +/// registrations from the client. The serial number record contains the +/// following information: /// /// -/// The SerialNumbers table will be initialized with a set of serial numbers by a WorldServer -/// administrator. The serialStatus column should be initialized to SERIAL_FREE at this time -/// (this will be done for you when you create serial numbers). A user will then register via the -/// client by providing a valid serial number. The UserServer will set serialStatus to -/// SERIAL_USED upon successful user registration with a given serial number, and enter their -/// username in the userName field. +/// The SerialNumbers table will be initialized with a set of serial numbers by +/// a WorldServer administrator. The serialStatus column should be initialized +/// to SERIAL_FREE at this time (this will be done for you when you create +/// serial numbers). A user will then register via the client by providing a +/// valid serial number. The UserServer will set serialStatus to SERIAL_USED +/// upon successful user registration with a given serial number, and enter +/// their username in the userName field. /// -/// The included program SerialGen can generate a list of serial numbers based on a seed and tagged -/// by prefix. The program runs in the C-shell and produces three output files: an SQL script that -/// you can use to directly modify the SerialNumbers table, a master list as a text table for -/// administration purposes, and a separate text table for use in production of the serial -/// numbers to be given to client end users. See [Generating Serial Numbers](http://dev.worlds.net/private/GammaDocs/WorldServer.html#SerialGen). +/// The included program SerialGen can generate a list of serial numbers based +/// on a seed and tagged by prefix. The program runs in the C-shell and produces +/// three output files: an SQL script that you can use to directly modify the +/// SerialNumbers table, a master list as a text table for administration +/// purposes, and a separate text table for use in production of the serial +/// numbers to be given to client end users. See +/// [Generating Serial Numbers](http://dev.worlds.net/private/GammaDocs/WorldServer.html#SerialGen). /// /// /// The values defined for serialStatus are: @@ -29,23 +34,24 @@ /// 1 = SERIAL_USED #[derive(Debug)] pub struct SerialNumbers { - /// The user's serial number - pub serial_number: String, + /// The user's serial number + pub serial_number: String, - /// Username with case intact. - pub user_name: String, + /// Username with case intact. + pub user_name: String, - /// One of {SERIAL_FREE, SERIAL_USED} - pub serial_status: i64, + /// One of {SERIAL_FREE, SERIAL_USED} + pub serial_status: i64, } -/// The UserRegistration table contains a record for every registered user. The record holds the -/// following information: +/// The UserRegistration table contains a record for every registered user. The +/// record holds the following information: /// -/// The WorldServer will set an authenticated user's privilege level to the value given in this -/// field when the user logs on. This privilege level is communicated to all RoomServers the user -/// connects to. To take effect, the value must be changed while the user is logged off, otherwise -/// it will only become effective at the next login. +/// The WorldServer will set an authenticated user's privilege level to the +/// value given in this field when the user logs on. This privilege level is +/// communicated to all RoomServers the user connects to. To take effect, the +/// value must be changed while the user is logged off, otherwise it will only +/// become effective at the next login. /// /// /// AccountStatus allowed values: @@ -56,9 +62,9 @@ pub struct SerialNumbers { /// /// 1 = ACCOUNT_ACTIVE /// -/// The WorldServer will set accountStatus to ACCOUNT_ACTIVE upon successful user registration. -/// The WorldServer administrator may deactivate an account by setting accountStatus to -/// ACCOUNT_INACTIVE. +/// The WorldServer will set accountStatus to ACCOUNT_ACTIVE upon successful +/// user registration. The WorldServer administrator may deactivate an account +/// by setting accountStatus to ACCOUNT_INACTIVE. /// /// /// userPrivileges allowed values: @@ -71,7 +77,8 @@ pub struct SerialNumbers { /// /// 2 = PRIV_BROADCAST - the user may broadcast text /// -/// 4 = PRIV_PROPERTY - the user may retrieve and set all properties of any object +/// 4 = PRIV_PROPERTY - the user may retrieve and set all properties of any +/// object /// /// 3 = PRIV_BUILD and PRIV_BROADCAST /// @@ -81,73 +88,86 @@ pub struct SerialNumbers { /// /// 7 = PRIV_BUILD and PRIV_BROADCAST and PRIV_PROPERTY /// -/// The WorldServer will set an authenticated user's privilege level to the value given in this -/// field when the user logs on. This privilege level is communicated to all RoomServers the user -/// connects to. To take effect, the value must be changed while the user is logged off, -/// otherwise it will only become effective at the next login. +/// The WorldServer will set an authenticated user's privilege level to the +/// value given in this field when the user logs on. This privilege level is +/// communicated to all RoomServers the user connects to. To take effect, the +/// value must be changed while the user is logged off, otherwise it will only +/// become effective at the next login. #[derive(Debug)] pub struct UserRegistration { - /// The user name in all lower case. - pub user_name_lower: String, + /// The user name in all lower case. + pub user_name_lower: String, - /// The user name with case intact. - pub user_name: String, + /// The user name with case intact. + pub user_name: String, - /// The user's serial number. - pub serial_number: String, + /// The user's serial number. + pub serial_number: String, - /// The user's password. - pub password: String, + /// The user's password. + pub password: String, - /// The user's client software version. - pub client_version: String, + /// The user's client software version. + pub client_version: String, - /// One of {ACCOUNT_ACTIVE, ACCOUNT_INACTIVE}. - pub account_status: i64, + /// One of {ACCOUNT_ACTIVE, ACCOUNT_INACTIVE}. + pub account_status: i64, - /// The date and time the user registered. - pub registration_date: String, + /// The date and time the user registered. + pub registration_date: String, - /// The number of times the user has logged on since registration. - pub times_on: i64, + /// The number of times the user has logged on since registration. + pub times_on: i64, - /// The number of minutes the user has been logged on since registration. - pub total_minutes: i64, + /// The number of minutes the user has been logged on since registration. + pub total_minutes: i64, - pub user_privileges: i64, + pub user_privileges: i64, } -/// The UserProperties table is used to store persistent user properties. These are accessed every -/// time a user logs in, and they may also be used to form the reply for a "finger" operation. -/// The UserProperties table contains the following columns: +/// The UserProperties table is used to store persistent user properties. These +/// are accessed every time a user logs in, and they may also be used to form +/// the reply for a "finger" operation. The UserProperties table contains the +/// following columns: /// /// -/// The setting of the PropertyFlag determines which column the value of the property is stored in. -/// When the value of the property is a string and is stored in propertyStringValue, the -/// propertyBinaryValue will be NULL. When the value of the property is binary data and is stored -/// in propertyBinaryValue, the propertyStringValue will be NULL. Properties stored in -/// propertyStringValue will be readable using the Select command in SQLplus. Properties stored -/// in propertyBinaryValue will appear encoded in hexadecimal when selected using SQLplus. +/// The setting of the PropertyFlag determines which column the value of the +/// property is stored in. When the value of the property is a string and is +/// stored in propertyStringValue, the propertyBinaryValue will be NULL. When +/// the value of the property is binary data and is stored +/// in propertyBinaryValue, the propertyStringValue will be NULL. Properties +/// stored in propertyStringValue will be readable using the Select command in +/// SQLplus. Properties stored in propertyBinaryValue will appear encoded in +/// hexadecimal when selected using SQLplus. /// -/// The values in the propertyFlags and propertyAccess as seen when doing a select on these columns are as follows: +/// The values in the propertyFlags and propertyAccess as seen when doing a +/// select on these columns are as follows: /// /// propertyFlags /// -/// 128 = Store in DB, no auto-update, not a finger property, stored in propertyStringValue. +/// 128 = Store in DB, no auto-update, not a finger property, stored in +/// propertyStringValue. /// -/// 144 = Store in DB, no auto-update, not a finger property, stored in propertyBinaryValue. +/// 144 = Store in DB, no auto-update, not a finger property, stored in +/// propertyBinaryValue. /// -/// 160 = Store in DB, no auto-update, finger property, stored in propertyStringValue. +/// 160 = Store in DB, no auto-update, finger property, stored in +/// propertyStringValue. /// -/// 176 = Store in DB, no auto-update, finger property, stored in propertyBinaryValue. +/// 176 = Store in DB, no auto-update, finger property, stored in +/// propertyBinaryValue. /// -/// 192 = Store in DB, auto-update, not a finger property, stored in propertyStringValue. +/// 192 = Store in DB, auto-update, not a finger property, stored in +/// propertyStringValue. /// -/// 208 = Store in DB, auto-update, not a finger property, stored in propertyBinaryValue. +/// 208 = Store in DB, auto-update, not a finger property, stored in +/// propertyBinaryValue. /// -/// 224 = Store in DB, auto-update, finger property, stored in propertyStringValue. +/// 224 = Store in DB, auto-update, finger property, stored in +/// propertyStringValue. /// -/// 240 = Store in DB, auto-update, finger property, stored in propertyBinaryValue. +/// 240 = Store in DB, auto-update, finger property, stored in +/// propertyBinaryValue. /// /// /// propertyAccess @@ -161,21 +181,25 @@ pub struct UserRegistration { /// 3 = Possessor write, owner read. /// /// -/// UserProperties can be used to store persistent user data from session to session, including -/// potentially shared-state properties related to users. Properties and their Id's need to be -/// coordinated between the server and the client (or the client's underlying language). -/// Properties are generally meaningless to the server, except for the reserved properties for -/// session tracking etc. It is up to the client to interpret the property values correctly. +/// UserProperties can be used to store persistent user data from session to +/// session, including potentially shared-state properties related to users. +/// Properties and their Id's need to be coordinated between the server and the +/// client (or the client's underlying language). Properties are generally +/// meaningless to the server, except for the reserved properties for +/// session tracking etc. It is up to the client to interpret the property +/// values correctly. /// -/// In Gamma, properties are exchanged with the client by using attributes. A full discussion of -/// properties, attributes and using shared objects in Gamma will be included in a future document. +/// In Gamma, properties are exchanged with the client by using attributes. A +/// full discussion of properties, attributes and using shared objects in Gamma +/// will be included in a future document. /// /// PropertyId: /// -/// Each property has a PropertyId field that is one byte. Up to 255 PropertyIds may be defined. -/// Zero is reserved and is not a valid PropertyId. Some of these properties are shared between -///o bjects (especially those relating to session control), but others may be defined on a -/// per-object basis. Currently defined propertyIds include: +/// Each property has a PropertyId field that is one byte. Up to 255 PropertyIds +/// may be defined. Zero is reserved and is not a valid PropertyId. Some of +/// these properties are shared between o bjects (especially those relating to +/// session control), but others may be defined on a per-object basis. Currently +/// defined propertyIds include: /// /// Session Properties: /// @@ -213,12 +237,13 @@ pub struct UserRegistration { /// /// 11 = Email Address (string) /// -/// The client software can require users to register with their email addresses as an optional -/// field. For example, in Worlds Chat, the client sends the email address as VAR_EMAIL. VAR_EMAIL -/// in turn is understood by the server to be a property that is stored in the Properties -/// database, as Property ID 11, with the email itself stored in PropertyStringValue as a string. -/// Since this table is also keyed to the username, you can correlate tables of email addresses as -/// needed using SQL. +/// The client software can require users to register with their email addresses +/// as an optional field. For example, in Worlds Chat, the client sends the +/// email address as VAR_EMAIL. VAR_EMAIL in turn is understood by the server to +/// be a property that is stored in the Properties database, as Property ID 11, +/// with the email itself stored in PropertyStringValue as a string. Since this +/// table is also keyed to the username, you can correlate tables of email +/// addresses as needed using SQL. /// /// To extract an email address for a given user: /// ```sql @@ -235,27 +260,29 @@ pub struct UserRegistration { /// order by userName; /// ``` /// -/// You should note however that many database table queries, particularly in the RoomProperties -/// table, might give you data that is stored mostly in binary form, and that cannot be properly -/// interpreted by standard SQL. What you'll get in this case is often meaningless hexadecimal, -/// which could be unicode text or just raw data. +/// You should note however that many database table queries, particularly in +/// the RoomProperties table, might give you data that is stored mostly in +/// binary form, and that cannot be properly interpreted by standard SQL. What +/// you'll get in this case is often meaningless hexadecimal, which could be +/// unicode text or just raw data. #[derive(Debug)] pub struct UserProperties { - /// The user name with case intact - pub user_name: String, + /// The user name with case intact + pub user_name: String, - /// The property identifier. - pub property_id: i64, + /// The property identifier. + pub property_id: i64, - /// Each property has a PropertyFlags field that defines certain aspects of the property. - pub property_flags: i64, + /// Each property has a PropertyFlags field that defines certain aspects of + /// the property. + pub property_flags: i64, - /// Defines access restrictions on the property. - pub property_access: i64, + /// Defines access restrictions on the property. + pub property_access: i64, - /// The value of the property when it is a string. - pub property_string_value: String, + /// The value of the property when it is a string. + pub property_string_value: String, - /// The value of the property when it is binary data. - pub property_binary_value: String, + /// The value of the property when it is binary data. + pub property_binary_value: String, } @@ -4,8 +4,9 @@ #[macro_use] extern crate log; -pub mod db; -pub mod server; -pub mod utils; pub mod cli; pub mod config; + +pub mod db; +pub mod re_server; +pub mod utils; diff --git a/src/main.rs b/src/main.rs index cb13cd7..22d72a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,73 +1,90 @@ #[macro_use] extern crate log; -use whirl::server::auto::server::AutoServer; use std::error::Error; -use whirl::server::room::server::RoomServer; -use whirl::config; -use whirl::config::get_config; + use structopt::clap::Shell; -use whirl::cli::cli; +use whirl::{ + cli::cli, + config, + config::get_config, + re_server::{ + distributor::Distributor, + hub::Hub, + server::{ + Server, + ServerType::{AutoServer, RoomServer}, + }, + }, +}; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - // Setup CLI - let matches = cli().get_matches(); + // Setup CLI + let matches = cli().get_matches(); - // Set logging level - let mut log_level = "whirl=error,whirl=warn,whirl=info".to_string(); - if matches.is_present("debug") { log_level += ",whirl=debug"; } - if matches.is_present("trace") { log_level += ",whirl=trace"; } - std::env::set_var("RUST_LOG", log_level); + // Set logging level + let mut log_level = "whirl=error,whirl=warn,whirl=info".to_string(); + if matches.is_present("debug") { + log_level += ",whirl=debug"; + } + if matches.is_present("trace") { + log_level += ",whirl=trace"; + } + std::env::set_var("RUST_LOG", log_level); - // Set database URL - std::env::set_var("DATABASE_URL", "sqlite:whirl.db"); + // Set database URL + std::env::set_var("DATABASE_URL", "sqlite:whirl.db"); - // Setup logging - dotenv::dotenv().ok(); - pretty_env_logger::init(); + // Setup logging + dotenv::dotenv().ok(); + pretty_env_logger::init(); - // Handle CLI command - if matches.is_present("run") { - run().await.unwrap(); - } else if let Some(cmd) = matches.subcommand_matches("config") { - if cmd.is_present("show") { - println!("{:#?}", config::get_config()); - } - } else if let Some(shell) = matches.subcommand_matches("completions") { - if shell.is_present("powershell") { - cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::PowerShell, "."); - } else if shell.is_present("bash") { - cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Bash, "."); - } else if shell.is_present("elvish") { - cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Elvish, "."); - } else if shell.is_present("zsh") { - cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Zsh, "."); - } else if shell.is_present("fish") { - cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Fish, "."); - } - debug!("generated shell completions"); - } + // Handle CLI command + if matches.is_present("run") { + run().await.unwrap(); + } else if let Some(cmd) = matches.subcommand_matches("config") { + if cmd.is_present("show") { + println!("{:#?}", config::get_config()); + } + } else if let Some(shell) = matches.subcommand_matches("completions") { + if shell.is_present("powershell") { + cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::PowerShell, "."); + } else if shell.is_present("bash") { + cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Bash, "."); + } else if shell.is_present("elvish") { + cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Elvish, "."); + } else if shell.is_present("zsh") { + cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Zsh, "."); + } else if shell.is_present("fish") { + cli().gen_completions(env!("CARGO_PKG_NAME"), Shell::Fish, "."); + } + debug!("generated shell completions"); + } - Ok(()) + Ok(()) } async fn run() -> Result<(), Box<dyn Error>> { - let threads = vec![ - tokio::spawn(async move { - let _ = AutoServer::listen( - &*format!("0.0.0.0:{}", get_config().unwrap().auto_server_port) - ).await; - }), - tokio::spawn(async move { - let _ = RoomServer::listen( - &*format!("0.0.0.0:{}", get_config().unwrap().room_server_port) - ).await; - }), - ]; - for thread in threads { - let _ = thread.await; - } + let threads = vec![ + tokio::spawn(async move { + let _ = Distributor::listen( + &*format!("0.0.0.0:{}", get_config().unwrap().distributor_port,), + AutoServer, + ) + .await; + }), + tokio::spawn(async move { + let _ = Hub::listen( + &*format!("0.0.0.0:{}", get_config().unwrap().hub_port), + RoomServer, + ) + .await; + }), + ]; + for thread in threads { + let _ = thread.await; + } - Ok(()) + Ok(()) } diff --git a/src/re_server/cmd/commands/action.rs b/src/re_server/cmd/commands/action.rs new file mode 100644 index 0000000..03eb10e --- /dev/null +++ b/src/re_server/cmd/commands/action.rs @@ -0,0 +1,17 @@ +use bytes::{BufMut, BytesMut}; + +pub fn create_action() -> Vec<u8> { + let mut command = BytesMut::new(); + + command.put_slice(&[ + 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, 0x76, + 0x3e, + ]); + + // Convert to vector and insert the length + let mut command_as_vec = command.to_vec(); + command_as_vec.insert(0, command.len() as u8 + 1); + + // Return bytes + command_as_vec +} diff --git a/src/re_server/cmd/commands/buddy_list/create.rs b/src/re_server/cmd/commands/buddy_list/create.rs new file mode 100644 index 0000000..dc9bddd --- /dev/null +++ b/src/re_server/cmd/commands/buddy_list/create.rs @@ -0,0 +1,21 @@ +use bytes::{BufMut, BytesMut}; + +use crate::re_server::cmd::{commands::buddy_list::structure::Buddy, constants::BUDDYLISTNOTIFY}; + +pub fn create_buddy_list_notify(buddy: &Buddy) -> Vec<u8> { + let mut command = BytesMut::new(); + + // Header + command.put_u8(0x01); // ObjId + command.put_u8(BUDDYLISTNOTIFY as u8); // Type + + // Content + command.put_u8(buddy.buddy.len() as u8); // Buddy (name) length + command.put_slice(buddy.buddy.as_bytes()); // Buddy (name) + command.put_u8(buddy.add); // "Is buddy logged on?" (?) + + let mut command_as_vec = command.to_vec(); + command_as_vec.insert(0, command.len() as u8 + 1); + + command_as_vec +} diff --git a/src/re_server/cmd/commands/buddy_list/mod.rs b/src/re_server/cmd/commands/buddy_list/mod.rs new file mode 100644 index 0000000..02f63a3 --- /dev/null +++ b/src/re_server/cmd/commands/buddy_list/mod.rs @@ -0,0 +1,3 @@ +pub mod create; +pub mod parse; +pub mod structure; diff --git a/src/re_server/cmd/commands/buddy_list/parse.rs b/src/re_server/cmd/commands/buddy_list/parse.rs new file mode 100644 index 0000000..4e96f5f --- /dev/null +++ b/src/re_server/cmd/commands/buddy_list/parse.rs @@ -0,0 +1,14 @@ +use std::str::from_utf8; + +use crate::re_server::cmd::commands::buddy_list::structure::Buddy; + +pub fn parse_buddy_list_update(data: Vec<u8>) -> Buddy { + Buddy { + buddy: from_utf8(&data[4..data[0] as usize - 1]) + .unwrap() + .to_string(), + + // Get the last byte + add: data[data[0] as usize - 1], + } +} diff --git a/src/re_server/cmd/commands/buddy_list/structure.rs b/src/re_server/cmd/commands/buddy_list/structure.rs new file mode 100644 index 0000000..732a847 --- /dev/null +++ b/src/re_server/cmd/commands/buddy_list/structure.rs @@ -0,0 +1,4 @@ +pub struct Buddy { + pub buddy: String, + pub add: u8, +} diff --git a/src/re_server/cmd/commands/mod.rs b/src/re_server/cmd/commands/mod.rs new file mode 100644 index 0000000..3a49b47 --- /dev/null +++ b/src/re_server/cmd/commands/mod.rs @@ -0,0 +1,9 @@ +pub mod action; +pub mod buddy_list; +pub mod property; +pub mod room; +pub mod session; +pub mod subscribe; +pub mod text; +pub mod whisper; +// pub mod register_object_id; // TODO: Implement. diff --git a/src/re_server/cmd/commands/property/create.rs b/src/re_server/cmd/commands/property/create.rs new file mode 100644 index 0000000..e6d0714 --- /dev/null +++ b/src/re_server/cmd/commands/property/create.rs @@ -0,0 +1,164 @@ +use crate::{ + config::get_config, + re_server::{ + cmd::constants::{PROPUPD, SESSINIT}, + net::{ + constants::{ + VAR_APPNAME, + VAR_CHANNEL, + VAR_ERROR, + VAR_EXTERNAL_HTTP_SERVER, + VAR_MAIL_DOMAIN, + VAR_PRIV, + VAR_PROTOCOL, + VAR_SCRIPT_SERVER, + VAR_SERIAL, + VAR_SERVERTYPE, + VAR_SMTP_SERVER, + VAR_UPDATETIME, + }, + converter::property_list_to_bytes, + structure::NetworkProperty, + }, + }, +}; + +pub fn create_property_update_as_distributor() -> Vec<u8> { + property_list_to_bytes( + PROPUPD, + 0xFF, + vec![ + NetworkProperty { + prop_id: VAR_MAIL_DOMAIN, + value: "worlds3d.com".to_string(), + }, + NetworkProperty { + prop_id: VAR_SMTP_SERVER, + value: "mail.worlds.net:25".to_string(), + }, + NetworkProperty { + prop_id: VAR_SCRIPT_SERVER, + value: "http://www-dynamic.us.worlds.net/cgi-bin".to_string(), + }, + NetworkProperty { + prop_id: VAR_EXTERNAL_HTTP_SERVER, + value: "http://www-static.us.worlds.net".to_string(), + }, + NetworkProperty { + prop_id: VAR_SERVERTYPE, + value: "1".to_string(), + }, + NetworkProperty { + prop_id: VAR_PROTOCOL, + value: "24".to_string(), + }, + NetworkProperty { + prop_id: VAR_APPNAME, + value: get_config().unwrap().worldsmaster_username, + }, + ], + ) +} + +pub fn create_property_update_as_hub() -> Vec<u8> { + property_list_to_bytes( + PROPUPD, + 0xFF, + vec![ + NetworkProperty { + prop_id: VAR_UPDATETIME, + value: "1000000".to_string(), + }, + NetworkProperty { + prop_id: VAR_MAIL_DOMAIN, + value: "worlds3d.com".to_string(), + }, + NetworkProperty { + prop_id: VAR_SMTP_SERVER, + value: "mail.worlds.net:25".to_string(), + }, + NetworkProperty { + prop_id: VAR_SCRIPT_SERVER, + value: "http://www-dynamic.us.worlds.net/cgi-bin".to_string(), + }, + NetworkProperty { + prop_id: VAR_EXTERNAL_HTTP_SERVER, + value: "http://www-static.us.worlds.net".to_string(), + }, + NetworkProperty { + prop_id: VAR_SERVERTYPE, + value: "3".to_string(), + }, + NetworkProperty { + prop_id: VAR_PROTOCOL, + value: "24".to_string(), + }, + NetworkProperty { + prop_id: VAR_APPNAME, + value: get_config().unwrap().worldsmaster_username, + }, + ], + ) +} + +pub fn create_property_request_as_distributor() -> Vec<u8> { + property_list_to_bytes( + SESSINIT as i32, + 0x01, + vec![ + NetworkProperty { + prop_id: VAR_ERROR, + value: "0".to_string(), + }, + NetworkProperty { + prop_id: VAR_APPNAME, + value: get_config().unwrap().worldsmaster_username, + }, + NetworkProperty { + prop_id: VAR_PROTOCOL, + value: "24".to_string(), + }, + NetworkProperty { + prop_id: VAR_SERVERTYPE, + value: "1".to_string(), + }, + NetworkProperty { + prop_id: VAR_SERIAL, + value: "DWLV000000000000".to_string(), + }, + NetworkProperty { + prop_id: VAR_PRIV, + value: "0".to_string(), + }, + NetworkProperty { + prop_id: VAR_CHANNEL, + value: "dimension-1".to_string(), + }, + ], + ) +} + +pub fn create_property_request_as_hub() -> Vec<u8> { + property_list_to_bytes( + SESSINIT as i32, + 0x01, + vec![ + NetworkProperty { + prop_id: VAR_ERROR, + value: "0".to_string(), + }, + NetworkProperty { + prop_id: VAR_SERVERTYPE, + value: "3".to_string(), + }, + NetworkProperty { + prop_id: VAR_UPDATETIME, + value: "1000000".to_string(), + }, + NetworkProperty { + prop_id: VAR_PROTOCOL, + value: "24".to_string(), + }, + ], + ) +} diff --git a/src/re_server/cmd/commands/property/mod.rs b/src/re_server/cmd/commands/property/mod.rs new file mode 100644 index 0000000..d6ebc5b --- /dev/null +++ b/src/re_server/cmd/commands/property/mod.rs @@ -0,0 +1,2 @@ +pub mod create; +pub mod parse; diff --git a/src/re_server/cmd/commands/property/parse.rs b/src/re_server/cmd/commands/property/parse.rs new file mode 100644 index 0000000..4f7c108 --- /dev/null +++ b/src/re_server/cmd/commands/property/parse.rs @@ -0,0 +1,11 @@ +use crate::re_server::net::structure::NetworkProperty; + +pub fn find_property_in_property_list( + property_list: &[NetworkProperty], + property: i32, +) -> &NetworkProperty { + property_list + .iter() + .find(|i| i.prop_id == property) + .unwrap() +} diff --git a/src/re_server/cmd/commands/register_object_id/create.rs b/src/re_server/cmd/commands/register_object_id/create.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/re_server/cmd/commands/register_object_id/create.rs diff --git a/src/re_server/cmd/commands/register_object_id/mod.rs b/src/re_server/cmd/commands/register_object_id/mod.rs new file mode 100644 index 0000000..c5fb369 --- /dev/null +++ b/src/re_server/cmd/commands/register_object_id/mod.rs @@ -0,0 +1 @@ +pub mod create; diff --git a/src/re_server/cmd/commands/room/create.rs b/src/re_server/cmd/commands/room/create.rs new file mode 100644 index 0000000..7a49ded --- /dev/null +++ b/src/re_server/cmd/commands/room/create.rs @@ -0,0 +1,31 @@ +use bytes::{BufMut, BytesMut}; + +use crate::{config::get_config, re_server::cmd::constants::REDIRID}; + +pub fn create_room_id_request(room: &str, room_id: u8) -> Vec<u8> { + let mut command = BytesMut::new(); + + // Header + command.put_u8(0x01); // ObjId + command.put_u8(REDIRID as u8); // Type + + // Content + command.put_u8(room.len() as u8); // Room name length + command.put_slice(room.as_bytes()); // Room name + // command.put_u8(0x00); // Unimplemented byte (?) + // command.put_u8(room_id); // Room ID + command.put_u16(room_id as u16); // Room ID + + // IP + for byte in "0.0.0.0".split('.') { + command.put_u8(byte.parse::<u8>().unwrap()); + } + command.put_u16(get_config().unwrap().hub_port as u16); // Port + + // Length + let mut command_as_vec = command.to_vec(); + command_as_vec.insert(0, command.len() as u8 + 1); + + // Return + command_as_vec +} diff --git a/src/re_server/cmd/commands/room/mod.rs b/src/re_server/cmd/commands/room/mod.rs new file mode 100644 index 0000000..d6ebc5b --- /dev/null +++ b/src/re_server/cmd/commands/room/mod.rs @@ -0,0 +1,2 @@ +pub mod create; +pub mod parse; diff --git a/src/re_server/cmd/commands/room/parse.rs b/src/re_server/cmd/commands/room/parse.rs new file mode 100644 index 0000000..c6bb25c --- /dev/null +++ b/src/re_server/cmd/commands/room/parse.rs @@ -0,0 +1,5 @@ +use std::str::from_utf8; + +pub fn parse_room_id_request(data: Vec<u8>) -> String { + from_utf8(&data[4..data[0] as usize]).unwrap().to_string() +} diff --git a/src/re_server/cmd/commands/session/mod.rs b/src/re_server/cmd/commands/session/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/re_server/cmd/commands/session/mod.rs @@ -0,0 +1 @@ + diff --git a/src/re_server/cmd/commands/session/parse.rs b/src/re_server/cmd/commands/session/parse.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/re_server/cmd/commands/session/parse.rs @@ -0,0 +1 @@ + diff --git a/src/re_server/cmd/commands/session/structure.rs b/src/re_server/cmd/commands/session/structure.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/re_server/cmd/commands/session/structure.rs @@ -0,0 +1 @@ + diff --git a/src/re_server/cmd/commands/subscribe/mod.rs b/src/re_server/cmd/commands/subscribe/mod.rs new file mode 100644 index 0000000..94bb423 --- /dev/null +++ b/src/re_server/cmd/commands/subscribe/mod.rs @@ -0,0 +1,2 @@ +mod parse; +mod structure; diff --git a/src/re_server/cmd/commands/subscribe/parse.rs b/src/re_server/cmd/commands/subscribe/parse.rs new file mode 100644 index 0000000..3caed08 --- /dev/null +++ b/src/re_server/cmd/commands/subscribe/parse.rs @@ -0,0 +1,18 @@ +use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{Buf, BytesMut}; + +use crate::re_server::cmd::commands::subscribe::structure::SubscribeRoom; + +/// TODO: The functionality of this function has not been tested... TEST IT! +pub fn parse_subscribe_room(data: Vec<u8>) -> SubscribeRoom { + // https://stackoverflow.com/questions/41034635/how-do-i-convert-between-string-str-vecu8-and-u8 + let mut data = BytesMut::from(data.as_slice()).reader(); + + SubscribeRoom { + room_number: data.read_i16::<BigEndian>().unwrap(), + distance: data.read_i16::<BigEndian>().unwrap(), + x: data.read_i16::<BigEndian>().unwrap(), + y: data.read_i16::<BigEndian>().unwrap(), + z: data.read_i16::<BigEndian>().unwrap(), + } +} diff --git a/src/re_server/cmd/commands/subscribe/structure.rs b/src/re_server/cmd/commands/subscribe/structure.rs new file mode 100644 index 0000000..fb74e4f --- /dev/null +++ b/src/re_server/cmd/commands/subscribe/structure.rs @@ -0,0 +1,7 @@ +pub struct SubscribeRoom { + pub room_number: i16, + pub distance: i16, + pub x: i16, + pub y: i16, + pub z: i16, +} diff --git a/src/re_server/cmd/commands/text/create.rs b/src/re_server/cmd/commands/text/create.rs new file mode 100644 index 0000000..02def7b --- /dev/null +++ b/src/re_server/cmd/commands/text/create.rs @@ -0,0 +1,28 @@ +use bytes::{BufMut, BytesMut}; + +use crate::re_server::cmd::{commands::text::structure::Text, constants::TEXT}; + +pub fn create_text(text: Text) -> Vec<u8> { + let mut command = BytesMut::new(); + + // Header + command.put_u8(0x01); + command.put_u8(TEXT as u8); + + // Content + // The fourth and fifth elements are presumed to be interpreted as a short by + // the client, however, usernames aren't (?) allowed to be long enough that + // they reach a number high enough to be converted to a short. + command.put_u8(0x00); + command.put_u8(text.sender.len() as u8); + command.put_slice(text.sender.as_bytes()); + command.put_u8(text.content.len() as u8); + command.put_slice(text.content.as_bytes()); + + // Convert to vector and insert the length + let mut command_as_vec = command.to_vec(); + command_as_vec.insert(0, command.len() as u8 + 1); + + // Return bytes + command_as_vec +} diff --git a/src/re_server/cmd/commands/text/mod.rs b/src/re_server/cmd/commands/text/mod.rs new file mode 100644 index 0000000..02f63a3 --- /dev/null +++ b/src/re_server/cmd/commands/text/mod.rs @@ -0,0 +1,3 @@ +pub mod create; +pub mod parse; +pub mod structure; diff --git a/src/re_server/cmd/commands/text/parse.rs b/src/re_server/cmd/commands/text/parse.rs new file mode 100644 index 0000000..45999a0 --- /dev/null +++ b/src/re_server/cmd/commands/text/parse.rs @@ -0,0 +1,10 @@ +use std::str::from_utf8; + +use crate::re_server::cmd::commands::text::structure::Text; + +pub fn parse_text(data: Vec<u8>, username: &str) -> Text { + Text { + sender: username.to_string(), + content: from_utf8(&data[6..]).unwrap().to_string(), + } +} diff --git a/src/re_server/cmd/commands/text/structure.rs b/src/re_server/cmd/commands/text/structure.rs new file mode 100644 index 0000000..038fb12 --- /dev/null +++ b/src/re_server/cmd/commands/text/structure.rs @@ -0,0 +1,4 @@ +pub struct Text { + pub sender: String, + pub content: String, +} diff --git a/src/re_server/cmd/commands/whisper.rs b/src/re_server/cmd/commands/whisper.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/re_server/cmd/commands/whisper.rs @@ -0,0 +1 @@ + diff --git a/src/re_server/cmd/constants.rs b/src/re_server/cmd/constants.rs new file mode 100644 index 0000000..ed07704 --- /dev/null +++ b/src/re_server/cmd/constants.rs @@ -0,0 +1,29 @@ +pub const LONGLOC: i32 = 1; +pub const STATE: i32 = 2; +pub const PROP: i32 = 3; +pub const SHORTLOC: i32 = 4; +pub const ROOMCHNG: i32 = 5; +pub const SESSINIT: i32 = 6; +pub const SESSEXIT: i32 = 7; +pub const APPINIT: i32 = 8; +pub const PROPREQ: i32 = 10; +pub const DISAPPR: i32 = 11; +pub const APPRACTR: i32 = 12; +pub const REGOBJID: i32 = 13; +pub const TEXT: i32 = 14; +pub const PROPSET: i32 = 15; +pub const PROPUPD: i32 = 16; +pub const WHISPER: i32 = 17; +pub const TELEPORT: i32 = 18; +pub const ROOMIDRQ: i32 = 20; +pub const ROOMID: i32 = 21; +pub const SUBSCRIB: i32 = 22; +pub const UNSUBSCR: i32 = 23; +pub const SUB_DIST: i32 = 24; // SUB-DIST +pub const REDIRECT: i32 = 25; +pub const REDIRID: i32 = 26; +pub const FINGREQ: i32 = 27; +pub const FINGREP: i32 = 28; +pub const BUDDYLISTUPDATE: i32 = 29; +pub const BUDDYLISTNOTIFY: i32 = 30; +pub const CHANNEL: i32 = 31; diff --git a/src/re_server/cmd/mod.rs b/src/re_server/cmd/mod.rs new file mode 100644 index 0000000..181a82d --- /dev/null +++ b/src/re_server/cmd/mod.rs @@ -0,0 +1,5 @@ +pub mod commands; + +pub mod constants; +mod set_parser; +mod structure; diff --git a/src/re_server/cmd/set_parser.rs b/src/re_server/cmd/set_parser.rs new file mode 100644 index 0000000..be2a5dd --- /dev/null +++ b/src/re_server/cmd/set_parser.rs @@ -0,0 +1,36 @@ +use crate::re_server::cmd::structure::Command; + +/// Iterate over a command set in the from of bytes and return a list of +/// human-readable commands. +fn _parse_command_set(mut data: Vec<u8>) -> Vec<Command> { + let mut command_set = vec![]; + + // Iterate over all commands + loop { + // Check if any commands are present + if data.len() <= 2 { + break; + } + if data[0] == 0 { + break; + } + + let command_length = data[0]; + let mut command = Command { + length: command_length as i32, + obj_id: data[1] as i32, + id: data[2] as i32, + body: vec![], + }; + if command.length > 3 { + command.body = data[3..].to_owned(); + } + command_set.push(command); + + // Remove current command from the command set + data = data[command_length as usize..].to_vec(); + } + + // Return the human-readable command set + command_set +} diff --git a/src/re_server/cmd/structure.rs b/src/re_server/cmd/structure.rs new file mode 100644 index 0000000..85c2288 --- /dev/null +++ b/src/re_server/cmd/structure.rs @@ -0,0 +1,19 @@ +pub struct Command { + pub length: i32, + pub obj_id: i32, + pub id: i32, + pub body: Vec<u8>, +} +impl Command { + pub fn _new() -> Self { Command::default() } +} +impl Default for Command { + fn default() -> Self { + Command { + length: 0, + obj_id: 0, + id: 0, + body: vec![], + } + } +} diff --git a/src/re_server/distributor.rs b/src/re_server/distributor.rs new file mode 100644 index 0000000..5a8e3aa --- /dev/null +++ b/src/re_server/distributor.rs @@ -0,0 +1,144 @@ +//! The distributor functions as bare-minimal +//! [AutoServer](http://dev.worlds.net/private/GammaDocs/WorldServer.html#AutoServer). +//! +//! It intercepts a client and distributes it to a +//! [RoomServer](http://dev.worlds.net/private/GammaDocs/WorldServer.html#RoomServer). +//! +//! This is not meant to be a high performant section of code as the distributor +//! is only meant to handle the initial and brief session initialization of the +//! client. + +use std::{error::Error, net::SocketAddr, sync::Arc}; + +use tokio::{io::AsyncWriteExt, net::TcpStream, sync::Mutex}; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, Decoder}; + +use crate::{ + config::get_config, + re_server::{ + cmd::{ + commands::{ + action::create_action, + buddy_list::{create::create_buddy_list_notify, parse::parse_buddy_list_update}, + property::{ + create::{create_property_request_as_distributor, create_property_update_as_distributor}, + parse::find_property_in_property_list, + }, + room::{create::create_room_id_request, parse::parse_room_id_request}, + text::{create::create_text, structure::Text}, + }, + constants::*, + }, + interaction::{peer::Peer, shared::Shared}, + net::{constants::VAR_USERNAME, property_parser::parse_network_property}, + packet_parser::parse_commands_from_packet, + server::Server, + }, +}; + +pub struct Distributor; +#[async_trait::async_trait] +impl Server for Distributor { + async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + _address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + let mut room_ids = vec![]; + let mut username = String::from("unknown"); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + for msg in parse_commands_from_packet(msg) { + match msg.get(2).unwrap().to_owned() as i32 { + PROPREQ => { + trace!("received property request from client"); + + peer.bytes.get_mut() + .write_all(&create_property_update_as_distributor()).await?; + trace!("sent property update to client"); + } + SESSINIT => { + username = find_property_in_property_list( + &parse_network_property(msg[3..].to_vec()), + VAR_USERNAME, + ).value.clone(); + + trace!("received session initialization from {}", username); + + peer.bytes.get_mut() + .write_all(&create_property_request_as_distributor()).await?; + trace!("sent property request to {}", username); + } + PROPSET => { + trace!("received property set from {}", username); + + peer.bytes.get_mut() + .write_all(&create_text(Text { + sender: get_config()?.worldsmaster_username, + content: get_config()?.worldsmaster_greeting, + })).await?; + peer.bytes.get_mut() + .write_all(&create_action()).await?; + trace!("sent text to {}", username); + } + BUDDYLISTUPDATE => { + let buddy = parse_buddy_list_update(msg.to_vec()); + trace!("received buddy list update from {}: {}", username, buddy.buddy); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify(&buddy)).await?; + trace!("sent buddy list notify to {}: {}", username, buddy.buddy); + } + ROOMIDRQ => { + let room = parse_room_id_request(msg.to_vec()); + trace!("received room id request from {}: {}", username, &room); + + let room_id; + if !room_ids.contains(&room) { + room_ids.push(room.clone()); + room_id = room_ids.iter().position(|r| r == &room).unwrap(); + debug!("inserted room: {}", room); + } else { + let position = room_ids.iter().position(|r| r == &room).unwrap(); + debug!("found room: {}", room); + room_id = position; + } + + peer.bytes.get_mut() + .write_all(&create_room_id_request(&room, room_id as u8)).await?; + trace!("sent room id redirect to {}: {}", username, room); + } + SESSEXIT => { + trace!("received session exit from {}", username); break; + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing message (s): {}", e); break; + } + None => break, + } + } + } + + // Deregister client + trace!("de-registering client"); + { + state.lock().await.peers.remove(&count.to_string()); + } + trace!("de-registered client"); + + Ok(()) + } +} diff --git a/src/re_server/hub.rs b/src/re_server/hub.rs new file mode 100644 index 0000000..dffe183 --- /dev/null +++ b/src/re_server/hub.rs @@ -0,0 +1,142 @@ +//! The hub functions as a +//! [RoomServer](http://dev.worlds.net/private/GammaDocs/WorldServer.html#AutoServer). +//! +//! The RoomServer is responsible for handling just about every request from the +//! client after they have been redirected to a room (hub). + +use std::{error::Error, net::SocketAddr, sync::Arc}; + +use tokio::{io::AsyncWriteExt, net::TcpStream, sync::Mutex}; +use tokio_stream::StreamExt; +use tokio_util::codec::{BytesCodec, Decoder}; + +use crate::{ + config::get_config, + re_server::{ + cmd::{ + commands::{ + action::create_action, + buddy_list::{create::create_buddy_list_notify, parse::parse_buddy_list_update}, + property::{ + create::{create_property_request_as_hub, create_property_update_as_hub}, + parse::find_property_in_property_list, + }, + room::{create::create_room_id_request, parse::parse_room_id_request}, + text::{create::create_text, parse::parse_text, structure::Text}, + }, + constants::*, + }, + interaction::{peer::Peer, shared::Shared}, + net::{constants::VAR_USERNAME, property_parser::parse_network_property}, + packet_parser::parse_commands_from_packet, + server::Server, + }, +}; + +pub struct Hub; +#[async_trait::async_trait] +impl Server for Hub { + async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + _address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + // let mut room_ids = vec![]; + let mut username = String::from("unknown"); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + dbg!("got peer activity: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + dbg!("got some bytes: {:?}", &msg); + for msg in parse_commands_from_packet(msg) { + match msg.get(2).unwrap().to_owned() as i32 { + PROPREQ => { + trace!("received property request from client"); + + peer.bytes.get_mut() + .write_all(&create_property_update_as_hub()).await?; + trace!("sent property update to client"); + } + SESSINIT => { + username = find_property_in_property_list( + &parse_network_property(msg[3..].to_vec()), + VAR_USERNAME, + ).value.clone(); + + trace!("received session initialization from {}", username); + + peer.bytes.get_mut() + .write_all(&create_property_request_as_hub()).await?; + trace!("sent property request to {}", username); + } + PROPSET => { + trace!("received property set from {}", username); + + peer.bytes.get_mut() + .write_all(&create_text(Text { + sender: get_config()?.worldsmaster_username, + content: get_config()?.worldsmaster_greeting, + })).await?; + peer.bytes.get_mut() + .write_all(&create_action()).await?; + trace!("sent text to {}", username); + } + BUDDYLISTUPDATE => { + let buddy = parse_buddy_list_update(msg.to_vec()); + trace!("received buddy list update from {}: {}", username, buddy.buddy); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify(&buddy)).await?; + trace!("sent buddy list notify to {}: {}", username, buddy.buddy); + } + ROOMIDRQ => { + let room = parse_room_id_request(msg.to_vec()); + trace!("received room id request from {}: {}", username, room); + debug!("{:?}", create_room_id_request(&room, 0x04)); + } + SESSEXIT => { + trace!("received session exit from {}", username); break; + } + TEXT => { + let text = parse_text(msg.to_vec(), &username); + trace!("received text from {}:{}", username, text.content); + + { + state.lock().await.broadcast(&create_text(Text { + sender: username.clone(), + content: text.content, + })).await; + } + trace!("broadcasted text to hub"); + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing message (s): {}", e); break; + } + None => { + debug!("nothing"); break; + }, + } + } + } + + // Deregister client + trace!("de-registering client"); + { + state.lock().await.peers.remove(&count.to_string()); + } + trace!("de-registered client"); + + Ok(()) + } +} diff --git a/src/re_server/interaction/mod.rs b/src/re_server/interaction/mod.rs new file mode 100644 index 0000000..81d576b --- /dev/null +++ b/src/re_server/interaction/mod.rs @@ -0,0 +1,2 @@ +pub mod peer; +pub mod shared; diff --git a/src/re_server/interaction/peer.rs b/src/re_server/interaction/peer.rs new file mode 100644 index 0000000..0d5cf4e --- /dev/null +++ b/src/re_server/interaction/peer.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use tokio::{ + net::TcpStream, + sync::{mpsc, Mutex}, +}; +use tokio_util::codec::{BytesCodec, Framed}; + +use crate::re_server::{interaction::shared::Shared, types::Rx}; + +pub struct Peer { + pub bytes: Framed<TcpStream, BytesCodec>, + pub rx: Rx, +} +impl Peer { + pub async fn new( + state: Arc<Mutex<Shared>>, + bytes: Framed<TcpStream, BytesCodec>, + username: String, + ) -> std::io::Result<Peer> { + let (tx, rx) = mpsc::unbounded_channel(); + state.lock().await.peers.insert(username, tx); + + Ok(Peer { + bytes, + rx, + }) + } + + pub async fn _change_username( + self, + state: Arc<Mutex<Shared>>, + username: &str, + new_username: &str, + ) { + // Remove peer from peers + { + state.lock().await.peers.remove(username); + } + + // Add the peer back with the new username + Self::new(state, self.bytes, new_username.to_string()) + .await + .unwrap(); + } +} diff --git a/src/re_server/interaction/shared.rs b/src/re_server/interaction/shared.rs new file mode 100644 index 0000000..ca7997a --- /dev/null +++ b/src/re_server/interaction/shared.rs @@ -0,0 +1,25 @@ +use std::collections::HashMap; + +use bytes::BytesMut; + +use crate::re_server::types::Tx; + +pub struct Shared { + pub peers: HashMap<String, Tx>, +} +impl Shared { + pub fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } + + pub async fn broadcast(&mut self, message: &[u8]) { + for peer in self.peers.iter_mut() { + peer.1.send(BytesMut::from(message)).unwrap(); + } + } +} +impl Default for Shared { + fn default() -> Self { Self::new() } +} diff --git a/src/re_server/mod.rs b/src/re_server/mod.rs new file mode 100644 index 0000000..df25a4c --- /dev/null +++ b/src/re_server/mod.rs @@ -0,0 +1,9 @@ +pub mod cmd; +mod interaction; +pub mod net; + +pub mod distributor; +pub mod hub; +mod packet_parser; +pub mod server; +mod types; diff --git a/src/re_server/net/constants.rs b/src/re_server/net/constants.rs new file mode 100644 index 0000000..64aa60a --- /dev/null +++ b/src/re_server/net/constants.rs @@ -0,0 +1,90 @@ +pub const VAR_PROTOCOL_VERSION: i32 = 24; +pub const STATECMD: i32 = 2; +pub const MAXCMD: i32 = 255; +pub const CURRENT_ROOM: i32 = 253; +pub const CLIENT: i32 = 1; +pub const CO: i32 = 254; +pub const PO: i32 = 255; +pub const VAR_APPNAME: i32 = 1; +pub const VAR_USERNAME: i32 = 2; +pub const VAR_PROTOCOL: i32 = 3; +pub const VAR_ERROR: i32 = 4; +pub const VAR_CHANNEL: i32 = 5; +pub const VAR_BITMAP: i32 = 5; +pub const VAR_PASSWORD: i32 = 6; +pub const VAR_AVATARS: i32 = 7; +pub const VAR_UPDATETIME: i32 = 8; +pub const VAR_CLIENT: i32 = 9; +pub const VAR_SERIAL: i32 = 10; +pub const VAR_EMAIL: i32 = 11; +pub const VAR_LOGONOFF: i32 = 12; +pub const VAR_DURATION: i32 = 13; +pub const VAR_GUEST: i32 = 14; +pub const VAR_SERVERTYPE: i32 = 15; +pub const VAR_VIZCARD: i32 = 16; +pub const VAR_NEW_PASSWD: i32 = 20; +pub const VAR_PRIV: i32 = 22; +pub const VAR_ASLEEP: i32 = 23; +pub const VAR_EXTERNAL_HTTP_SERVER: i32 = 24; +pub const VAR_SCRIPT_SERVER: i32 = 25; +pub const VAR_SMTP_SERVER: i32 = 26; +pub const VAR_MAIL_DOMAIN: i32 = 27; +pub const VAR_NEW_USERNAME: i32 = 28; +pub const VAR_INTERNAL_HTTP_SERVER: i32 = 29; +pub const VAR_INVENTORY: i32 = 32; +pub const ACK: i32 = 0; +pub const NAK_BAD_USER: i32 = 1; +pub const NAK_MAX_ORDINARY: i32 = 2; +pub const NAK_MAX_PRIORITY: i32 = 3; +pub const NAL_BAD_WORLD: i32 = 4; +pub const NAK_FATAIL: i32 = 5; +pub const NAK_BAD_PROTOCOL: i32 = 6; +pub const NAK_BAD_CLIENTSW: i32 = 7; +pub const NAK_BAD_ROOM: i32 = 8; +pub const NAK_BAD_SERIAL: i32 = 9; +pub const NAK_TAKEN_SERIAL: i32 = 10; +pub const NAK_TAKEN_USER: i32 = 11; +pub const NAK_NO_SUCH_USER: i32 = 12; +pub const NAK_BAD_PASSWORD: i32 = 13; +pub const NAK_BAD_ACCOUNT: i32 = 14; +pub const NAK_NOT_LOGGEDON: i32 = 15; +pub const NAK_BAD_IPADDRESS: i32 = 16; +pub const NAK_LOGGEDON: i32 = 17; +pub const NAK_CRYPT_METHOD: i32 = 18; +pub const NAK_CRYPT_ERROR: i32 = 19; +pub const NAK_SESSIONINIT: i32 = 20; +pub const NAK_ROOM_FULL: i32 = 21; +pub const NAK_SHUTDOWN: i32 = 100; +pub const NAK_WRITE_ERROR: i32 = 101; +pub const NAK_READ_ERROR: i32 = 102; +pub const NAK_UNEXPECTED: i32 = 103; +pub const NAK_CONNECTION: i32 = 104; +pub const NAK_IOSTREAMS: i32 = 105; +pub const NAK_TIMEOUT: i32 = 106; +pub const NAK_UNREACHABLE: i32 = 107; +pub const STATUS_CONNECTED: i32 = 200; +pub const STATUS_DETACHING: i32 = 201; +pub const STATUS_WILLRETRY: i32 = 202; +pub const STATUS_DISCONNECTED: i32 = 203; +pub const STATUS_DEAD: i32 = 204; +pub const STATUS_OFFLINE: i32 = 205; +pub const STATUS_GALAXY_ONLINE: i32 = 206; +pub const STATUS_GALAXY_OFFLINE: i32 = 206; +pub const PROPFLAG_BINARY: i32 = 16; +pub const PROPFLAG_FINGER: i32 = 32; +pub const PROPFLAG_AUTOUPDATE: i32 = 64; +pub const PROPFLAG_DBSTORE: i32 = 128; +pub const PROPACCESS_POSSESS: i32 = 1; +pub const PROPACCESS_PRIVATE: i32 = 2; +pub const SERVER_UNKNOWN: i32 = 0; +pub const USER_SERVER_DB: i32 = 1; +pub const USER_SERVER_ANON: i32 = 2; +pub const ROOM_SERVER_US: i32 = 3; +pub const ROOM_SERVER_ANON: i32 = 4; +pub const PRIV_NONE: i32 = 0; +pub const PRIV_BUILD: i32 = 1; +pub const PRIV_BROADCAST: i32 = 2; +pub const PRIV_PROPERTY: i32 = 4; +pub const PRIV_VIP: i32 = 8; +pub const PRIV_VIP2: i32 = 16; +pub const PRIV_SPECIALGUEST: i32 = 64; diff --git a/src/re_server/net/converter.rs b/src/re_server/net/converter.rs new file mode 100644 index 0000000..36002ee --- /dev/null +++ b/src/re_server/net/converter.rs @@ -0,0 +1,54 @@ +use bytes::{BufMut, BytesMut}; + +use crate::re_server::{ + cmd::constants::PROPUPD, + net::{ + constants::{PROPACCESS_POSSESS, PROPFLAG_DBSTORE}, + structure::NetworkProperty, + }, +}; + +pub fn property_list_to_bytes( + command_id: i32, + obj_id: i32, + mut property_list: Vec<NetworkProperty>, +) -> Vec<u8> { + let mut command = BytesMut::new(); + + // Iterate over all network properties + loop { + // Check if there are any properties left + debug!("props left: {}", property_list.len()); + if property_list.is_empty() { + break; + } + + let property = &property_list[0]; // Property we are currently iterating over + debug!("current prop: {}:{}", property.prop_id, property.value); + + command.put_u8(property.prop_id as u8); // Property ID + + // NOTE: THIS IS SUPER BAD DO NOT DO THIS! But it works! + if command_id == PROPUPD { + command.put_u8(PROPFLAG_DBSTORE as u8); // Flag (s) + command.put_u8(PROPACCESS_POSSESS as u8); // Access + } + + command.put_u8(property.value.len() as u8); // Property UTF-8 Length + command.put_slice(property.value.as_bytes()); // Property UTF-8 + + property_list.reverse(); + property_list.pop(); + property_list.reverse(); + } + + // Convert to vector and insert the header + let mut command_as_vec = command.to_vec(); + + command_as_vec.insert(0, command_id as u8); // Command ID + command_as_vec.insert(0, obj_id as u8); // ObjId + command_as_vec.insert(0, command.len() as u8 + 3); // Data length + + // Return bytes + command_as_vec +} diff --git a/src/re_server/net/mod.rs b/src/re_server/net/mod.rs new file mode 100644 index 0000000..15b0bdb --- /dev/null +++ b/src/re_server/net/mod.rs @@ -0,0 +1,4 @@ +pub mod constants; +pub mod converter; +pub mod property_parser; +pub mod structure; diff --git a/src/re_server/net/property_parser.rs b/src/re_server/net/property_parser.rs new file mode 100644 index 0000000..1ee210f --- /dev/null +++ b/src/re_server/net/property_parser.rs @@ -0,0 +1,35 @@ +use std::str::from_utf8; + +use crate::re_server::net::structure::NetworkProperty; + +/// Iterate over a network property in the form of bytes and return a list of +/// human-readable properties. +pub fn parse_network_property(mut data: Vec<u8>) -> Vec<NetworkProperty> { + let mut property_list = vec![]; + + // Iterate over all network properties + loop { + // Check if any commands are present + if data.len() <= 2 { + break; + } + debug!("iteration: {:?}", data); + // if data[0] == 0 { + // break; + // } + + let property_length = data[1] + 2; + property_list.push(NetworkProperty { + prop_id: data[0] as i32, + value: from_utf8(&data[2..data[1] as usize + 2]) + .unwrap() + .to_string(), + }); + + // Remove current property from the network property + data = data[property_length as usize..].to_vec(); + } + + // Return the human-readable network property + property_list +} diff --git a/src/re_server/net/structure.rs b/src/re_server/net/structure.rs new file mode 100644 index 0000000..b5d74a6 --- /dev/null +++ b/src/re_server/net/structure.rs @@ -0,0 +1,15 @@ +pub struct NetworkProperty { + pub prop_id: i32, + pub value: String, +} +impl NetworkProperty { + pub fn new() -> Self { NetworkProperty::default() } +} +impl Default for NetworkProperty { + fn default() -> Self { + NetworkProperty { + prop_id: 0, + value: "".to_string(), + } + } +} diff --git a/src/re_server/packet_parser.rs b/src/re_server/packet_parser.rs new file mode 100644 index 0000000..c53610b --- /dev/null +++ b/src/re_server/packet_parser.rs @@ -0,0 +1,35 @@ +use bytes::BytesMut; + +/// Read all commands from the given buffer. +/// +/// # Process +/// 1. Get a command from `buffer` based on first byte. +/// 2. Push command to `commands`. +/// 3. Remove command from `buffer`. +/// 4. Iterate and do this for all commands within `buffer`. +pub fn parse_commands_from_packet(mut buffer: BytesMut) -> Vec<BytesMut> { + let mut commands: Vec<BytesMut> = Vec::new(); + debug!("initial buffer: {:?}, length: {}", buffer, buffer.len()); + + let data_length = buffer.get(0).unwrap().to_owned() as usize; + if buffer.len() > data_length { + loop { + debug!("loop: {:?}, length: {}", buffer, buffer.len()); + let command_length = buffer.get(0).unwrap().to_owned() as usize; + commands.push(BytesMut::from(buffer.get(0..command_length).unwrap())); + + // Remove command from buffer + buffer = buffer.split_off(command_length); + + // Check if any more commands are present + if buffer.is_empty() { + break; + } + } + } else { + // There will always be at least one command, push it. + commands.push(BytesMut::from(buffer.get(0..data_length).unwrap())); + } + + commands // Return command (s) +} diff --git a/src/re_server/server.rs b/src/re_server/server.rs new file mode 100644 index 0000000..f86b956 --- /dev/null +++ b/src/re_server/server.rs @@ -0,0 +1,57 @@ +use std::{error::Error, fmt, net::SocketAddr, sync::Arc}; + +use tokio::{ + net::{TcpListener, TcpStream}, + sync::Mutex, +}; + +use crate::re_server::interaction::shared::Shared; + +#[derive(Debug)] +pub enum ServerType { + AnonRoomServer, + AnonUserServer, + AutoServer, + RoomServer, + UserServer, +} +// https://stackoverflow.com/a/32712140/14452787 +impl fmt::Display for ServerType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } +} + +#[async_trait::async_trait] +pub trait Server { + async fn listen(address: &str, server_type: ServerType) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(address).await?; + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; + + info!( + "server of type {} now listening at {}", + server_type.to_string(), + address + ); + + loop { + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + trace!("accepted client at {}", address); + + tokio::spawn(async move { + if let Err(e) = Self::handle(state, stream, address, counter).await { + error!("an error occurred: {}", e); + } + }); + } + } + + async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + _address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>>; +} diff --git a/src/re_server/types.rs b/src/re_server/types.rs new file mode 100644 index 0000000..20dd176 --- /dev/null +++ b/src/re_server/types.rs @@ -0,0 +1,5 @@ +use bytes::BytesMut; +use tokio::sync::mpsc; + +pub type Tx = mpsc::UnboundedSender<BytesMut>; +pub type Rx = mpsc::UnboundedReceiver<BytesMut>; diff --git a/src/server/auto/____server.rs b/src/server/auto/____server.rs new file mode 100644 index 0000000..586bc16 --- /dev/null +++ b/src/server/auto/____server.rs @@ -0,0 +1,384 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::{HashMap, HashSet}; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll.registry().register( + &mut listener, + Token(0), + Interest::READABLE, + )?; + + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll.registry().register( + &mut stream, + token, + Interest::READABLE, + )?; + + debug!("registered peer with address '{}' as '{}'", address, token.0); + + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll.registry().deregister( + self.connections.get_mut(&Token(counter)).unwrap(), + )?; + break; + } + } + }, + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap() + .read(&mut buffer); + match read { + Ok(0) => { self.connections.remove(&token); break; } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { req.push(*b); } + + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self.connections.get_mut(&token).unwrap() + .write_all(&create_room_id_redirect_command( + room_name, room_id, + )).unwrap(); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, text, + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => + break, + Err(err) => { error!("unexpected error: {}", err); break; } + } + } + } + _ => (), + } + } + } + } + + fn broadcast( + sockets: &HashMap<Token, TcpStream>, + cmd: &[u8], + ) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } + + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } + + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } +} diff --git a/src/server/auto/___server.rs b/src/server/auto/___server.rs new file mode 100644 index 0000000..2f95952 --- /dev/null +++ b/src/server/auto/___server.rs @@ -0,0 +1,266 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut unique_token = Token(SERVER.0 + 1); + + poll.registry().register( + &mut server, + SERVER, + Interest::READABLE + )?; + + debug!("AutoServer now listening on {}", server.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + SERVER => loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + self.connections.insert(token, stream); + + debug!("registered peer with address '{}' as '{}'", address, token.0); + }, + token => { + // let done = if let Some(stream) = self.connections.get_mut(&token) { + // self.process( + // poll.registry(), + // event, + // token, + // )? + // } else { + // false + // }; + + let done = self.process( + poll.registry(), + event, + token, + )?; + if done { self.connections.remove(&token); } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(mut self, cmd: &[u8]) -> () { + self.connections.iter_mut() + .for_each(|c| + c.1.write_all(cmd).unwrap() + ); + } + + fn process( + &mut self, + _registry: &Registry, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + let stream = self.connections.get_mut(&token).unwrap(); + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle( + &mut received_data[..bytes_read], + token, + ); + } + if connection_closed { + println!("de-registered peer with token '{}'", token.0); + return Ok(true); + } + } + + Ok(false) + } + + fn handle( + &mut self, + data: &[u8], + // stream: &mut TcpStream, + token: Token, + ) -> () { + // trace!("i am client: {:?}", self.clients.get(&token)); + // debug!("{:?}", self.connections); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self.room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self.connections.get_mut(&token).unwrap() + .write_all(&create_room_id_redirect_command( + room_name, room_id, + )).unwrap(); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, text, + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } +} diff --git a/src/server/auto/__server.rs b/src/server/auto/__server.rs new file mode 100644 index 0000000..b72b7e7 --- /dev/null +++ b/src/server/auto/__server.rs @@ -0,0 +1,230 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::HashMap; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, +} +impl AutoServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut connections = HashMap::new(); + let mut unique_token = Token(SERVER.0 + 1); + + poll.registry().register( + &mut server, + SERVER, + Interest::READABLE + )?; + + debug!("AutoServer now listening on {}", server.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in events.iter() { + match event.token() { + SERVER => loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + connections.insert(token, stream); + + println!("registered peer with address '{}' as '{}'", address, token.0); + }, + token => { + let done = if let Some(stream) = connections.get_mut(&token) { + self.process( + poll.registry(), + stream, + event, + token, + )? + } else { + false + }; + if done { connections.remove(&token); } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(self, cmd: &[u8]) -> () { + for mut connection in self.connections { + connection.1.write(cmd).unwrap(); + } + } + + fn process( + &mut self, + _registry: &Registry, + stream: &mut TcpStream, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle( + &mut received_data[..bytes_read], + stream, + token, + ); + } + if connection_closed { + debug!("connection closed"); + return Ok(true); + } + } + + Ok(false) + } + + fn handle( + &mut self, + data: &[u8], + stream: &mut TcpStream, + token: Token, + ) -> () { + trace!("i am client: {:?}", self.clients.get(&token)); + // let local_client = self.clients.get(&token) + // .unwrap_or(&"null".to_string()); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + stream.write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, username.clone()); + debug!( + "received session initialization command from client '{}'", + username, + ); + stream.write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar + ); + stream.write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy + ); + stream.write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), room_name, + ); + } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!( + "received text command from client '{}': {}", + self.clients.get(&token).unwrap(), + text, + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } +} diff --git a/src/server/auto/_server.rs b/src/server/auto/_server.rs new file mode 100644 index 0000000..8b7fa6c --- /dev/null +++ b/src/server/auto/_server.rs @@ -0,0 +1,182 @@ +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use crate::server::cmd::action::create_action_command; + +pub struct AutoServer; +impl AutoServer { + pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; + + loop { + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = AutoServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } + + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg) { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command( + "WORLDSMASTER", &get_config()?.worldsmaster_greeting + )).await?; + peer.bytes.get_mut() + .write_all(&create_action_command()).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + let buddies = vec![ + "dosfox", + "Fallen_Angel", + "Internet_Man", + "Nexialist", + "SirGemini", + "SirGrandpa", + "Wirlaburla", + ]; + if buddies.contains(&received_buddy) { + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client: {} == {}", room_name, room_id); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, + } + } + } + + { // De-register client + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) + } +} diff --git a/src/server/auto/cmd/property.rs b/src/server/auto/cmd/property.rs index b8d0085..e266757 100644 --- a/src/server/auto/cmd/property.rs +++ b/src/server/auto/cmd/property.rs @@ -1,37 +1,24 @@ pub fn create_property_update_command() -> [u8; 147] { - [ - 0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F, - 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F, - 0x6D, 0x1A, 0x80, 0x01, 0x12, 0x6D, 0x61, 0x69, 0x6C, - 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, - 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, - 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, - 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, - 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, - 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, - 0x2D, 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, - 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, - 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, - 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, - 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x31, 0x03, - 0x80, 0x01, 0x02, 0x32, 0x34, 0x01, 0x80, 0x01, 0x0C, - 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, - 0x54, 0x45, 0x52 - ]: [u8; 147] + [ + 0x93, 0xFF, 0x10, 0x1B, 0x80, 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, + 0x63, 0x6F, 0x6D, 0x1A, 0x80, 0x01, 0x12, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x77, 0x6F, 0x72, 0x6C, + 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, + 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, + 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, + 0x69, 0x2D, 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, + 0x77, 0x77, 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, + 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x31, 0x03, 0x80, 0x01, + 0x02, 0x32, 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, + 0x54, 0x45, 0x52, + ]: [u8; 147] } pub fn create_property_request_command() -> [u8; 61] { - [ - 0x3D, 0x01, 0x06, 0x04, 0x01, 0x30, 0x01, 0x0C, - 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, - 0x53, 0x54, 0x45, 0x52, 0x03, 0x02, 0x32, 0x34, - 0x0F, 0x01, 0x31, 0x0A, 0x10, - - // VAR_SERIAL - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - - 0x16, 0x01, 0x30, 0x05, 0x0B, 0x64, 0x69, 0x6D, - 0x65, 0x6E, 0x73, 0x69, 0x6F, 0x6E, 0x2D, 0x31 - ]: [u8; 61] + [ + 0x3D, 0x01, 0x06, 0x04, 0x01, 0x30, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, + 0x53, 0x54, 0x45, 0x52, 0x03, 0x02, 0x32, 0x34, 0x0F, 0x01, 0x31, 0x0A, 0x10, + // VAR_SERIAL: DWLV000000000000 + 0x44, 0x57, 0x4c, 0x56, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x16, 0x01, 0x30, 0x05, 0x0B, 0x64, 0x69, 0x6D, 0x65, 0x6E, 0x73, 0x69, 0x6F, 0x6E, 0x2D, 0x31, + ]: [u8; 61] } diff --git a/src/server/auto/cmd/room.rs b/src/server/auto/cmd/room.rs index 16c27b8..79786ef 100644 --- a/src/server/auto/cmd/room.rs +++ b/src/server/auto/cmd/room.rs @@ -1,36 +1,33 @@ pub fn create_room_id_redirect_command(room_name: &str, room_id: usize) -> Vec<u8> { - let mut room_id_redirect = vec![ - 0x01, // ? - 0x1A, // Command type - ]; + let mut room_id_redirect = vec![ + 0x01, // ? + 0x1A, // Command type + ]; - // room_id_redirect.push(room_id_redirect.len() as u8 + 1); // Data length - room_id_redirect.push(room_name.len() as u8); // UTF/ room name length - for i in room_name.bytes() { room_id_redirect.push(i); } // Push `room_name` - // for i in "<dimension-1>".bytes() { room_id_redirect.push(i); } // Push room number + // room_id_redirect.push(room_id_redirect.len() as u8 + 1); // Data length + room_id_redirect.push(room_name.len() as u8); // UTF/ room name length + for i in room_name.bytes() { + room_id_redirect.push(i); + } // Push `room_name` + // for i in "<dimension-1>".bytes() { room_id_redirect.push(i); } // Push room + // number - // Room number - room_id_redirect.push(0x00); - room_id_redirect.push(room_id as u8); + // Room number + room_id_redirect.push(0x00); + room_id_redirect.push(room_id as u8); - // IP - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); - room_id_redirect.push(0x00); + // IP + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); + room_id_redirect.push(0x00); - // Port - // for byte in convert_u16_to_two_u8s_be(0x1629).iter() { - // room_id_redirect.push(*byte); - // } - // Port - // for byte in convert_u16_to_two_u8s_be(5673_i32 as u16).iter() { - // room_id_redirect.push(*byte); - // } - room_id_redirect.push(0x16); - room_id_redirect.push(0x29); + // Port + for byte in 5673_u16.to_be_bytes().iter() { + room_id_redirect.push(*byte); + } - room_id_redirect.insert(0, room_id_redirect.len() as u8 + 1); // Data length + room_id_redirect.insert(0, room_id_redirect.len() as u8 + 1); // Data length - room_id_redirect + room_id_redirect } diff --git a/src/server/auto/cmd/session.rs b/src/server/auto/cmd/session.rs index 5592886..19969dc 100644 --- a/src/server/auto/cmd/session.rs +++ b/src/server/auto/cmd/session.rs @@ -1,34 +1,36 @@ -use crate::server::cmd::session::SessionInitializationCommand; -use bytes::BytesMut; use std::str::from_utf8; +use bytes::BytesMut; + +use crate::server::cmd::session::SessionInitializationCommand; + struct _SessionInitializationCommandServer { - pub error: usize, - pub app_name: String, - pub protocol: usize, - pub server_type: usize, - pub serial: String, - pub private: usize, - pub channel: String, + pub error: usize, + pub app_name: String, + pub protocol: usize, + pub server_type: usize, + pub serial: String, + pub private: usize, + pub channel: String, } -pub fn parse_session_initialization_command( - command: BytesMut -) -> SessionInitializationCommand { - SessionInitializationCommand { - // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, - // client: "".to_string(), - username: from_utf8( - command.get( - 21..(20 + command.get(20).unwrap().to_owned() as usize + 1) - ).unwrap() - ).unwrap().to_string(), - // password: "".to_string() - } +pub fn parse_session_initialization_command(command: BytesMut) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command + .get(21..(20 + command.get(20).unwrap().to_owned() as usize + 1)) + .unwrap(), + ) + .unwrap() + .to_string(), + // password: "".to_string() + } } -// pub fn create_session_initialization_command() -> SessionInitializationCommandServer { -// SessionInitializationCommandServer { +// pub fn create_session_initialization_command() -> +// SessionInitializationCommandServer { SessionInitializationCommandServer { // // } // } diff --git a/src/server/auto/mod.rs b/src/server/auto/mod.rs index 0a6f342..24606ea 100644 --- a/src/server/auto/mod.rs +++ b/src/server/auto/mod.rs @@ -1,2 +1,2 @@ -pub mod server; pub mod cmd; +pub mod server; diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index b8c23e1..164b271 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -1,168 +1,319 @@ -use std::error::Error; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; -use crate::server::auto::cmd::property::{ - create_property_update_command, - create_property_request_command +use std::{ + collections::HashMap, + error::Error, + io::{ErrorKind, Read, Write}, + ops::Deref, + str::from_utf8, }; -use tokio_util::codec::{BytesCodec, Decoder}; -use tokio_stream::StreamExt; -use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; -use std::str::from_utf8; -use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::auto::cmd::room::create_room_id_redirect_command; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use crate::server::peer::Peer; -use std::net::SocketAddr; -use crate::server::auto::cmd::session::parse_session_initialization_command; -use crate::server::parser::get_commands_from_buffer; -use crate::server::cmd::property::parse_property_set_command; -use crate::config::get_config; - -pub struct AutoServer; + +use bytes::BytesMut; +use mio::{ + event::Event, + net::{TcpListener, TcpStream}, + Events, + Interest, + Poll, + Registry, + Token, +}; + +use crate::{ + config::get_config, + server::{ + auto::cmd::{ + property::{create_property_request_command, create_property_update_command}, + room::create_room_id_redirect_command, + session::parse_session_initialization_command, + }, + cmd::{ + action::create_action_command, + buddy_list::create_buddy_list_notify_command, + property::parse_property_set_command, + text::{create_text_command, create_text_command_with_action}, + }, + parser::get_commands_from_buffer, + }, +}; + +const SERVER: Token = Token(0); + +pub struct AutoServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} impl AutoServer { - pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { - let listener = TcpListener::bind(addr).await?; - debug!("AutoServer now listening on {}", listener.local_addr().unwrap()); - let state = Arc::new(Mutex::new(Shared::new())); - let mut counter = 0; - - loop { - let (stream, address) = listener.accept().await?; - counter += 1; - let state = Arc::clone(&state); - - tokio::spawn(async move { - if let Err(e) = AutoServer::handle( - state, - stream, - address, - counter - ).await { - error!("an error occurred: {}", e); - } - }); - } - } - - pub async fn handle( - state: Arc<Mutex<Shared>>, - stream: TcpStream, - address: SocketAddr, - count: usize, - ) -> Result<(), Box<dyn Error>> { - let bytes = BytesCodec::new().framed(stream); - let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; - debug!("registered peer with address '{}' as '{}'", address, count); - let mut room_ids: Vec<String> = Vec::new(); - let mut username: String = String::new(); - - loop { - tokio::select! { - Some(msg) = peer.rx.recv() => { - // debug!("received bytes from peer: {:?}", &msg); - peer.bytes.get_mut().write_all(&msg).await?; - } - result = peer.bytes.next() => match result { - Some(Ok(msg)) => { - // let msg: BytesMut = msg; - for msg in get_commands_from_buffer(msg) { - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg.clone()).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - let avatar = parse_property_set_command(msg.clone()); - debug!("received property set command from client: {}", avatar); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &get_config()?.worldsmaster_greeting - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client: {}", received_buddy); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client: {} == {}", room_name, room_id); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") - } - _ => (), - } - } - } - Some(Err(e)) => { - error!("error while processing messages: {}", e); break; - } - None => break, - } - } - } - - { // De-register client - state.lock().await.peers.remove(&count.to_string()); - debug!("removed peer: {}", count) - } - - Ok(()) - } + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut server = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut unique_token = Token(SERVER.0 + 1); + + poll + .registry() + .register(&mut server, SERVER, Interest::READABLE)?; + + debug!( + "AutoServer now listening on {}", + server.local_addr().unwrap() + ); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + SERVER => { + loop { + let (mut stream, address) = match server.accept() { + Ok((stream, address)) => (stream, address), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => return Err(Box::new(e)), + }; + + let token = AutoServer::next(&mut unique_token); + poll.registry().register( + &mut stream, + token, + Interest::READABLE, //.add(Interest::WRITABLE), + )?; + + self.connections.insert(token, stream); + + debug!( + "registered peer with address '{}' as '{}'", + address, token.0 + ); + } + } + token => { + let done = self.process(poll.registry(), event, token)?; + if done { + self.connections.remove(&token); + } + } + } + } + } + } + + fn next(current: &mut Token) -> Token { + let next = current.0; + current.0 += 1; + Token(next) + } + + fn broadcast(mut self, cmd: &[u8]) -> () { + self + .connections + .iter_mut() + .for_each(|c| c.1.write_all(cmd).unwrap()); + } + + fn process( + &mut self, + _registry: &Registry, + event: &Event, + token: Token, + ) -> Result<bool, Box<dyn Error>> { + if event.is_readable() { + let mut connection_closed = false; + let mut received_data = vec![0; 4096]; + let mut bytes_read = 0; + + let stream = self.connections.get_mut(&token).unwrap(); + + loop { + match stream.read(&mut received_data[bytes_read..]) { + Ok(0) => { + connection_closed = true; + break; + } + Ok(n) => { + bytes_read += n; + if bytes_read == received_data.len() { + received_data.resize(received_data.len() + 1024, 0); + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => return Err(Box::new(err)), + } + } + + if bytes_read != 0 { + self.handle(&mut received_data[..bytes_read], token); + } + if connection_closed { + println!("de-registered peer with token '{}'", token.0); + return Ok(true); + } + } + + Ok(false) + } + + fn handle(&mut self, data: &[u8], token: Token) -> () { + // trace!("i am client: {:?}", self.clients.get(&token)); + // debug!("{:?}", self.connections); + for cmd in get_commands_from_buffer(BytesMut::from(data)) { + debug!("received: {:?}", cmd); + match cmd.get(2).unwrap() { + 10 => { + // PROPREQ + debug!("received property request command from client 'null'"); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_update_command()) + .unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { + // SESSINIT + let local_username = parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_request_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + local_username + ); + } + 15 => { + // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_text_command( + "WORLDSMASTER", + &get_config().unwrap().worldsmaster_greeting, + )) + .unwrap(); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_action_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { + // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd + .get(4..cmd.get(0).unwrap().to_owned() as usize - 1) + .unwrap(), + ) + .unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + let buddies = vec![ + "dosfox", + "Fallen_Angel", + "Internet_Man", + "Nexialist", + "SirGemini", + "SirGrandpa", + "Wirlaburla", + ]; + if buddies.contains(&received_buddy) { + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + } + 20 => { + // ROOMIDRQ + let room_name = + from_utf8(cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + debug!( + "received room id request command from client '{}': {}", + self.clients.get(&token).unwrap(), + room_name, + ); + let room_id; + if !self.room_ids.contains(&room_name.to_string()) { + self.room_ids.push(room_name.to_string()); + room_id = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = self + .room_ids + .iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", self.room_ids); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .unwrap(); + } + 14 => { + // TEXT + let text = + from_utf8(cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap()).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, + format!("auto {}", text), + ); + self.connections.iter_mut().for_each(|t| { + t.1 + .write_all(&create_text_command(&username, text)) + .unwrap() + }); + debug!("broadcasted text command to clients"); + } + 7 => { + // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } } diff --git a/src/server/cmd/action.rs b/src/server/cmd/action.rs new file mode 100644 index 0000000..1cf9086 --- /dev/null +++ b/src/server/cmd/action.rs @@ -0,0 +1,6 @@ +pub fn create_action_command() -> [u8; 18] { + [ + 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, + 0x76, 0x3e, + ] +} diff --git a/src/server/cmd/buddy_list.rs b/src/server/cmd/buddy_list.rs index cbddc19..ba938f5 100644 --- a/src/server/cmd/buddy_list.rs +++ b/src/server/cmd/buddy_list.rs @@ -1,12 +1,14 @@ pub fn create_buddy_list_notify_command(buddy: &str) -> Vec<u8> { - let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len()); - buddy_list_notify.push(0x01); // ? - buddy_list_notify.push(0x1E); // BUDDYLISTNOTIFY - buddy_list_notify.push(buddy.len() as u8); // Buddy name length - for i in buddy.bytes() { buddy_list_notify.push(i); } // Buddy name - buddy_list_notify.push(0x01); // Is buddy logged on? - // Insert data length as first byte. - buddy_list_notify.insert(0, buddy_list_notify.len() as u8 + 1); // ^ + let mut buddy_list_notify = Vec::with_capacity(5 + buddy.len()); + buddy_list_notify.push(0x01); // ? + buddy_list_notify.push(0x1E); // BUDDYLISTNOTIFY + buddy_list_notify.push(buddy.len() as u8); // Buddy name length + for i in buddy.bytes() { + buddy_list_notify.push(i); + } // Buddy name + buddy_list_notify.push(0x01); // Is buddy logged on? + // Insert data length as first byte. + buddy_list_notify.insert(0, buddy_list_notify.len() as u8 + 1); // ^ - buddy_list_notify // Return created array + buddy_list_notify // Return created array } diff --git a/src/server/cmd/mod.rs b/src/server/cmd/mod.rs index 5e5bcd9..6db7826 100644 --- a/src/server/cmd/mod.rs +++ b/src/server/cmd/mod.rs @@ -1,3 +1,4 @@ +pub mod action; pub mod buddy_list; pub mod property; pub mod session; diff --git a/src/server/cmd/property.rs b/src/server/cmd/property.rs index eaa4cc1..1530f58 100644 --- a/src/server/cmd/property.rs +++ b/src/server/cmd/property.rs @@ -1,6 +1,7 @@ -use bytes::BytesMut; use std::str::from_utf8; +use bytes::BytesMut; + pub fn parse_property_set_command(command: BytesMut) -> String { - from_utf8(command.get(8..).unwrap()).unwrap().to_string() + from_utf8(command.get(8..).unwrap()).unwrap().to_string() } diff --git a/src/server/cmd/session.rs b/src/server/cmd/session.rs index 9d48bcc..97efc8f 100644 --- a/src/server/cmd/session.rs +++ b/src/server/cmd/session.rs @@ -1,6 +1,6 @@ pub struct SessionInitializationCommand { - // pub protocol: usize, - // pub client: String, - pub username: String, - // pub password: String, + // pub protocol: usize, + // pub client: String, + pub username: String, + // pub password: String, } diff --git a/src/server/cmd/text.rs b/src/server/cmd/text.rs index a0ada32..cc306db 100644 --- a/src/server/cmd/text.rs +++ b/src/server/cmd/text.rs @@ -1,46 +1,55 @@ pub fn create_text_command(user: &str, message: &str) -> Vec<u8> { - let mut text = Vec::with_capacity(6 + user.len() + message.len()); - text.push(0x01); // ? - text.push(0x0E); // Command type - text.push(0x00); // Assumed to be a divider. - text.push(user.len() as u8); // 'user' length - for i in user.bytes() { text.push(i); } // Pushing 'user' - text.push(message.len() as u8); // 'message' length - for i in message.bytes() { text.push(i); } // Pushing `message` - text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. + let mut text = Vec::with_capacity(6 + user.len() + message.len()); + text.push(0x01); // ? + text.push(0x0E); // Command type + text.push(0x00); // Assumed to be a divider. + text.push(user.len() as u8); // 'user' length + for i in user.bytes() { + text.push(i); + } // Pushing 'user' + text.push(message.len() as u8); // 'message' length + for i in message.bytes() { + text.push(i); + } // Pushing `message` + text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. - text // Return created array + text // Return created array } // TODO: Get this working! -// pub fn get_message_from_text_command(buffer: &'static [u8; 1024]) -> &'static str { -// from_utf8( +// pub fn get_message_from_text_command(buffer: &'static [u8; 1024]) -> &'static +// str { from_utf8( // &buffer[6..*&buffer.get(0).unwrap().to_owned() as usize] // ).unwrap() // } pub fn create_text_command_with_action( - user: &str, - message: &str, - // action: &str // Not accepting input until I figure out how actions work. + user: &str, + message: &str, + // action: &str // Not accepting input until I figure out how actions work. ) -> Vec<u8> { - let mut text = Vec::with_capacity(6 + user.len() + message.len()); - text.push(0x01); // ? - text.push(0x0E); // Command type - text.push(0x00); // Assumed to be a divider. - text.push(user.len() as u8); // 'user' length - for i in user.bytes() { text.push(i); } // Pushing 'user' - text.push(message.len() as u8); // 'message' length - for i in message.bytes() { text.push(i); } // Pushing `message` + let mut text = Vec::with_capacity(6 + user.len() + message.len()); + text.push(0x01); // ? + text.push(0x0E); // Command type + text.push(0x00); // Assumed to be a divider. + text.push(user.len() as u8); // 'user' length + for i in user.bytes() { + text.push(i); + } // Pushing 'user' + text.push(message.len() as u8); // 'message' length + for i in message.bytes() { + text.push(i); + } // Pushing `message` - let action: [u8; 18] = [ - 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, - 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, - 0x76, 0x3e - ]; - for i in action.iter() { text.push(*i); } + let action: [u8; 18] = [ + 0x12, 0x01, 0x11, 0x00, 0x05, 0x54, 0x52, 0x41, 0x44, 0x45, 0x07, 0x26, 0x7c, 0x2b, 0x69, 0x6e, + 0x76, 0x3e, + ]; + for i in action.iter() { + text.push(*i); + } - text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. + text.insert(0, text.len() as u8 + 1); // Insert data length as first byte. - text // Return created array + text // Return created array } diff --git a/src/server/mod.rs b/src/server/mod.rs index a6341a2..6f2a839 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,12 +1,7 @@ -use tokio::sync::mpsc; use bytes::BytesMut; +use tokio::sync::mpsc; pub mod auto; mod cmd; -pub mod room; mod parser; -mod peer; -mod shared; - -type Tx = mpsc::UnboundedSender<BytesMut>; -type Rx = mpsc::UnboundedReceiver<BytesMut>; +pub mod room; diff --git a/src/server/parser.rs b/src/server/parser.rs index acedbd3..f856650 100644 --- a/src/server/parser.rs +++ b/src/server/parser.rs @@ -8,30 +8,28 @@ use bytes::BytesMut; /// 3. Remove command from `buffer`. /// 4. Iterate and do this for all commands within `buffer`. pub fn get_commands_from_buffer(mut buffer: BytesMut) -> Vec<BytesMut> { - let mut commands: Vec<BytesMut> = Vec::new(); - // debug!("initial buffer: {:?}, length: {}", buffer, buffer.len()); + let mut commands: Vec<BytesMut> = Vec::new(); + trace!("initial buffer: {:?}, length: {}", buffer, buffer.len()); - let data_length = buffer.get(0).unwrap().to_owned() as usize; - if buffer.len() > data_length { - loop { - // debug!("loop: {:?}, length: {}", buffer, buffer.len()); - let command_length = buffer.get(0).unwrap().to_owned() as usize; - commands.push( - BytesMut::from(buffer.get(0..command_length).unwrap()) - ); + let data_length = buffer.get(0).unwrap().to_owned() as usize; + if buffer.len() > data_length { + loop { + trace!("loop: {:?}, length: {}", buffer, buffer.len()); + let command_length = buffer.get(0).unwrap().to_owned() as usize; + commands.push(BytesMut::from(buffer.get(0..command_length).unwrap())); - // Remove command from buffer - buffer = buffer.split_off(command_length); + // Remove command from buffer + buffer = buffer.split_off(command_length); - // Check if any more commands are present - if buffer.is_empty() { break; } - } - } else { - // There will always be at least one command, push it. - commands.push( - BytesMut::from(buffer.get(0..data_length).unwrap()) - ); - } + // Check if any more commands are present + if buffer.is_empty() { + break; + } + } + } else { + // There will always be at least one command, push it. + commands.push(BytesMut::from(buffer.get(0..data_length).unwrap())); + } - commands // Return command (s) + commands // Return command (s) } diff --git a/src/server/peer.rs b/src/server/peer.rs deleted file mode 100644 index e92a3db..0000000 --- a/src/server/peer.rs +++ /dev/null @@ -1,26 +0,0 @@ -use tokio_util::codec::{Framed, BytesCodec}; -use tokio::net::TcpStream; -use crate::server::Rx; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use tokio::sync::mpsc; - -pub struct Peer { - pub bytes: Framed<TcpStream, BytesCodec>, - // pub(crate) rx: Rx, - pub rx: Rx, -} -impl Peer { - pub async fn new( - state: Arc<Mutex<Shared>>, - bytes: Framed<TcpStream, BytesCodec>, - username: String, - ) -> std::io::Result<Peer> { - // let address = bytes.get_ref().peer_addr()?; - let (tx, rx) = mpsc::unbounded_channel(); - // state.lock().await.peers.insert(address, tx); - state.lock().await.peers.insert(username, tx); - Ok(Peer { bytes, rx }) - } -} diff --git a/src/server/room/__server.rs b/src/server/room/__server.rs new file mode 100644 index 0000000..0923e1b --- /dev/null +++ b/src/server/room/__server.rs @@ -0,0 +1,382 @@ +use std::error::Error; +use crate::server::auto::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use crate::server::room::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; +use crate::config::get_config; +use mio::{Poll, Events, Token, Interest, Registry}; +use mio::net::{TcpListener, TcpStream}; +use std::collections::{HashMap, HashSet}; +use mio::event::Event; +use std::io::{Read, ErrorKind, Write}; +use bytes::BytesMut; + +pub struct RoomServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} +impl RoomServer { + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll.registry().register( + &mut listener, + Token(0), + Interest::READABLE, + )?; + + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll.registry().register( + &mut stream, + token, + Interest::READABLE, + )?; + + debug!("registered peer with address '{}' as '{}'", address, token.0); + + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll.registry().deregister( + self.connections.get_mut(&Token(counter)).unwrap(), + )?; + break; + } + } + }, + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap() + .read(&mut buffer); + match read { + Ok(0) => { self.connections.remove(&token); break; } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { req.push(*b); } + + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client 'null'"); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_update_command()).unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { // SESSINIT + let local_username = + parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_property_request_command()).unwrap(); + debug!("sent session initialization command to client '{}'", local_username); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + )).unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self.connections.get_mut(&token).unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + 14 => { // TEXT + let text = from_utf8( + cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, format!("room: {}", text), + ); + self.connections.iter_mut().for_each(|t| + t.1.write_all(&create_text_command( + &username, + text, + )).unwrap() + ); + debug!("broadcasted text command to clients"); + } + 7 => { // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => + break, + Err(err) => { error!("unexpected error: {}", err); break; } + } + } + } + _ => (), + } + } + } + } + + fn broadcast( + sockets: &HashMap<Token, TcpStream>, + cmd: &[u8], + ) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } + + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } + + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } +} diff --git a/src/server/room/_server.rs b/src/server/room/_server.rs new file mode 100644 index 0000000..e08c3d7 --- /dev/null +++ b/src/server/room/_server.rs @@ -0,0 +1,168 @@ +use std::error::Error; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use crate::server::room::cmd::property::{ + create_property_update_command, + create_property_request_command +}; +use tokio_util::codec::{BytesCodec, Decoder}; +use tokio_stream::StreamExt; +use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; +use std::str::from_utf8; +use crate::server::cmd::buddy_list::create_buddy_list_notify_command; +use crate::server::auto::cmd::room::create_room_id_redirect_command; +use std::sync::Arc; +use tokio::sync::Mutex; +use crate::server::shared::Shared; +use crate::server::peer::Peer; +use std::net::SocketAddr; +use crate::server::room::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; + + +pub struct RoomServer; +impl RoomServer { + pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { + let listener = TcpListener::bind(addr).await?; + debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); + let state = Arc::new(Mutex::new(Shared::new())); + let mut counter = 0; + + loop { + let (stream, address) = listener.accept().await?; + counter += 1; + let state = Arc::clone(&state); + + tokio::spawn(async move { + if let Err(e) = RoomServer::handle( + state, + stream, + address, + counter + ).await { + error!("an error occurred: {}", e); + } + }); + } + } + + pub async fn handle( + state: Arc<Mutex<Shared>>, + stream: TcpStream, + address: SocketAddr, + count: usize, + ) -> Result<(), Box<dyn Error>> { + let bytes = BytesCodec::new().framed(stream); + let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; + debug!("registered peer with address '{}' as '{}'", address, count); + let mut room_ids: Vec<String> = Vec::new(); + let mut username: String = String::new(); + + loop { + tokio::select! { + Some(msg) = peer.rx.recv() => { + // debug!("received bytes from peer: {:?}", &msg); + peer.bytes.get_mut().write_all(&msg).await?; + } + result = peer.bytes.next() => match result { + Some(Ok(msg)) => { + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg) { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); + } + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client: {} == {}", room_name, room_id); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), + } + } + } + Some(Err(e)) => { + error!("error while processing messages: {}", e); break; + } + None => break, + } + } + } + + { // De-register client + state.lock().await.peers.remove(&count.to_string()); + debug!("removed peer: {}", count) + } + + Ok(()) + } +} diff --git a/src/server/room/cmd/property.rs b/src/server/room/cmd/property.rs index b75efad..3135d0a 100644 --- a/src/server/room/cmd/property.rs +++ b/src/server/room/cmd/property.rs @@ -1,47 +1,37 @@ -pub fn create_property_update_command() -> [u8; 161] { // Vec<u8> - // let mut property = Vec::with_capacity(2); - // property.push(0x01); // ? - // property.push(0x10); // Command type - // - // // Meaningful Data - // property.push(); // Property ID - // property.push(); // Flags - // property.push(); // Access - // - // // Insert data length as first byte. - // property.insert(0, property.len() as u8 + 1); // ^ - // - // property // Return created array +pub fn create_property_update_command() -> [u8; 161] { + // Vec<u8> + // let mut property = Vec::with_capacity(2); + // property.push(0x01); // ? + // property.push(0x10); // Command type + // + // // Meaningful Data + // property.push(); // Property ID + // property.push(); // Flags + // property.push(); // Access + // + // // Insert data length as first byte. + // property.insert(0, property.len() as u8 + 1); // ^ + // + // property // Return created array - [ - 0xA1, 0xFF, 0x10, 0x08, 0x80, 0x01, 0x07, 0x31, - 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x1B, 0x80, - 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, - 0x33, 0x64, 0x2E, 0x63, 0x6F, 0x6D, 0x1A, 0x80, - 0x01, 0x15, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x75, - 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, - 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, - 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, 0x70, 0x3A, - 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, - 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, 0x75, 0x73, - 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, - 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, 0x2D, - 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, - 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, - 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, - 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, - 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, - 0x01, 0x01, 0x33, 0x03, 0x80, 0x01, 0x02, 0x32, - 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, - 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, 0x54, 0x45, - 0x52 - ]: [u8; 161] + [ + 0xA1, 0xFF, 0x10, 0x08, 0x80, 0x01, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x1B, 0x80, + 0x01, 0x0C, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x33, 0x64, 0x2E, 0x63, 0x6F, 0x6D, 0x1A, 0x80, + 0x01, 0x15, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, + 0x2E, 0x6E, 0x65, 0x74, 0x3A, 0x32, 0x35, 0x19, 0x80, 0x01, 0x28, 0x68, 0x74, 0x74, 0x70, 0x3A, + 0x2F, 0x2F, 0x77, 0x77, 0x77, 0x2D, 0x64, 0x79, 0x6E, 0x61, 0x6D, 0x69, 0x63, 0x2E, 0x75, 0x73, + 0x2E, 0x77, 0x6F, 0x72, 0x6C, 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x2F, 0x63, 0x67, 0x69, 0x2D, + 0x62, 0x69, 0x6E, 0x18, 0x80, 0x01, 0x1F, 0x68, 0x74, 0x74, 0x70, 0x3A, 0x2F, 0x2F, 0x77, 0x77, + 0x77, 0x2D, 0x73, 0x74, 0x61, 0x74, 0x69, 0x63, 0x2E, 0x75, 0x73, 0x2E, 0x77, 0x6F, 0x72, 0x6C, + 0x64, 0x73, 0x2E, 0x6E, 0x65, 0x74, 0x0F, 0x80, 0x01, 0x01, 0x33, 0x03, 0x80, 0x01, 0x02, 0x32, + 0x34, 0x01, 0x80, 0x01, 0x0C, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x53, 0x4D, 0x41, 0x53, 0x54, 0x45, + 0x52, + ]: [u8; 161] } pub fn create_property_request_command() -> [u8; 22] { - [ - 0x16, 0x01, 0x06, 0x04, 0x01, 0x30, 0x0f, 0x01, - 0x33, 0x08, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, - 0x30, 0x30, 0x03, 0x02, 0x32, 0x34 - ]: [u8; 22] + [ + 0x16, 0x01, 0x06, 0x04, 0x01, 0x30, 0x0f, 0x01, 0x33, 0x08, 0x07, 0x31, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x03, 0x02, 0x32, 0x34, + ]: [u8; 22] } diff --git a/src/server/room/cmd/session.rs b/src/server/room/cmd/session.rs index f04c078..dee3931 100644 --- a/src/server/room/cmd/session.rs +++ b/src/server/room/cmd/session.rs @@ -1,18 +1,20 @@ -use crate::server::cmd::session::SessionInitializationCommand; -use bytes::BytesMut; use std::str::from_utf8; -pub fn parse_session_initialization_command( - command: BytesMut -) -> SessionInitializationCommand { - SessionInitializationCommand { - // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, - // client: "".to_string(), - username: from_utf8( - command.get( - 25..(24 + command.get(24).unwrap().to_owned() as usize + 1) - ).unwrap() - ).unwrap().to_string(), - // password: "".to_string() - } +use bytes::BytesMut; + +use crate::server::cmd::session::SessionInitializationCommand; + +pub fn parse_session_initialization_command(command: BytesMut) -> SessionInitializationCommand { + SessionInitializationCommand { + // protocol: command.get(4..4 + command.get(4)).unwrap().to_owned() as usize, + // client: "".to_string(), + username: from_utf8( + command + .get(25..(24 + command.get(24).unwrap().to_owned() as usize + 1)) + .unwrap(), + ) + .unwrap() + .to_string(), + // password: "".to_string() + } } diff --git a/src/server/room/server.rs b/src/server/room/server.rs index e08c3d7..63a13f6 100644 --- a/src/server/room/server.rs +++ b/src/server/room/server.rs @@ -1,168 +1,434 @@ -use std::error::Error; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; -use crate::server::room::cmd::property::{ - create_property_update_command, - create_property_request_command +use std::{ + collections::{HashMap, HashSet}, + error::Error, + io::{ErrorKind, Read, Write}, + str::from_utf8, }; -use tokio_util::codec::{BytesCodec, Decoder}; -use tokio_stream::StreamExt; -use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; -use std::str::from_utf8; -use crate::server::cmd::buddy_list::create_buddy_list_notify_command; -use crate::server::auto::cmd::room::create_room_id_redirect_command; -use std::sync::Arc; -use tokio::sync::Mutex; -use crate::server::shared::Shared; -use crate::server::peer::Peer; -use std::net::SocketAddr; -use crate::server::room::cmd::session::parse_session_initialization_command; -use crate::server::parser::get_commands_from_buffer; -use crate::server::cmd::property::parse_property_set_command; +use bytes::BytesMut; +use mio::{ + event::Event, + net::{TcpListener, TcpStream}, + Events, + Interest, + Poll, + Registry, + Token, +}; + +use crate::{ + config::get_config, + server::{ + auto::cmd::{ + property::{create_property_request_command, create_property_update_command}, + room::create_room_id_redirect_command, + }, + cmd::{ + buddy_list::create_buddy_list_notify_command, + property::parse_property_set_command, + text::{create_text_command, create_text_command_with_action}, + }, + parser::get_commands_from_buffer, + room::cmd::session::parse_session_initialization_command, + }, +}; -pub struct RoomServer; +const SERVER: Token = Token(0); + +pub struct RoomServer { + pub clients: HashMap<Token, String>, + pub connections: HashMap<Token, TcpStream>, + pub room_ids: Vec<String>, +} impl RoomServer { - pub async fn listen(addr: &str) -> Result<(), Box<dyn Error>> { - let listener = TcpListener::bind(addr).await?; - debug!("RoomServer now listening on {}", listener.local_addr().unwrap()); - let state = Arc::new(Mutex::new(Shared::new())); - let mut counter = 0; + pub fn listen(&mut self, addr: &str) -> Result<(), Box<dyn Error>> { + let mut listener = TcpListener::bind(addr.parse().unwrap())?; + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(1024); + let mut counter: usize = 0; + // let mut sockets = HashMap::new(); + let mut requests = HashMap::new(); + let mut buffer = [0 as u8; 1024]; + // let mut room_ids = vec![]; + + poll + .registry() + .register(&mut listener, Token(0), Interest::READABLE)?; + + debug!( + "RoomServer now listening on {}", + listener.local_addr().unwrap() + ); + + loop { + poll.poll(&mut events, None)?; + + for event in &events { + match event.token() { + Token(0) => { + loop { + match listener.accept() { + Ok((mut stream, address)) => { + counter += 1; + let token = Token(counter); + + poll + .registry() + .register(&mut stream, token, Interest::READABLE)?; - loop { - let (stream, address) = listener.accept().await?; - counter += 1; - let state = Arc::clone(&state); + debug!( + "registered peer with address '{}' as '{}'", + address, token.0 + ); - tokio::spawn(async move { - if let Err(e) = RoomServer::handle( - state, - stream, - address, - counter - ).await { - error!("an error occurred: {}", e); - } - }); - } - } + // sockets.insert(token, stream); + self.connections.insert(token, stream); + requests.insert(token, Vec::with_capacity(192)); + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + poll + .registry() + .deregister(self.connections.get_mut(&Token(counter)).unwrap())?; + break; + } + } + } + } + token if event.is_readable() => { + loop { + let read = self.connections.get_mut(&token).unwrap().read(&mut buffer); + match read { + Ok(0) => { + self.connections.remove(&token); + break; + } + Ok(n) => { + let req = requests.get_mut(&token).unwrap(); + for b in &buffer[0..n] { + req.push(*b); + } - pub async fn handle( - state: Arc<Mutex<Shared>>, - stream: TcpStream, - address: SocketAddr, - count: usize, - ) -> Result<(), Box<dyn Error>> { - let bytes = BytesCodec::new().framed(stream); - let mut peer = Peer::new(state.clone(), bytes, count.to_string()).await?; - debug!("registered peer with address '{}' as '{}'", address, count); - let mut room_ids: Vec<String> = Vec::new(); - let mut username: String = String::new(); + for cmd in get_commands_from_buffer(BytesMut::from(&buffer[..n])) { + match cmd.get(2).unwrap() { + 10 => { + // PROPREQ + debug!("received property request command from client 'null'"); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_update_command()) + .unwrap(); + debug!("sent property update command to client 'null'"); + } + 6 => { + // SESSINIT + let local_username = parse_session_initialization_command(cmd).username; + self.clients.insert(token, local_username.clone()); + debug!( + "received session initialization command from client '{}'", + local_username, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_property_request_command()) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + local_username + ); + } + 15 => { + // PROPSET + let avatar = parse_property_set_command(cmd); + debug!( + "received property set command from client '{}': {}", + self.clients.get(&token).unwrap(), + avatar, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", + &get_config().unwrap().worldsmaster_greeting, + )) + .unwrap(); + debug!( + "sent session initialization command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + 29 => { + // BUDDYLISTUPDATE + let received_buddy = from_utf8( + cmd + .get(4..cmd.get(0).unwrap().to_owned() as usize - 1) + .unwrap(), + ) + .unwrap(); + debug!( + "received buddy list update command from client '{}': {}", + self.clients.get(&token).unwrap(), + received_buddy, + ); + self + .connections + .get_mut(&token) + .unwrap() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .unwrap(); + debug!( + "sent buddy list notify command to client '{}'", + self.clients.get(&token).unwrap(), + ); + } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + 14 => { + // TEXT + let text = + from_utf8(cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap()) + .unwrap(); + let username = self.clients.get(&token).unwrap().clone(); + debug!( + "received text command from client '{}': {}", + username, + format!("room: {}", text), + ); + self.connections.iter_mut().for_each(|t| { + t.1 + .write_all(&create_text_command(&username, text)) + .unwrap() + }); + debug!("broadcasted text command to clients"); + } + 7 => { + // SESSEXIT + debug!( + "received session exit command from client '{}'", + self.clients.get(&token).unwrap(), + ); + } + _ => (), + } + } + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => { + error!("unexpected error: {}", err); + break; + } + } + } + } + _ => (), + } + } + } + } - loop { - tokio::select! { - Some(msg) = peer.rx.recv() => { - // debug!("received bytes from peer: {:?}", &msg); - peer.bytes.get_mut().write_all(&msg).await?; - } - result = peer.bytes.next() => match result { - Some(Ok(msg)) => { - // let msg: BytesMut = msg; - for msg in get_commands_from_buffer(msg) { - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg.clone()).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - let avatar = parse_property_set_command(msg.clone()); - debug!("received property set command from client: {}", avatar); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client: {}", received_buddy); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; - } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client: {} == {}", room_name, room_id); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") - } - _ => (), - } - } - } - Some(Err(e)) => { - error!("error while processing messages: {}", e); break; - } - None => break, - } - } - } + fn broadcast(sockets: &HashMap<Token, TcpStream>, cmd: &[u8]) -> () { + for mut socket in sockets { + socket.1.write_all(cmd).unwrap(); + } + } - { // De-register client - state.lock().await.peers.remove(&count.to_string()); - debug!("removed peer: {}", count) - } + // fn process( + // &mut self, + // _registry: &Registry, + // event: &Event, + // token: Token, + // ) -> Result<bool, Box<dyn Error>> { + // if event.is_readable() { + // let mut connection_closed = false; + // let mut received_data = vec![0; 4096]; + // let mut bytes_read = 0; + // + // let stream = self.connections.get_mut(&token).unwrap(); + // + // loop { + // match stream.read(&mut received_data[bytes_read..]) { + // Ok(0) => { + // connection_closed = true; + // break; + // } + // Ok(n) => { + // bytes_read += n; + // if bytes_read == received_data.len() { + // received_data.resize(received_data.len() + 1024, 0); + // } + // } + // Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + // Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + // Err(err) => return Err(Box::new(err)), + // } + // } + // + // if bytes_read != 0 { + // self.handle( + // &mut received_data[..bytes_read], + // token, + // ); + // } + // if connection_closed { + // println!("de-registered peer with token '{}'", token.0); + // return Ok(true); + // } + // } + // + // Ok(false) + // } - Ok(()) - } + // fn handle( + // &mut self, + // data: &[u8], + // // stream: &mut TcpStream, + // token: Token, + // ) -> () { + // // trace!("i am client: {:?}", self.clients.get(&token)); + // // debug!("{:?}", self.connections); + // for cmd in get_commands_from_buffer(BytesMut::from(data)) { + // debug!("received: {:?}", cmd); + // match cmd.get(2).unwrap() { + // 10 => { // PROPREQ + // debug!("received property request command from client 'null'"); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_update_command()).unwrap(); + // debug!("sent property update command to client 'null'"); + // } + // 6 => { // SESSINIT + // let local_username = + // parse_session_initialization_command(cmd).username; + // self.clients.insert(token, local_username.clone()); + // debug!( + // "received session initialization command from client '{}'", + // local_username, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_property_request_command()).unwrap(); + // debug!("sent session initialization command to client '{}'", local_username); + // } + // 15 => { // PROPSET + // let avatar = parse_property_set_command(cmd); + // debug!( + // "received property set command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // avatar, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_text_command_with_action( + // "WORLDSMASTER", &get_config().unwrap().worldsmaster_greeting, + // )).unwrap(); + // debug!( + // "sent session initialization command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 29 => { // BUDDYLISTUPDATE + // let received_buddy = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize - 1).unwrap() + // ).unwrap(); + // debug!( + // "received buddy list update command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // received_buddy, + // ); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_buddy_list_notify_command(received_buddy)).unwrap(); + // debug!( + // "sent buddy list notify command to client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // cmd.get(4..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!( + // "received room id request command from client '{}': {}", + // self.clients.get(&token).unwrap(), + // room_name, + // ); + // let room_id; + // if !self.room_ids.contains(&room_name.to_string()) { + // self.room_ids.push(room_name.to_string()); + // room_id = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = self.room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", self.room_ids); + // self.connections.get_mut(&token).unwrap() + // .write_all(&create_room_id_redirect_command( + // room_name, room_id, + // )).unwrap(); + // } + // 14 => { // TEXT + // let text = from_utf8( + // cmd.get(6..cmd.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // let username = self.clients.get(&token).unwrap().clone(); + // debug!( + // "received text command from client '{}': {}", + // username, text, + // ); + // self.connections.iter_mut().for_each(|t| + // t.1.write_all(&create_text_command( + // &username, + // text, + // )).unwrap() + // ); + // debug!("broadcasted text command to clients"); + // } + // 7 => { // SESSEXIT + // debug!( + // "received session exit command from client '{}'", + // self.clients.get(&token).unwrap(), + // ); + // } + // _ => (), + // } + // } + // } } diff --git a/src/server/shared.rs b/src/server/shared.rs deleted file mode 100644 index 9e01d3d..0000000 --- a/src/server/shared.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::collections::HashMap; -// use std::net::SocketAddr; -use bytes::BytesMut; -use crate::server::Tx; - -pub struct Shared { - pub peers: HashMap<String, Tx>, -} -impl Shared { - pub fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } - - pub async fn broadcast(&mut self, /* sender: &str, */ message: &[u8]) { - // debug!("peer sent message: {:?}", message); - // debug!("peer count: {}", self.peers.len()); - // debug!("peers: {:?}", self.peers); - for peer in self.peers.iter_mut() { - // debug!("peer: {:?}", peer); - // TODO: - // thread 'tokio-runtime-worker' panicked at 'called `Option::unwrap()` on a `None` value' - peer.1.send(BytesMut::from(message)).unwrap(); - } - } - - // pub async fn broadcast(&mut self, sender: SocketAddr, message: &str) { - // for peer in self.peers.iter_mut() { - // if *peer.0 != sender { - // let _ = peer.1.send(message.into()); - // } - // } - // } -} -impl Default for Shared { - fn default() -> Self { - Self::new() - } -} diff --git a/src/utils/byte.rs b/src/utils/byte.rs deleted file mode 100644 index 01f10bb..0000000 --- a/src/utils/byte.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub fn convert_u16_to_two_u8s_be(integer: u16) -> [u8; 2] { - [(integer >> 8) as u8, integer as u8] -} diff --git a/src/utils/db.rs b/src/utils/db.rs index e6e1946..b95e159 100644 --- a/src/utils/db.rs +++ b/src/utils/db.rs @@ -1,17 +1,17 @@ -use sqlx::sqlite::SqlitePoolOptions; use std::error::Error; -use sqlx::SqlitePool; + +use sqlx::{sqlite::SqlitePoolOptions, SqlitePool}; pub async fn get_pool() -> Result<SqlitePool, Box<dyn Error>> { - let pool = SqlitePoolOptions::new() - .max_connections(20) - .connect(&std::env::var("DATABASE_URL")?) - .await?; + let pool = SqlitePoolOptions::new() + .max_connections(20) + .connect(&std::env::var("DATABASE_URL")?) + .await?; - debug!( - "connected to database at url '{}'", - &std::env::var("DATABASE_URL")? - ); + debug!( + "connected to database at url '{}'", + &std::env::var("DATABASE_URL")? + ); - Ok(pool) + Ok(pool) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 4e23340..dec1023 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,2 +1 @@ -pub mod byte; pub mod db; |