Skip to content

Commit

Permalink
support file sync
Browse files Browse the repository at this point in the history
  • Loading branch information
jcs090218 committed Jun 10, 2024
1 parent aeacbeb commit 726d121
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 29 deletions.
6 changes: 5 additions & 1 deletion src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ impl Channel {
// Create a channel for this peer
let (_tx, _rx) = mpsc::unbounded_channel();

room.peers.insert(_connection.addr, _tx);
let addr = _connection.addr;
//let addr = _connection.stream.peer_addr().unwrap();
room.peers.insert(addr, _tx);

//room.peers.insert(_connection.addr, _tx);

Self {
read_buf: [0; BUF_SIZE],
Expand Down
1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::handler;
use crate::room::*;
use crate::user::*;

#[derive(PartialEq)]
pub struct Client {
entered: bool, // Is inside the room?
path: String, // workspace path
Expand Down
65 changes: 59 additions & 6 deletions src/handler/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,67 @@ pub mod users {
use crate::channel::*;
use crate::handler::room::*;
use crate::room::*;
use crate::util::*;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;

const METHOD: &str = "file::users";

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
// TODO: ..
let addr = &channel.get_connection().addr;
let room = room.lock().await;
let client = room.get_client(addr).unwrap();

// XXX: Get this early to avoid borrow errors.
let file_path = data_str(json, "file").unwrap();
let local_path = to_room_path(addr, &room, &file_path);

if !check_entered(channel, &client, METHOD).await {
return;
}

let this_user = client.user().unwrap();

// If user is not in the file, ignore it.
if this_user.path.is_none() {
return;
}

// Prepare data to send.
let mut users = Vec::new();

for _client in room.get_clients().iter() {
let user = _client.user();

if user.is_none() {
continue;
}

let user = user.unwrap();

// Ignore the sender client.
if this_user == user {
continue;
}

// Ignore if not in the same file.
if local_path != user.path.clone().unwrap() {
continue;
}

users.push(user.clone());
}

let users = serde_json::to_string(&users).unwrap();

channel
.send_json(&serde_json::json!({
"method": METHOD,
"clients": users,
"status": "success",
}))
.await;
}
}

Expand All @@ -45,13 +98,13 @@ pub mod sync {

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let room = room.lock().await;

// XXX: Get this early to avoid borrow errors.
let file_path = data_str(json, "file").unwrap();
let local_path = to_room_path(addr, &mut room, &file_path);
let local_path = to_room_path(addr, &room, &file_path);

let client = room.get_client_mut(addr).unwrap();
let client = room.get_client(addr).unwrap();

// Check entered the room.
if !check_entered(channel, client, METHOD).await {
Expand Down Expand Up @@ -85,8 +138,8 @@ pub mod say {

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let client = room.get_client_mut(addr).unwrap();
let room = room.lock().await;
let client = room.get_client(addr).unwrap();

if !check_entered(channel, client, METHOD).await {
return;
Expand Down
3 changes: 2 additions & 1 deletion src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &str)
"room::users" => room::users::handle(channel, room, &val).await,
"room::sync" => room::sync::handle(channel, room, &val).await,
"room::update" => room::update::handle(channel, room, &val).await,
"file::users" => file::users::handle(channel, room, &val).await,
"file::say" => file::say::handle(channel, room, &val).await,
"file::sync" => file::sync::handle(channel, room, &val).await,
_ => {
Expand All @@ -61,7 +62,7 @@ mod test {
pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
tracing::trace!("method: {:?}", json["method"]);

let mut room = room.lock().await;
let room = room.lock().await;

channel
.send_json(&serde_json::json!({
Expand Down
30 changes: 16 additions & 14 deletions src/handler/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::sync::Mutex;
/// * `channel` - Send error message when not entered.
/// * `client` - The client to check to see if entered.
/// * `method` - The method id.
pub async fn check_entered(channel: &mut Channel, client: &mut Client, method: &str) -> bool {
pub async fn check_entered(channel: &mut Channel, client: &Client, method: &str) -> bool {
if client.entered() {
return true;
}
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn ensure_entered(channel: &mut Channel, room: &Arc<Mutex<Room>>, meth
/// * `channel` - Used when sending the error message.
/// * `client` - Client to see if it has the admin privileges.
/// * `method` - The method id.
pub async fn check_admin(channel: &mut Channel, client: &mut Client, method: &str) -> bool {
pub async fn check_admin(channel: &mut Channel, client: &Client, method: &str) -> bool {
if client.admin() {
return true;
}
Expand All @@ -94,9 +94,9 @@ pub async fn check_admin(channel: &mut Channel, client: &mut Client, method: &st
/// * `addr` - Socket address used to get the client's project path.
/// * `room` - Used to get client and room path.
/// * `path` - Path we want to convert.
pub fn to_room_path(addr: &SocketAddr, room: &mut Room, path: &String) -> String {
pub fn to_room_path(addr: &SocketAddr, room: &Room, path: &str) -> String {
let server_path = room.get_path().clone();
let client = room.get_client_mut(addr).unwrap();
let client = room.get_client(addr).unwrap();
let project_path = client.get_path();
path.replace(project_path, &server_path)
}
Expand Down Expand Up @@ -213,14 +213,14 @@ pub mod kick {
pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let client = room.get_client_mut(addr).unwrap();
let client = room.get_client(addr).unwrap();

if !check_entered(channel, client, METHOD).await {
return;
}

// Only the admin privileges can kick the user out!
if !check_admin(channel, client, METHOD).await {
if !check_admin(channel, &client, METHOD).await {
return;
}

Expand Down Expand Up @@ -269,8 +269,8 @@ pub mod broadcast {

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let client = room.get_client_mut(addr).unwrap();
let room = room.lock().await;
let client = room.get_client(addr).unwrap();

let username = client.user().unwrap().username.clone();

Expand Down Expand Up @@ -338,17 +338,17 @@ pub mod users {

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let client = room.get_client_mut(addr).unwrap();
let room = room.lock().await;
let client = room.get_client(addr).unwrap();

if !check_entered(channel, client, METHOD).await {
return;
}

let mut users = Vec::new();

for client in room.get_clients().iter_mut() {
let user = client.user_mut();
for client in room.get_clients().iter() {
let user = client.user();

users.push(user.unwrap().clone());
}
Expand Down Expand Up @@ -379,8 +379,10 @@ pub mod sync {

pub async fn handle(channel: &mut Channel, room: &Arc<Mutex<Room>>, json: &Value) {
let addr = &channel.get_connection().addr;
let mut room = room.lock().await;
let client = room.get_client_mut(addr).unwrap();
let room = room.lock().await;
let client = room.get_client(addr).unwrap();

println!("addr: {:?}", addr);

if !check_entered(channel, client, METHOD).await {
return;
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ async fn start_server(prop: &Properties, port: u16, working_dir: &str, password:
println!("Start room server :::");
let host = prop.get_or_default("cogru.Host", DEFAULT_HOST);

println!("host: {}", host);

let room = Room::new(working_dir, password);
let mut server = Server::new(&host, port, room);
let _ = server.start().await;
Expand Down Expand Up @@ -159,6 +157,9 @@ async fn main() {

let mut port = matches.get_one::<String>("port").unwrap();

// XXX: If the port is the same as default port, we
// assumed the user did not input the port number.
// Let's respect the properties' port instead.
if port == DEFAULT_PORT {
port = &prop_port;
}
Expand Down
11 changes: 8 additions & 3 deletions src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ impl Room {
/// # Arguments
///
/// * `params` - [description]
pub fn broadcast_json(&mut self, params: &Value) {
for (addr, sender) in self.peers.iter_mut() {
pub fn broadcast_json(&self, params: &Value) {
for (addr, sender) in self.peers.iter() {
let _ = sender.send(params.to_string());
}
}
Expand Down Expand Up @@ -197,7 +197,12 @@ impl Room {
}

/// Return a list of client.
pub fn get_clients(&mut self) -> Vec<&mut Client> {
pub fn get_clients(&self) -> Vec<&Client> {
self.clients.values().collect::<Vec<&Client>>()
}

/// Return a list of client.
pub fn get_clients_mut(&mut self) -> Vec<&mut Client> {
self.clients.values_mut().collect::<Vec<&mut Client>>()
}

Expand Down
4 changes: 2 additions & 2 deletions src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub struct Region {
pub start: Option<u64>,
pub end: Option<u64>,
Expand All @@ -30,7 +30,7 @@ impl Region {
}
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, PartialEq)]
pub struct User {
pub username: String,
pub path: Option<String>, // the user's location
Expand Down

0 comments on commit 726d121

Please sign in to comment.