Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net4mqtt): add xdata #232

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions libs/net4mqtt/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async fn main() {
(&agent_id, &id),
Some(domain),
None,
None,
kcp,
)
.await
Expand All @@ -131,14 +132,22 @@ async fn main() {

if udp {
let sock = UdpSocket::bind(listen).await.unwrap();
proxy::local_ports_udp(&mqtt_url, sock, target, (&agent_id, &id), None)
proxy::local_ports_udp(&mqtt_url, sock, target, (&agent_id, &id), None, None)
.await
.unwrap();
} else {
let listener = TcpListener::bind(listen).await.unwrap();
proxy::local_ports_tcp(&mqtt_url, listener, target, (&agent_id, &id), None, kcp)
.await
.unwrap();
proxy::local_ports_tcp(
&mqtt_url,
listener,
target,
(&agent_id, &id),
None,
None,
kcp,
)
.await
.unwrap();
}
}
Commands::Agent {
Expand All @@ -148,7 +157,9 @@ async fn main() {
} => {
info!("Running as agent, {:?}", target);

proxy::agent(&mqtt_url, &target, &id, None).await.unwrap();
proxy::agent(&mqtt_url, &target, &id, None, None)
.await
.unwrap();
}
}
}
145 changes: 128 additions & 17 deletions libs/net4mqtt/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,41 @@ async fn mqtt_client_init(
mqtt_url: &str,
topic_io_sub: (&str, &str, &str),
id: (&str, &str),
vdata: VDataConfig,
vdata: Option<VDataConfig>,
xdata: Option<XDataConfig>,
) -> Result<
(
AsyncClient,
EventLoop,
String,
Option<Sender<(String, String, Vec<u8>)>>,
Option<UnboundedSender<(String, Vec<u8>)>>,
UnboundedReceiver<(String, Vec<u8>)>,
),
Error,
> {
let (agent_id, local_id) = id;
let (url, prefix) = crate::utils::pre_url(mqtt_url.parse::<Url>()?);

let (x_sender, x_receiver) = {
let XDataConfig { sender, receiver } = xdata.unwrap_or_default();
(
sender,
receiver.unwrap_or_else(|| {
let (sender, receiver) = unbounded_channel::<(String, Vec<u8>)>();
std::mem::forget(sender);
receiver
}),
)
};

let VDataConfig {
online,
offline,
receiver,
} = vdata;
} = vdata.unwrap_or_default();
let topic_v_sub = topic::build_sub(&prefix, topic::ANY, topic::ANY, topic::label::V);
let topic_v_pub = topic::build_pub_x(&prefix, agent_id, local_id, topic::label::V);
let topic_v_pub = topic::build_pub_x(&prefix, agent_id, local_id, topic::label::V, topic::NIL);

let mut mqtt_options = rumqttc::MqttOptions::parse_url(url)?;
debug!("mqtt_options: {:?}", mqtt_options);
Expand Down Expand Up @@ -269,7 +284,17 @@ async fn mqtt_client_init(
.unwrap();
}

Ok((client, eventloop, prefix, receiver))
// MQTT subscribe at label::X
if x_sender.is_some() {
client
.subscribe(
topic::build_sub(&prefix, topic::ANY, topic::ANY, topic::label::X),
QoS::AtMostOnce,
)
.await
.unwrap();
}
Ok((client, eventloop, prefix, receiver, x_sender, x_receiver))
}

#[derive(Default)]
Expand All @@ -279,6 +304,12 @@ pub struct VDataConfig {
pub receiver: Option<Sender<(String, String, Vec<u8>)>>,
}

#[derive(Default)]
pub struct XDataConfig {
pub sender: Option<UnboundedSender<(String, Vec<u8>)>>,
pub receiver: Option<UnboundedReceiver<(String, Vec<u8>)>>,
}

