diff options
| author | Fuwn <[email protected]> | 2021-03-28 15:12:32 +0000 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2021-03-28 15:12:32 +0000 |
| commit | c08d9a34680032f2477307d2ca346220df00ea5d (patch) | |
| tree | da92aebcdc34266f26bd904d4544bf5bd4b46b11 /src/server | |
| parent | Feature: Parse property set command (diff) | |
| download | whirl-c08d9a34680032f2477307d2ca346220df00ea5d.tar.xz whirl-c08d9a34680032f2477307d2ca346220df00ea5d.zip | |
fix: Implement parser
Fixes the problem that sometimes, commands are just skipped despite being sent from the client.
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/auto/server.rs | 247 | ||||
| -rw-r--r-- | src/server/mod.rs | 3 | ||||
| -rw-r--r-- | src/server/parser.rs | 37 | ||||
| -rw-r--r-- | src/server/room/server.rs | 165 |
4 files changed, 204 insertions, 248 deletions
diff --git a/src/server/auto/server.rs b/src/server/auto/server.rs index c05ed5e..a2d5609 100644 --- a/src/server/auto/server.rs +++ b/src/server/auto/server.rs @@ -7,7 +7,6 @@ use crate::server::auto::cmd::property::{ }; use tokio_util::codec::{BytesCodec, Decoder}; use tokio_stream::StreamExt; -use bytes::BytesMut; use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; @@ -18,6 +17,8 @@ use crate::server::shared::Shared; use crate::server::peer::Peer; use std::net::SocketAddr; use crate::server::auto::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; pub struct AutoServer; impl AutoServer { @@ -57,91 +58,6 @@ impl AutoServer { let mut room_ids: Vec<String> = Vec::new(); let mut username: String = String::new(); - // while let Some(msg) = peer.bytes.next().await { - // match msg { - // Ok(bytes) => match bytes.get(2).unwrap() { - // 10 => { // PROPREQ - // debug!("received property request command from client"); - // peer.bytes.get_mut() - // .write_all(&create_property_update_command()).await?; - // - // debug!("sent property update command to client"); - // } - // 6 => { // SESSINIT - // username = parse_session_initialization_command(bytes).username; - // debug!( - // "received session initialization command from client: {}", - // username - // ); - // peer.bytes.get_mut() - // .write_all(&create_property_request_command()).await?; - // debug!("sent session initialization command to client"); - // } - // 15 => { // PROPSET - // debug!("received property set command from client"); - // peer.bytes.get_mut() - // .write_all(&create_text_command_with_action( - // "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? - // )).await?; - // debug!("sent worldsmaster greeting to client"); - // } - // 29 => { // BUDDYLISTUPDATE - // let received_buddy = from_utf8( - // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize - 1).unwrap() - // ).unwrap(); - // debug!( - // "received buddy list update command from client: {}", - // received_buddy - // ); - // peer.bytes.get_mut() - // .write_all(&create_buddy_list_notify_command(received_buddy)) - // .await?; - // debug!("sent buddy list notify command to client"); - // } - // 20 => { // ROOMIDRQ - // let room_name = from_utf8( - // bytes.get(4..bytes.get(0).unwrap().to_owned() as usize).unwrap() - // ).unwrap(); - // debug!("received room id request command from client: {}", room_name); - // let room_id; - // if !room_ids.contains(&room_name.to_string()) { - // room_ids.push(room_name.to_string()); - // room_id = room_ids.iter() - // .position(|i| i == &room_name.to_string()) - // .unwrap(); - // trace!("inserted room '{}' as '{}'", room_name, room_id); - // } else { - // let position = room_ids.iter() - // .position(|i| i == &room_name.to_string()) - // .unwrap(); - // trace!("found room '{}' as '{}'", room_name, position); - // room_id = position; - // } - // trace!("room name: {}, room id: {}", room_name, room_id); - // trace!("{:?}", room_ids); - // peer.bytes.get_mut() - // .write_all(&create_room_id_redirect_command(room_name, room_id)) - // .await?; - // debug!("sent redirect id command to client"); - // } - // 14 => { - // let text = from_utf8( - // bytes.get(6..bytes.get(0).unwrap().to_owned() as usize).unwrap() - // ).unwrap(); - // debug!("received text command from client: {}", text); - // let mut state = state.lock().await; - // state.broadcast(&create_text_command(&username, text)).await; - // debug!("broadcasted text command from client"); - // } - // 7 => { // SESSEXIT - // debug!("received session exit command from client") - // } - // _ => (), - // } - // Err(err) => error!("stream closed with error: {:?}", err), - // } - // } - loop { tokio::select! { Some(msg) = peer.rx.recv() => { @@ -150,85 +66,87 @@ impl AutoServer { } result = peer.bytes.next() => match result { Some(Ok(msg)) => { - let msg: BytesMut = msg; - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - debug!("received property set command from client"); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client"); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg).iter_mut() { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client"); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + 20 => { // ROOMIDRQ + let room_name = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received room id request command from client: {}", room_name); + let room_id; + if !room_ids.contains(&room_name.to_string()) { + room_ids.push(room_name.to_string()); + room_id = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("inserted room '{}' as '{}'", room_name, room_id); + } else { + let position = room_ids.iter() + .position(|i| i == &room_name.to_string()) + .unwrap(); + trace!("found room '{}' as '{}'", room_name, position); + room_id = position; + } + trace!("room name: {}, room id: {}", room_name, room_id); + trace!("{:?}", room_ids); + peer.bytes.get_mut() + .write_all(&create_room_id_redirect_command(room_name, room_id)) + .await?; + debug!("sent redirect id command to client: {} == {}", room_name, room_id); + } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - _ => (), } } Some(Err(e)) => { @@ -239,8 +157,7 @@ impl AutoServer { } } - // De-register client - { + { // De-register client state.lock().await.peers.remove(&count.to_string()); debug!("removed peer: {}", count) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 3698d84..a6341a2 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,8 +4,9 @@ use bytes::BytesMut; pub mod auto; mod cmd; pub mod room; -mod shared; +mod parser; mod peer; +mod shared; type Tx = mpsc::UnboundedSender<BytesMut>; type Rx = mpsc::UnboundedReceiver<BytesMut>; diff --git a/src/server/parser.rs b/src/server/parser.rs new file mode 100644 index 0000000..39065e6 --- /dev/null +++ b/src/server/parser.rs @@ -0,0 +1,37 @@ +use bytes::BytesMut; + +/// Read all commands from the given buffer. +/// +/// # Process +/// 1. Get a command from `buffer` based on first byte. +/// 2. Push command to `commands`. +/// 3. Remove command from `buffer`. +/// 4. Iterate and do this for all commands within `buffer`. +pub fn get_commands_from_buffer(mut buffer: BytesMut) -> Vec<BytesMut> { + let mut commands: Vec<BytesMut> = Vec::new(); + // debug!("initial buffer: {:?}, length: {}", buffer, buffer.len()); + + let data_length = buffer.get(0).unwrap().to_owned() as usize; + if buffer.len() > data_length { + loop { + // debug!("loop: {:?}, length: {}", buffer, buffer.len()); + let command_length = buffer.get(0).unwrap().to_owned() as usize; + commands.push( + BytesMut::from(buffer.get(0..command_length).unwrap()) + ); + + // Remove command from buffer + buffer = buffer.split_off(command_length); + + // Check if any more commands are present + if buffer.len() == 0 { break; } + } + } else { + // There will always be at least one command, push it. + commands.push( + BytesMut::from(buffer.get(0..data_length).unwrap()) + ); + } + + commands // Return command (s) +} diff --git a/src/server/room/server.rs b/src/server/room/server.rs index 8781868..48b4d5c 100644 --- a/src/server/room/server.rs +++ b/src/server/room/server.rs @@ -7,7 +7,6 @@ use crate::server::room::cmd::property::{ }; use tokio_util::codec::{BytesCodec, Decoder}; use tokio_stream::StreamExt; -use bytes::BytesMut; use crate::server::cmd::text::{create_text_command_with_action, create_text_command}; use std::str::from_utf8; use crate::server::cmd::buddy_list::create_buddy_list_notify_command; @@ -18,6 +17,9 @@ use crate::server::shared::Shared; use crate::server::peer::Peer; use std::net::SocketAddr; use crate::server::room::cmd::session::parse_session_initialization_command; +use crate::server::parser::get_commands_from_buffer; +use crate::server::cmd::property::parse_property_set_command; + pub struct RoomServer; impl RoomServer { @@ -65,87 +67,87 @@ impl RoomServer { } result = peer.bytes.next() => match result { Some(Ok(msg)) => { - // let mut state = state.lock().await; - // state.broadcast("", &msg).await; - let msg: BytesMut = msg; - match msg.get(2).unwrap() { - 10 => { // PROPREQ - debug!("received property request command from client"); - peer.bytes.get_mut() - .write_all(&create_property_update_command()).await?; - - debug!("sent property update command to client"); - } - 6 => { // SESSINIT - username = parse_session_initialization_command(msg).username; - debug!( - "received session initialization command from client: {}", - username - ); - peer.bytes.get_mut() - .write_all(&create_property_request_command()).await?; - debug!("sent session initialization command to client"); - } - 15 => { // PROPSET - debug!("received property set command from client"); - peer.bytes.get_mut() - .write_all(&create_text_command_with_action( - "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? - )).await?; - debug!("sent worldsmaster greeting to client"); - } - 29 => { // BUDDYLISTUPDATE - let received_buddy = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() - ).unwrap(); - debug!( - "received buddy list update command from client: {}", - received_buddy - ); - peer.bytes.get_mut() - .write_all(&create_buddy_list_notify_command(received_buddy)) - .await?; - debug!("sent buddy list notify command to client"); - } - 20 => { // ROOMIDRQ - let room_name = from_utf8( - msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received room id request command from client: {}", room_name); - let room_id; - if !room_ids.contains(&room_name.to_string()) { - room_ids.push(room_name.to_string()); - room_id = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("inserted room '{}' as '{}'", room_name, room_id); - } else { - let position = room_ids.iter() - .position(|i| i == &room_name.to_string()) - .unwrap(); - trace!("found room '{}' as '{}'", room_name, position); - room_id = position; + // let msg: BytesMut = msg; + for msg in get_commands_from_buffer(msg).iter_mut() { + match msg.get(2).unwrap() { + 10 => { // PROPREQ + debug!("received property request command from client"); + peer.bytes.get_mut() + .write_all(&create_property_update_command()).await?; + debug!("sent property update command to client"); } - trace!("room name: {}, room id: {}", room_name, room_id); - trace!("{:?}", room_ids); - peer.bytes.get_mut() - .write_all(&create_room_id_redirect_command(room_name, room_id)) - .await?; - debug!("sent redirect id command to client"); - } - 14 => { - let text = from_utf8( - msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() - ).unwrap(); - debug!("received text command from client: {}", text); - let mut state = state.lock().await; - state.broadcast(&create_text_command(&username, text)).await; - debug!("broadcasted text command from client"); - } - 7 => { // SESSEXIT - debug!("received session exit command from client") + 6 => { // SESSINIT + username = parse_session_initialization_command(msg.clone()).username; + debug!( + "received session initialization command from client: {}", + username + ); + peer.bytes.get_mut() + .write_all(&create_property_request_command()).await?; + debug!("sent session initialization command to client"); + } + 15 => { // PROPSET + let avatar = parse_property_set_command(msg.clone()); + debug!("received property set command from client: {}", avatar); + peer.bytes.get_mut() + .write_all(&create_text_command_with_action( + "WORLDSMASTER", &std::env::var("WORLDSMASTER_GREETING")? + )).await?; + debug!("sent worldsmaster greeting to client"); + } + 29 => { // BUDDYLISTUPDATE + let received_buddy = from_utf8( + msg.get(4..msg.get(0).unwrap().to_owned() as usize - 1).unwrap() + ).unwrap(); + debug!( + "received buddy list update command from client: {}", + received_buddy + ); + peer.bytes.get_mut() + .write_all(&create_buddy_list_notify_command(received_buddy)) + .await?; + debug!("sent buddy list notify command to client: {}", received_buddy); + } + // 20 => { // ROOMIDRQ + // let room_name = from_utf8( + // msg.get(4..msg.get(0).unwrap().to_owned() as usize).unwrap() + // ).unwrap(); + // debug!("received room id request command from client: {}", room_name); + // let room_id; + // if !room_ids.contains(&room_name.to_string()) { + // room_ids.push(room_name.to_string()); + // room_id = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("inserted room '{}' as '{}'", room_name, room_id); + // } else { + // let position = room_ids.iter() + // .position(|i| i == &room_name.to_string()) + // .unwrap(); + // trace!("found room '{}' as '{}'", room_name, position); + // room_id = position; + // } + // trace!("room name: {}, room id: {}", room_name, room_id); + // trace!("{:?}", room_ids); + // peer.bytes.get_mut() + // .write_all(&create_room_id_redirect_command(room_name, room_id)) + // .await?; + // debug!("sent redirect id command to client: {} == {}", room_name, room_id); + // } + 14 => { + let text = from_utf8( + msg.get(6..msg.get(0).unwrap().to_owned() as usize).unwrap() + ).unwrap(); + debug!("received text command from client: {}", text); + let mut state = state.lock().await; + state.broadcast(&create_text_command(&username, text)).await; + debug!("broadcasted text command from client"); + } + 7 => { // SESSEXIT + debug!("received session exit command from client") + } + _ => (), } - _ => (), } } Some(Err(e)) => { @@ -156,8 +158,7 @@ impl RoomServer { } } - // De-register client - { + { // De-register client state.lock().await.peers.remove(&count.to_string()); debug!("removed peer: {}", count) } |