aboutsummaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
authorFuwn <[email protected]>2021-03-28 15:12:32 +0000
committerFuwn <[email protected]>2021-03-28 15:12:32 +0000
commitc08d9a34680032f2477307d2ca346220df00ea5d (patch)
treeda92aebcdc34266f26bd904d4544bf5bd4b46b11 /src/server
parentFeature: Parse property set command (diff)
downloadwhirl-c08d9a34680032f2477307d2ca346220df00ea5d.tar.xz
whirl-c08d9a34680032f2477307d2ca346220df00ea5d.zip
fix: Implement parser
Fixes the problem that sometimes, commands are just skipped despite being sent from the client.
Diffstat (limited to 'src/server')
-rw-r--r--src/server/auto/server.rs247
-rw-r--r--src/server/mod.rs3
-rw-r--r--src/server/parser.rs37
-rw-r--r--src/server/room/server.rs165
4 files changed, 204 insertions, 248 deletions
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs
index c05ed5e..a2d5609 100644
--- a/src/server/auto/server.rs
+++ b/src/server/auto/server.rs
@@ -7,7 +7,6 @@ use crate::server::auto::cmd::property::{
};
use tokio_util::codec::{BytesCodec, Decoder};
use tokio_stream::StreamExt;
-use bytes::BytesMut;
use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
@@ -18,6 +17,8 @@ use crate::server::shared::Shared;
use crate::server::peer::Peer;
use std::net::SocketAddr;
use crate::server::auto::cmd::session::parse_session_initialization_command;
+use crate::server::parser::get_commands_from_buffer;
+use crate::server::cmd::property::parse_property_set_command;
pub struct AutoServer;
impl AutoServer {
@@ -57,91 +58,6 @@ impl AutoServer {
let mut room_ids: Vec<String> = Vec::new();
let mut username: String = String::new();
- // while let Some(msg) = peer.bytes.next().await {
- // match msg {
- // Ok(bytes) => match bytes.get(2).unwrap() {
- // 10 => { // PROPREQ
- // debug!("received property request command from client");
- // peer.bytes.get_mut()
- // .write_all(&create_property_update_command()).await?;
- //
- // debug!("sent property update command to client");
- // }
- // 6 => { // SESSINIT
- // username = parse_session_initialization_command(bytes).username;
- // debug!(
- // "received session initialization command from client: {}",
- // username
- // );
- // peer.bytes.get_mut()
- // .write_all(&create_property_request_command()).await?;
- // debug!("sent session initialization command to client");
- // }
- // 15 => { // PROPSET
- // debug!("received property set command from client");
- // peer.bytes.get_mut()
- // .write_all(&create_text_command_with_action(
- // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
- // )).await?;
- // debug!("sent worldsmaster greeting to client");
- // }
- // 29 => { // BUDDYLISTUPDATE
- // let received_buddy = from_utf8(
- // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap()
- // ).unwrap();
- // debug!(
- // "received buddy list update command from client: {}",
- // received_buddy
- // );
- // peer.bytes.get_mut()
- // .write_all(&create_buddy_list_notify_command(received_buddy))
- // .await?;
- // debug!("sent buddy list notify command to client");
- // }
- // 20 => { // ROOMIDRQ
- // let room_name = from_utf8(
- // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap()
- // ).unwrap();
- // debug!("received room id request command from client: {}", room_name);
- // let room_id;
- // if !room_ids.contains(&room_name.to_string()) {
- // room_ids.push(room_name.to_string());
- // room_id = room_ids.iter()
- // .position(|i| i == &room_name.to_string())
- // .unwrap();
- // trace!("inserted room '{}' as '{}'", room_name, room_id);
- // } else {
- // let position = room_ids.iter()
- // .position(|i| i == &room_name.to_string())
- // .unwrap();
- // trace!("found room '{}' as '{}'", room_name, position);
- // room_id = position;
- // }
- // trace!("room name: {}, room id: {}", room_name, room_id);
- // trace!("{:?}", room_ids);
- // peer.bytes.get_mut()
- // .write_all(&create_room_id_redirect_command(room_name, room_id))
- // .await?;
- // debug!("sent redirect id command to client");
- // }
- // 14 => {
- // let text = from_utf8(
- // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap()
- // ).unwrap();
- // debug!("received text command from client: {}", text);
- // let mut state = state.lock().await;
- // state.broadcast(&create_text_command(&username, text)).await;
- // debug!("broadcasted text command from client");
- // }
- // 7 => { // SESSEXIT
- // debug!("received session exit command from client")
- // }
- // _ => (),
- // }
- // Err(err) => error!("stream closed with error: {:?}", err),
- // }
- // }
-
loop {
tokio::select! {
Some(msg) = peer.rx.recv() => {
@@ -150,85 +66,87 @@ impl AutoServer {
}
result = peer.bytes.next() => match result {
Some(Ok(msg)) => {
- let msg: BytesMut = msg;
- match msg.get(2).unwrap() {
- 10 => { // PROPREQ
- debug!("received property request command from client");
- peer.bytes.get_mut()
- .write_all(&create_property_update_command()).await?;
-
- debug!("sent property update command to client");
- }
- 6 => { // SESSINIT
- username = parse_session_initialization_command(msg).username;
- debug!(
- "received session initialization command from client: {}",
- username
- );
- peer.bytes.get_mut()
- .write_all(&create_property_request_command()).await?;
- debug!("sent session initialization command to client");
- }
- 15 => { // PROPSET
- debug!("received property set command from client");
- peer.bytes.get_mut()
- .write_all(&create_text_command_with_action(
- "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
- )).await?;
- debug!("sent worldsmaster greeting to client");
- }
- 29 => { // BUDDYLISTUPDATE
- let received_buddy = from_utf8(
- msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
- ).unwrap();
- debug!(
- "received buddy list update command from client: {}",
- received_buddy
- );
- peer.bytes.get_mut()
- .write_all(&create_buddy_list_notify_command(received_buddy))
- .await?;
- debug!("sent buddy list notify command to client");
- }
- 20 => { // ROOMIDRQ
- let room_name = from_utf8(
- msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
- ).unwrap();
- debug!("received room id request command from client: {}", room_name);
- let room_id;
- if !room_ids.contains(&room_name.to_string()) {
- room_ids.push(room_name.to_string());
- room_id = room_ids.iter()
- .position(|i| i == &room_name.to_string())
- .unwrap();
- trace!("inserted room '{}' as '{}'", room_name, room_id);
- } else {
- let position = room_ids.iter()
- .position(|i| i == &room_name.to_string())
- .unwrap();
- trace!("found room '{}' as '{}'", room_name, position);
- room_id = position;
+ // let msg: BytesMut = msg;
+ for msg in get_commands_from_buffer(msg).iter_mut() {
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
+ debug!("sent property update command to client");
}
- trace!("room name: {}, room id: {}", room_name, room_id);
- trace!("{:?}", room_ids);
- peer.bytes.get_mut()
- .write_all(&create_room_id_redirect_command(room_name, room_id))
- .await?;
- debug!("sent redirect id command to client");
- }
- 14 => {
- let text = from_utf8(
- msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
- ).unwrap();
- debug!("received text command from client: {}", text);
- let mut state = state.lock().await;
- state.broadcast(&create_text_command(&username, text)).await;
- debug!("broadcasted text command from client");
- }
- 7 => { // SESSEXIT
- debug!("received session exit command from client")
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg.clone()).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ let avatar = parse_property_set_command(msg.clone());
+ debug!("received property set command from client: {}", avatar);
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client: {}", received_buddy);
+ }
+ 20 => { // ROOMIDRQ
+ let room_name = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received room id request command from client: {}", room_name);
+ let room_id;
+ if !room_ids.contains(&room_name.to_string()) {
+ room_ids.push(room_name.to_string());
+ room_id = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("inserted room '{}' as '{}'", room_name, room_id);
+ } else {
+ let position = room_ids.iter()
+ .position(|i| i == &room_name.to_string())
+ .unwrap();
+ trace!("found room '{}' as '{}'", room_name, position);
+ room_id = position;
+ }
+ trace!("room name: {}, room id: {}", room_name, room_id);
+ trace!("{:?}", room_ids);
+ peer.bytes.get_mut()
+ .write_all(&create_room_id_redirect_command(room_name, room_id))
+ .await?;
+ debug!("sent redirect id command to client: {} == {}", room_name, room_id);
+ }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
+ }
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
- _ => (),
}
}
Some(Err(e)) => {
@@ -239,8 +157,7 @@ impl AutoServer {
}
}
- // De-register client
- {
+ { // De-register client
state.lock().await.peers.remove(&count.to_string());
debug!("removed peer: {}", count)
}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 3698d84..a6341a2 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,8 +4,9 @@ use bytes::BytesMut;
pub mod auto;
mod cmd;
pub mod room;
-mod shared;
+mod parser;
mod peer;
+mod shared;
type Tx = mpsc::UnboundedSender<BytesMut>;
type Rx = mpsc::UnboundedReceiver<BytesMut>;
diff --git a/src/server/parser.rs b/src/server/parser.rs
new file mode 100644
index 0000000..39065e6
--- /dev/null
+++ b/src/server/parser.rs
@@ -0,0 +1,37 @@
+use bytes::BytesMut;
+
+/// Read all commands from the given buffer.
+///
+/// # Process
+/// 1. Get a command from `buffer` based on first byte.
+/// 2. Push command to `commands`.
+/// 3. Remove command from `buffer`.
+/// 4. Iterate and do this for all commands within `buffer`.
+pub fn get_commands_from_buffer(mut buffer: BytesMut) -> Vec<BytesMut> {
+ let mut commands: Vec<BytesMut> = Vec::new();
+ // debug!("initial buffer: {:?}, length: {}", buffer, buffer.len());
+
+ let data_length = buffer.get(0).unwrap().to_owned() as usize;
+ if buffer.len() > data_length {
+ loop {
+ // debug!("loop: {:?}, length: {}", buffer, buffer.len());
+ let command_length = buffer.get(0).unwrap().to_owned() as usize;
+ commands.push(
+ BytesMut::from(buffer.get(0..command_length).unwrap())
+ );
+
+ // Remove command from buffer
+ buffer = buffer.split_off(command_length);
+
+ // Check if any more commands are present
+ if buffer.len() == 0 { break; }
+ }
+ } else {
+ // There will always be at least one command, push it.
+ commands.push(
+ BytesMut::from(buffer.get(0..data_length).unwrap())
+ );
+ }
+
+ commands // Return command (s)
+}
diff --git a/src/server/room/server.rs b/src/server/room/server.rs
index 8781868..48b4d5c 100644
--- a/src/server/room/server.rs
+++ b/src/server/room/server.rs
@@ -7,7 +7,6 @@ use crate::server::room::cmd::property::{
};
use tokio_util::codec::{BytesCodec, Decoder};
use tokio_stream::StreamExt;
-use bytes::BytesMut;
use crate::server::cmd::text::{create_text_command_with_action, create_text_command};
use std::str::from_utf8;
use crate::server::cmd::buddy_list::create_buddy_list_notify_command;
@@ -18,6 +17,9 @@ use crate::server::shared::Shared;
use crate::server::peer::Peer;
use std::net::SocketAddr;
use crate::server::room::cmd::session::parse_session_initialization_command;
+use crate::server::parser::get_commands_from_buffer;
+use crate::server::cmd::property::parse_property_set_command;
+
pub struct RoomServer;
impl RoomServer {
@@ -65,87 +67,87 @@ impl RoomServer {
}
result = peer.bytes.next() => match result {
Some(Ok(msg)) => {
- // let mut state = state.lock().await;
- // state.broadcast("", &msg).await;
- let msg: BytesMut = msg;
- match msg.get(2).unwrap() {
- 10 => { // PROPREQ
- debug!("received property request command from client");
- peer.bytes.get_mut()
- .write_all(&create_property_update_command()).await?;
-
- debug!("sent property update command to client");
- }
- 6 => { // SESSINIT
- username = parse_session_initialization_command(msg).username;
- debug!(
- "received session initialization command from client: {}",
- username
- );
- peer.bytes.get_mut()
- .write_all(&create_property_request_command()).await?;
- debug!("sent session initialization command to client");
- }
- 15 => { // PROPSET
- debug!("received property set command from client");
- peer.bytes.get_mut()
- .write_all(&create_text_command_with_action(
- "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
- )).await?;
- debug!("sent worldsmaster greeting to client");
- }
- 29 => { // BUDDYLISTUPDATE
- let received_buddy = from_utf8(
- msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
- ).unwrap();
- debug!(
- "received buddy list update command from client: {}",
- received_buddy
- );
- peer.bytes.get_mut()
- .write_all(&create_buddy_list_notify_command(received_buddy))
- .await?;
- debug!("sent buddy list notify command to client");
- }
- 20 => { // ROOMIDRQ
- let room_name = from_utf8(
- msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
- ).unwrap();
- debug!("received room id request command from client: {}", room_name);
- let room_id;
- if !room_ids.contains(&room_name.to_string()) {
- room_ids.push(room_name.to_string());
- room_id = room_ids.iter()
- .position(|i| i == &room_name.to_string())
- .unwrap();
- trace!("inserted room '{}' as '{}'", room_name, room_id);
- } else {
- let position = room_ids.iter()
- .position(|i| i == &room_name.to_string())
- .unwrap();
- trace!("found room '{}' as '{}'", room_name, position);
- room_id = position;
+ // let msg: BytesMut = msg;
+ for msg in get_commands_from_buffer(msg).iter_mut() {
+ match msg.get(2).unwrap() {
+ 10 => { // PROPREQ
+ debug!("received property request command from client");
+ peer.bytes.get_mut()
+ .write_all(&create_property_update_command()).await?;
+ debug!("sent property update command to client");
}
- trace!("room name: {}, room id: {}", room_name, room_id);
- trace!("{:?}", room_ids);
- peer.bytes.get_mut()
- .write_all(&create_room_id_redirect_command(room_name, room_id))
- .await?;
- debug!("sent redirect id command to client");
- }
- 14 => {
- let text = from_utf8(
- msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
- ).unwrap();
- debug!("received text command from client: {}", text);
- let mut state = state.lock().await;
- state.broadcast(&create_text_command(&username, text)).await;
- debug!("broadcasted text command from client");
- }
- 7 => { // SESSEXIT
- debug!("received session exit command from client")
+ 6 => { // SESSINIT
+ username = parse_session_initialization_command(msg.clone()).username;
+ debug!(
+ "received session initialization command from client: {}",
+ username
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_property_request_command()).await?;
+ debug!("sent session initialization command to client");
+ }
+ 15 => { // PROPSET
+ let avatar = parse_property_set_command(msg.clone());
+ debug!("received property set command from client: {}", avatar);
+ peer.bytes.get_mut()
+ .write_all(&create_text_command_with_action(
+ "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")?
+ )).await?;
+ debug!("sent worldsmaster greeting to client");
+ }
+ 29 => { // BUDDYLISTUPDATE
+ let received_buddy = from_utf8(
+ msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap()
+ ).unwrap();
+ debug!(
+ "received buddy list update command from client: {}",
+ received_buddy
+ );
+ peer.bytes.get_mut()
+ .write_all(&create_buddy_list_notify_command(received_buddy))
+ .await?;
+ debug!("sent buddy list notify command to client: {}", received_buddy);
+ }
+ // 20 => { // ROOMIDRQ
+ // let room_name = from_utf8(
+ // msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ // ).unwrap();
+ // debug!("received room id request command from client: {}", room_name);
+ // let room_id;
+ // if !room_ids.contains(&room_name.to_string()) {
+ // room_ids.push(room_name.to_string());
+ // room_id = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("inserted room '{}' as '{}'", room_name, room_id);
+ // } else {
+ // let position = room_ids.iter()
+ // .position(|i| i == &room_name.to_string())
+ // .unwrap();
+ // trace!("found room '{}' as '{}'", room_name, position);
+ // room_id = position;
+ // }
+ // trace!("room name: {}, room id: {}", room_name, room_id);
+ // trace!("{:?}", room_ids);
+ // peer.bytes.get_mut()
+ // .write_all(&create_room_id_redirect_command(room_name, room_id))
+ // .await?;
+ // debug!("sent redirect id command to client: {} == {}", room_name, room_id);
+ // }
+ 14 => {
+ let text = from_utf8(
+ msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap()
+ ).unwrap();
+ debug!("received text command from client: {}", text);
+ let mut state = state.lock().await;
+ state.broadcast(&create_text_command(&username, text)).await;
+ debug!("broadcasted text command from client");
+ }
+ 7 => { // SESSEXIT
+ debug!("received session exit command from client")
+ }
+ _ => (),
}
- _ => (),
}
}
Some(Err(e)) => {
@@ -156,8 +158,7 @@ impl RoomServer {
}
}
- // De-register client
- {
+ { // De-register client
state.lock().await.peers.remove(&count.to_string());
debug!("removed peer: {}", count)
}