Skip to content

Commit

Permalink
Merge pull request #393 from nervosnetwork/wasm-support-wss
Browse files Browse the repository at this point in the history
feat: wasm support wss
  • Loading branch information
driftluo authored Jan 2, 2025
2 parents 09ec7b7 + 22cc9bb commit 48f4250
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
2 changes: 1 addition & 1 deletion tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ thiserror = "1.0"
nohash-hasher = "0.2"

parking_lot = { version = "0.12", optional = true }
tokio-tungstenite = { version = "0.24", optional = true }
tokio-tungstenite = { version = "0.26", optional = true }
httparse = { version = "1.9", optional = true }
futures-timer = { version = "3.0.2", optional = true }
async-std = { version = "1", features = ["unstable"], optional = true }
Expand Down
46 changes: 35 additions & 11 deletions tentacle/src/transports/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,23 @@ use crate::{
use futures::FutureExt;
use wasm_bindgen::JsCast;

async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, BrowserStream)> {
async fn connect(
addr: Multiaddr,
timeout: Duration,
ty: TransportType,
) -> Result<(Multiaddr, BrowserStream)> {
let schema = match ty {
TransportType::Ws => "ws",
TransportType::Wss => "wss",
_ => unreachable!(),
};
let url = match multiaddr_to_socketaddr(&addr) {
Some(socket_address) => format!("ws://{}:{}", socket_address.ip(), socket_address.port()),
Some(socket_address) => format!(
"{}://{}:{}",
schema,
socket_address.ip(),
socket_address.port()
),
None => {
let mut iter = addr.iter().peekable();

Expand All @@ -72,10 +86,10 @@ async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, Brows

match (proto1, proto2) {
(Protocol::Dns4(domain), Protocol::Tcp(port)) => {
break format!("ws://{}:{}", domain, port)
break format!("{}://{}:{}", schema, domain, port)
}
(Protocol::Dns6(domain), Protocol::Tcp(port)) => {
break format!("ws://{}:{}", domain, port)
break format!("{}://{}:{}", schema, domain, port)
}
_ => return Err(TransportErrorKind::NotSupported(addr.clone())),
}
Expand Down Expand Up @@ -127,14 +141,24 @@ impl TransportDial for BrowserTransport {
type DialFuture = BrowserDialFuture;

fn dial(self, address: Multiaddr) -> Result<Self::DialFuture> {
if !matches!(find_type(&address), TransportType::Ws) {
return Err(TransportErrorKind::NotSupported(address));
}
let dial = crate::runtime::spawn(connect(address, self.timeout));
match find_type(&address) {
TransportType::Ws => {
let dial = crate::runtime::spawn(connect(address, self.timeout, TransportType::Ws));

Ok(TransportFuture::new(Box::pin(async {
dial.await.expect("oneshot channel panic")
})))
Ok(TransportFuture::new(Box::pin(async {
dial.await.expect("oneshot channel panic")
})))
}
TransportType::Wss => {
let dial =
crate::runtime::spawn(connect(address, self.timeout, TransportType::Wss));

Ok(TransportFuture::new(Box::pin(async {
dial.await.expect("oneshot channel panic")
})))
}
_ => Err(TransportErrorKind::NotSupported(address)),
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions tentacle/src/transports/ws.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::Bytes;
use futures::{future::ok, Sink, StreamExt, TryFutureExt};
use log::debug;
use std::{
Expand Down Expand Up @@ -101,7 +102,7 @@ impl TransportDial for WsTransport {
pub struct WsStream {
inner: WebSocketStream<TcpStream>,
recv_buf: Vec<u8>,
pending_ping: Option<Vec<u8>>,
pending_ping: Option<Bytes>,
already_send_close: bool,
}

Expand Down Expand Up @@ -182,7 +183,7 @@ impl AsyncRead for WsStream {
match self.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(t))) => {
let data = match t {
Message::Binary(data) => data,
Message::Binary(data) => data.to_vec(),
Message::Close(_) => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
Message::Ping(data) => {
self.pending_ping = Some(data);
Expand Down Expand Up @@ -239,7 +240,7 @@ impl AsyncWrite for WsStream {
match sink.as_mut().poll_ready(cx) {
Poll::Ready(Ok(_)) => {
sink.as_mut()
.start_send(Message::Binary(buf.to_vec()))
.start_send(Message::Binary(buf.to_vec().into()))
.map_err::<io::Error, _>(|_| Into::into(io::ErrorKind::BrokenPipe))?;
let _ignore = sink
.as_mut()
Expand Down

0 comments on commit 48f4250

Please sign in to comment.