/// Agent service
///
/// # Arguments
Expand All @@ -287,23 +318,26 @@ pub struct VDataConfig {
/// * `address` - The age of the default target address, if local no set `DST`, use this as address.
/// * `agent_id` - The ID is all agents unique ID.
/// * `vdata` - MQTT system message: (online, offline, on_receiver(online, offline))
/// * `xdata` - User message: (sender, receiver)
///
/// # Examples
///
/// ```
/// net4mqtt::proxy::agent("mqtt://127.0.0.1:1883", "127.0.0.1:4444", "agent-0", None);
/// net4mqtt::proxy::agent("mqtt://127.0.0.1:1883", "127.0.0.1:4444", "agent-0", None, None);
/// ```
pub async fn agent(
mqtt_url: &str,
address: &str,
agent_id: &str,
vdata: Option<VDataConfig>,
xdata: Option<XDataConfig>,
) -> Result<(), Error> {
let (client, mut eventloop, _prefix, on_vdata) = mqtt_client_init(
let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init(
mqtt_url,
(agent_id, topic::ANY, topic::label::I),
(agent_id, topic::NIL),
vdata.unwrap_or_default(),
vdata,
xdata,
)
.await?;

Expand All @@ -316,8 +350,21 @@ pub async fn agent(
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();
loop {
let sender = sender.clone();
let x_sender = x_sender.clone();
let on_vdata = on_vdata.clone();
select! {
result = x_receiver.recv() => {
match result {
Some((key, data)) => {
client.publish(topic::build_pub_x(&prefix, agent_id, topic::NIL, topic::label::X, &key),
QoS::AtMostOnce,
false,
data
).await?;
}
None => return Err(anyhow!("recv error"))
}
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
Expand All @@ -344,6 +391,11 @@ pub async fn agent(
s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?;
}
},
topic::label::X => {
if let Some(s) = x_sender {
s.send((protocol.to_string(), p.payload.to_vec()))?;
}
},
_ => {
let sender = match senders.get(&p.topic) {
Some(sender) => sender,
Expand Down Expand Up @@ -387,15 +439,17 @@ pub async fn local_ports_tcp(
target: Option<String>,
id: (&str, &str),
vdata: Option<VDataConfig>,
xdata: Option<XDataConfig>,
tcp_over_kcp: bool,
) -> Result<(), Error> {
let (agent_id, local_id) = id;

let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init(
let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init(
mqtt_url,
(topic::ANY, local_id, topic::label::O),
(topic::NIL, local_id),
vdata.unwrap_or_default(),
vdata,
xdata,
)
.await?;

Expand All @@ -408,6 +462,7 @@ pub async fn local_ports_tcp(
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();
loop {
let sender = sender.clone();
let x_sender = x_sender.clone();
let on_vdata = on_vdata.clone();
select! {
Ok((socket, _)) = listener.accept() => {
Expand All @@ -428,7 +483,18 @@ pub async fn local_ports_tcp(
} { error!("local vnet error: {}", e) };
});
}

result = x_receiver.recv() => {
match result {
Some((key, data)) => {
client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key),
QoS::AtMostOnce,
false,
data
).await?;
}
None => return Err(anyhow!("recv error"))
}
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
Expand All @@ -455,6 +521,11 @@ pub async fn local_ports_tcp(
s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?;
}
},
(topic::label::X, _) => {
if let Some(s) = x_sender {
s.send((protocol.to_string(), p.payload.to_vec()))?;
}
},
(_, topic::protocol::KCP | topic::protocol::TCP) => {
if let Some(sender) = senders.get(&p.topic) {
if sender.is_closed() {
Expand Down Expand Up @@ -486,14 +557,16 @@ pub async fn local_ports_udp(
target: Option<String>,
id: (&str, &str),
vdata: Option<VDataConfig>,
xdata: Option<XDataConfig>,
) -> Result<(), Error> {
let (agent_id, local_id) = id;

let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init(
let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init(
mqtt_url,
(topic::ANY, local_id, topic::label::O),
(topic::NIL, local_id),
vdata.unwrap_or_default(),
vdata,
xdata,
)
.await?;

Expand All @@ -502,13 +575,26 @@ pub async fn local_ports_udp(
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();
loop {
let sender = sender.clone();
let x_sender = x_sender.clone();
let on_vdata = on_vdata.clone();
select! {
Ok((len, addr)) = sock.recv_from(&mut buf) => {
sender.send((
topic::build(&prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string(), &target),
buf[..len].to_vec())).unwrap();
}
result = x_receiver.recv() => {
match result {
Some((key, data)) => {
client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key),
QoS::AtMostOnce,
false,
data
).await?;
}
None => return Err(anyhow!("recv error"))
}
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
Expand All @@ -535,6 +621,11 @@ pub async fn local_ports_udp(
s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?;
}
},
(topic::label::X, _) => {
if let Some(s) = x_sender {
s.send((protocol.to_string(), p.payload.to_vec()))?;
}
},
(_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, src).await?; },
(label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol)
}
Expand Down Expand Up @@ -572,15 +663,17 @@ pub async fn local_socks(
id: (&str, &str),
domain: Option<String>,
vdata: Option<VDataConfig>,
xdata: Option<XDataConfig>,
tcp_over_kcp: bool,
) -> Result<(), Error> {
let (agent_id, local_id) = id;

let (client, mut eventloop, prefix, on_vdata) = mqtt_client_init(
let (client, mut eventloop, prefix, on_vdata, x_sender, mut x_receiver) = mqtt_client_init(
mqtt_url,
(topic::ANY, local_id, topic::label::O),
(topic::NIL, local_id),
vdata.unwrap_or_default(),
vdata,
xdata,
)
.await?;

Expand All @@ -592,7 +685,8 @@ pub async fn local_socks(
);
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();
loop {
let sender_clone = sender.clone();
let sender = sender.clone();
let x_sender = x_sender.clone();
let on_vdata = on_vdata.clone();
select! {
Ok((conn, _)) = server.accept() => {
Expand All @@ -618,9 +712,9 @@ pub async fn local_socks(
senders.insert(key_recv, vnet_tx);
task::spawn(async move {
if let Err(e) = if tcp_over_kcp {
up_kcp_vnet(socket, key_send, sender_clone, vnet_rx).await
up_kcp_vnet(socket, key_send, sender, vnet_rx).await
} else {
up_tcp_vnet(socket, key_send, sender_clone, vnet_rx).await
up_tcp_vnet(socket, key_send, sender, vnet_rx).await
} { error!("local vnet error: {}", e) };
});

Expand All @@ -629,6 +723,18 @@ pub async fn local_socks(
}
}

result = x_receiver.recv() => {
match result {
Some((key, data)) => {
client.publish(topic::build_pub_x(&prefix, topic::NIL, local_id, topic::label::X, &key),
QoS::AtMostOnce,
false,
data
).await?;
}
None => return Err(anyhow!("recv error"))
}
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
Expand All @@ -655,6 +761,11 @@ pub async fn local_socks(
s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap();
}
},
(topic::label::X, _) => {
if let Some(s) = x_sender {
s.send((protocol.to_string(), p.payload.to_vec()))?;
}
},
(_, topic::protocol::KCP | topic::protocol::TCP) => {
if let Some(sender) = senders.get(&p.topic) {
if sender.is_closed() {
Expand Down
Loading
Loading