From e0f4299ea4ea811b9e6d8e9c96fad92bbdbc8b64 Mon Sep 17 00:00:00 2001 From: a-wing <1@233.email> Date: Tue, 8 Oct 2024 02:07:30 +0800 Subject: [PATCH] feat(net4mqtt): add xdata --- libs/net4mqtt/bin/main.rs | 21 +++-- libs/net4mqtt/src/proxy.rs | 145 ++++++++++++++++++++++++++++---- libs/net4mqtt/src/tests.rs | 164 +++++++++++++++++++++++++++++++++++++ libs/net4mqtt/src/topic.rs | 16 +++- liveion/src/lib.rs | 1 + liveman/src/lib.rs | 1 + 6 files changed, 323 insertions(+), 25 deletions(-) diff --git a/libs/net4mqtt/bin/main.rs b/libs/net4mqtt/bin/main.rs index 918e572..c8861cf 100644 --- a/libs/net4mqtt/bin/main.rs +++ b/libs/net4mqtt/bin/main.rs @@ -113,6 +113,7 @@ async fn main() { (&agent_id, &id), Some(domain), None, + None, kcp, ) .await @@ -131,14 +132,22 @@ async fn main() { if udp { let sock = UdpSocket::bind(listen).await.unwrap(); - proxy::local_ports_udp(&mqtt_url, sock, target, (&agent_id, &id), None) + proxy::local_ports_udp(&mqtt_url, sock, target, (&agent_id, &id), None, None) .await .unwrap(); } else { let listener = TcpListener::bind(listen).await.unwrap(); - proxy::local_ports_tcp(&mqtt_url, listener, target, (&agent_id, &id), None, kcp) - .await - .unwrap(); + proxy::local_ports_tcp( + &mqtt_url, + listener, + target, + (&agent_id, &id), + None, + None, + kcp, + ) + .await + .unwrap(); } } Commands::Agent { @@ -148,7 +157,9 @@ async fn main() { } => { info!("Running as agent, {:?}", target); - proxy::agent(&mqtt_url, &target, &id, None).await.unwrap(); + proxy::agent(&mqtt_url, &target, &id, None, None) + .await + .unwrap(); } } } diff --git a/libs/net4mqtt/src/proxy.rs b/libs/net4mqtt/src/proxy.rs index 662145d..8e6c235 100644 --- a/libs/net4mqtt/src/proxy.rs +++ b/libs/net4mqtt/src/proxy.rs @@ -209,26 +209,41 @@ async fn mqtt_client_init( mqtt_url: &str, topic_io_sub: (&str, &str, &str), id: (&str, &str), - vdata: VDataConfig, + vdata: Option, + xdata: Option, ) -> Result< ( AsyncClient, EventLoop, String, Option)>>, + Option)>>, + UnboundedReceiver<(String, Vec)>, ), Error, > { let (agent_id, local_id) = id; let (url, prefix) = crate::utils::pre_url(mqtt_url.parse::()?); + let (x_sender, x_receiver) = { + let XDataConfig { sender, receiver } = xdata.unwrap_or_default(); + ( + sender, + receiver.unwrap_or_else(|| { + let (sender, receiver) = unbounded_channel::<(String, Vec)>(); + std::mem::forget(sender); + receiver + }), + ) + }; + let VDataConfig { online, offline, receiver, - } = vdata; + } = vdata.unwrap_or_default(); let topic_v_sub = topic::build_sub(&prefix, topic::ANY, topic::ANY, topic::label::V); - let topic_v_pub = topic::build_pub_x(&prefix, agent_id, local_id, topic::label::V); + let topic_v_pub = topic::build_pub_x(&prefix, agent_id, local_id, topic::label::V, topic::NIL); let mut mqtt_options = rumqttc::MqttOptions::parse_url(url)?; debug!("mqtt_options: {:?}", mqtt_options); @@ -269,7 +284,17 @@ async fn mqtt_client_init( .unwrap(); } - Ok((client, eventloop, prefix, receiver)) + // MQTT subscribe at label::X + if x_sender.is_some() { + client + .subscribe( + topic::build_sub(&prefix, topic::ANY, topic::ANY, topic::label::X), + QoS::AtMostOnce, + ) + .await + .unwrap(); + } + Ok((client, eventloop, prefix, receiver, x_sender, x_receiver)) } #[derive(Default)] @@ -279,6 +304,12 @@ pub struct VDataConfig { pub receiver: Option)>>, } +#[derive(Default)] +pub struct XDataConfig { + pub sender: Option)>>, + pub receiver: Option)>>, +} + /// Agent service /// /// # Arguments @@ -287,23 +318,26 @@ pub struct VDataConfig { /// * `address` - The age of the default target address, if local no set `DST`, use this as address. /// * `agent_id` - The ID is all agents unique ID. /// * `vdata` - MQTT system message: (online, offline, on_receiver(online, offline)) +/// * `xdata` - User message: (sender, receiver) /// /// # Examples /// /// ``` -/// net4mqtt::proxy::agent("mqtt://127.0.0.1:1883", "127.0.0.1:4444", "agent-0", None); +/// net4mqtt::proxy::agent("mqtt://127.0.0.1:1883", "127.0.0.1:4444", "agent-0", None, None); /// ``` pub async fn agent( mqtt_url: &str, address: &str, agent_id: &str, vdata: Option, + xdata: Option, ) -> Result<(), Error> { - let (client, mut eventloop, _prefix, on_vdata) = mqtt_client_init( + let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init( mqtt_url, (agent_id, topic::ANY, topic::label::I), (agent_id, topic::NIL), - vdata.unwrap_or_default(), + vdata, + xdata, ) .await?; @@ -316,8 +350,21 @@ pub async fn agent( let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); loop { let sender = sender.clone(); + let x_sender = x_sender.clone(); let on_vdata = on_vdata.clone(); select! { + result = x_receiver.recv() => { + match result { + Some((key, data)) => { + client.publish(topic::build_pub_x(&prefix, agent_id, topic::NIL, topic::label::X, &key), + QoS::AtMostOnce, + false, + data + ).await?; + } + None => return Err(anyhow!("recv error")) + } + } result = receiver.recv() => { match result { Some((key, data)) => { @@ -344,6 +391,11 @@ pub async fn agent( s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?; } }, + topic::label::X => { + if let Some(s) = x_sender { + s.send((protocol.to_string(), p.payload.to_vec()))?; + } + }, _ => { let sender = match senders.get(&p.topic) { Some(sender) => sender, @@ -387,15 +439,17 @@ pub async fn local_ports_tcp( target: Option, id: (&str, &str), vdata: Option, + xdata: Option, tcp_over_kcp: bool, ) -> Result<(), Error> { let (agent_id, local_id) = id; - let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init( + let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init( mqtt_url, (topic::ANY, local_id, topic::label::O), (topic::NIL, local_id), - vdata.unwrap_or_default(), + vdata, + xdata, ) .await?; @@ -408,6 +462,7 @@ pub async fn local_ports_tcp( let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); loop { let sender = sender.clone(); + let x_sender = x_sender.clone(); let on_vdata = on_vdata.clone(); select! { Ok((socket, _)) = listener.accept() => { @@ -428,7 +483,18 @@ pub async fn local_ports_tcp( } { error!("local vnet error: {}", e) }; }); } - + result = x_receiver.recv() => { + match result { + Some((key, data)) => { + client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key), + QoS::AtMostOnce, + false, + data + ).await?; + } + None => return Err(anyhow!("recv error")) + } + } result = receiver.recv() => { match result { Some((key, data)) => { @@ -455,6 +521,11 @@ pub async fn local_ports_tcp( s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?; } }, + (topic::label::X, _) => { + if let Some(s) = x_sender { + s.send((protocol.to_string(), p.payload.to_vec()))?; + } + }, (_, topic::protocol::KCP | topic::protocol::TCP) => { if let Some(sender) = senders.get(&p.topic) { if sender.is_closed() { @@ -486,14 +557,16 @@ pub async fn local_ports_udp( target: Option, id: (&str, &str), vdata: Option, + xdata: Option, ) -> Result<(), Error> { let (agent_id, local_id) = id; - let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init( + let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init( mqtt_url, (topic::ANY, local_id, topic::label::O), (topic::NIL, local_id), - vdata.unwrap_or_default(), + vdata, + xdata, ) .await?; @@ -502,6 +575,7 @@ pub async fn local_ports_udp( let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); loop { let sender = sender.clone(); + let x_sender = x_sender.clone(); let on_vdata = on_vdata.clone(); select! { Ok((len, addr)) = sock.recv_from(&mut buf) => { @@ -509,6 +583,18 @@ pub async fn local_ports_udp( topic::build(&prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string(), &target), buf[..len].to_vec())).unwrap(); } + result = x_receiver.recv() => { + match result { + Some((key, data)) => { + client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key), + QoS::AtMostOnce, + false, + data + ).await?; + } + None => return Err(anyhow!("recv error")) + } + } result = receiver.recv() => { match result { Some((key, data)) => { @@ -535,6 +621,11 @@ pub async fn local_ports_udp( s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?; } }, + (topic::label::X, _) => { + if let Some(s) = x_sender { + s.send((protocol.to_string(), p.payload.to_vec()))?; + } + }, (_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, src).await?; }, (label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol) } @@ -572,15 +663,17 @@ pub async fn local_socks( id: (&str, &str), domain: Option, vdata: Option, + xdata: Option, tcp_over_kcp: bool, ) -> Result<(), Error> { let (agent_id, local_id) = id; - let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init( + let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init( mqtt_url, (topic::ANY, local_id, topic::label::O), (topic::NIL, local_id), - vdata.unwrap_or_default(), + vdata, + xdata, ) .await?; @@ -592,7 +685,8 @@ pub async fn local_socks( ); let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); loop { - let sender_clone = sender.clone(); + let sender = sender.clone(); + let x_sender = x_sender.clone(); let on_vdata = on_vdata.clone(); select! { Ok((conn, _)) = server.accept() => { @@ -618,9 +712,9 @@ pub async fn local_socks( senders.insert(key_recv, vnet_tx); task::spawn(async move { if let Err(e) = if tcp_over_kcp { - up_kcp_vnet(socket, key_send, sender_clone, vnet_rx).await + up_kcp_vnet(socket, key_send, sender, vnet_rx).await } else { - up_tcp_vnet(socket, key_send, sender_clone, vnet_rx).await + up_tcp_vnet(socket, key_send, sender, vnet_rx).await } { error!("local vnet error: {}", e) }; }); @@ -629,6 +723,18 @@ pub async fn local_socks( } } + result = x_receiver.recv() => { + match result { + Some((key, data)) => { + client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key), + QoS::AtMostOnce, + false, + data + ).await?; + } + None => return Err(anyhow!("recv error")) + } + } result = receiver.recv() => { match result { Some((key, data)) => { @@ -655,6 +761,11 @@ pub async fn local_socks( s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap(); } }, + (topic::label::X, _) => { + if let Some(s) = x_sender { + s.send((protocol.to_string(), p.payload.to_vec()))?; + } + }, (_, topic::protocol::KCP | topic::protocol::TCP) => { if let Some(sender) = senders.get(&p.topic) { if sender.is_closed() { diff --git a/libs/net4mqtt/src/tests.rs b/libs/net4mqtt/src/tests.rs index 6b0c59a..99d75d8 100644 --- a/libs/net4mqtt/src/tests.rs +++ b/libs/net4mqtt/src/tests.rs @@ -178,6 +178,7 @@ async fn helper_cluster_up(cfg: Config) -> Vec { &addr.to_string(), &id.to_string(), None, + None, )) }); } @@ -201,6 +202,7 @@ async fn helper_cluster_up(cfg: Config) -> Vec { None, (&id.to_string(), &format!("local-{}", id)), None, + None, cfg.kcp, )) }); @@ -220,6 +222,7 @@ async fn helper_cluster_up(cfg: Config) -> Vec { None, (&id.to_string(), &format!("local-{}", id)), None, + None, )) }); } @@ -241,6 +244,7 @@ async fn helper_cluster_up(cfg: Config) -> Vec { (&id.to_string(), &format!("socks-{}", id)), Some(DOMAIN_SUFFIX.to_string()), None, + None, cfg.kcp, )) }); @@ -774,6 +778,7 @@ async fn test_socks_multiple_server() { &addr.to_string(), &id.to_string(), None, + None, )) }); } @@ -850,6 +855,7 @@ async fn test_vdata() { receiver: Some(sender), ..Default::default() }), + None, )) }); @@ -871,6 +877,7 @@ async fn test_vdata() { online: Some(msg_1_clone), ..Default::default() }), + None, )) }); time::sleep(time::Duration::from_millis(100)).await; @@ -891,6 +898,7 @@ async fn test_vdata() { online: Some(msg_2_clone), ..Default::default() }), + None, )) }); @@ -913,6 +921,7 @@ async fn test_vdata() { online: Some(msg_3_clone), ..Default::default() }), + None, false, )) }); @@ -936,6 +945,7 @@ async fn test_vdata() { online: Some(msg_4_clone), ..Default::default() }), + None, false, )) }); @@ -956,3 +966,157 @@ async fn test_vdata() { assert_eq!(msg_4, data); assert_eq!("socks-x", local_id); } + +#[tokio::test] +async fn test_xdata() { + let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); + let agent_port: u16 = pick_unused_port().expect("No ports free"); + let agent_port_1: u16 = pick_unused_port().expect("No ports free"); + let agent_port_2: u16 = pick_unused_port().expect("No ports free"); + helper_cluster_up(Config { + ip, + kcp: true, + broker: mqtt_broker_port, + ..Default::default() + }) + .await; + + let msg_1: Vec = "xxx".bytes().collect(); + let msg_2: Vec = "yyyyyyyy".bytes().collect(); + let msg_3: Vec = "333".bytes().collect(); + let msg_4: Vec = "4444".bytes().collect(); + + let (sender_a, receiver) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + let (sender, mut receiver_a) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + + thread::spawn(move || { + let id = 'a'; + let addr = SocketAddr::new(ip, agent_port); + tokio_test::block_on(proxy::agent( + &format!( + "mqtt://{}/{}?client_id=test-proxy-agent-{}", + SocketAddr::new(ip, mqtt_broker_port), + MQTT_TOPIC_PREFIX, + id + ), + &addr.to_string(), + &id.to_string(), + None, + Some(proxy::XDataConfig { + sender: Some(sender), + receiver: Some(receiver), + }), + )) + }); + + let (sender_b, receiver) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + let (sender, mut receiver_b) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + + thread::spawn(move || { + let id = 'b'; + let addr = SocketAddr::new(ip, agent_port_1); + tokio_test::block_on(proxy::agent( + &format!( + "mqtt://{}/{}?client_id=test-proxy-agent-{}", + SocketAddr::new(ip, mqtt_broker_port), + MQTT_TOPIC_PREFIX, + id + ), + &addr.to_string(), + &id.to_string(), + None, + Some(proxy::XDataConfig { + sender: Some(sender), + receiver: Some(receiver), + }), + )) + }); + + let (sender_c, receiver) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + let (sender, mut receiver_c) = tokio::sync::mpsc::unbounded_channel::<(String, Vec)>(); + + thread::spawn(move || { + let id = 'c'; + let addr = SocketAddr::new(ip, agent_port_2); + tokio_test::block_on(proxy::agent( + &format!( + "mqtt://{}/{}?client_id=test-proxy-agent-{}", + SocketAddr::new(ip, mqtt_broker_port), + MQTT_TOPIC_PREFIX, + id + ), + &addr.to_string(), + &id.to_string(), + None, + Some(proxy::XDataConfig { + sender: Some(sender), + receiver: Some(receiver), + }), + )) + }); + + time::sleep(time::Duration::from_millis(100)).await; + + let ev = "test".to_string(); + + sender_a.send((ev.clone(), msg_1.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_c.recv().await); + + sender_b.send((ev.clone(), msg_2.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_c.recv().await); + + sender_c.send((ev.clone(), msg_3.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_c.recv().await); + + time::sleep(time::Duration::from_millis(100)).await; + assert!(receiver_a.is_empty()); + assert!(receiver_b.is_empty()); + assert!(receiver_c.is_empty()); + + sender_a.send((ev.clone(), msg_2.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_2.clone())), receiver_c.recv().await); + + sender_b.send((ev.clone(), msg_1.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_c.recv().await); + + sender_c.send((ev.clone(), msg_1.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_c.recv().await); + + sender_c.send((ev.clone(), msg_3.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_3.clone())), receiver_c.recv().await); + + sender_a.send((ev.clone(), msg_4.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_c.recv().await); + + sender_a.send((ev.clone(), msg_4.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_4.clone())), receiver_c.recv().await); + + sender_a.send((ev.clone(), msg_1.clone())).unwrap(); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_a.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_b.recv().await); + assert_eq!(Some((ev.clone(), msg_1.clone())), receiver_c.recv().await); + + time::sleep(time::Duration::from_millis(100)).await; + assert!(receiver_a.is_empty()); + assert!(receiver_b.is_empty()); + assert!(receiver_c.is_empty()); +} diff --git a/libs/net4mqtt/src/topic.rs b/libs/net4mqtt/src/topic.rs index db26d7d..0f10209 100644 --- a/libs/net4mqtt/src/topic.rs +++ b/libs/net4mqtt/src/topic.rs @@ -25,6 +25,7 @@ pub mod label { pub const I: &str = "i"; pub const O: &str = "o"; pub const V: &str = "v"; + pub const X: &str = "x"; } pub mod protocol { @@ -52,8 +53,17 @@ pub fn build_sub(prefix: &str, agent_id: &str, local_id: &str, label: &str) -> S format!("{}/{}/{}/{}/{}", prefix, agent_id, local_id, label, ALL) } -pub fn build_pub_x(prefix: &str, agent_id: &str, local_id: &str, label: &str) -> String { - format!("{}/{}/{}/{}/{}", prefix, agent_id, local_id, label, NIL) +pub fn build_pub_x( + prefix: &str, + agent_id: &str, + local_id: &str, + label: &str, + protocol: &str, +) -> String { + format!( + "{}/{}/{}/{}/{}", + prefix, agent_id, local_id, label, protocol + ) } pub fn parse(topic: &str) -> (&str, &str, &str, &str, &str, &str, &str) { @@ -104,7 +114,7 @@ fn test_build_parse() { ); assert_eq!( - build_pub_x(prefix, agent_id, local_id, label::V), + build_pub_x(prefix, agent_id, local_id, label::V, NIL), "test_build_parse/3/7/v/-", ); diff --git a/liveion/src/lib.rs b/liveion/src/lib.rs index eff38bb..4d6828e 100644 --- a/liveion/src/lib.rs +++ b/liveion/src/lib.rs @@ -111,6 +111,7 @@ where offline: Some("{}".bytes().collect()), ..Default::default() }), + None, ) .await .unwrap() diff --git a/liveman/src/lib.rs b/liveman/src/lib.rs index 34f7c3b..a4ae336 100644 --- a/liveman/src/lib.rs +++ b/liveman/src/lib.rs @@ -100,6 +100,7 @@ where receiver: Some(sender), ..Default::default() }), + None, false, ) .await