diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 1182704b..147ee431 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -30,7 +30,7 @@ thiserror = "1.0" nohash-hasher = "0.2" parking_lot = { version = "0.12", optional = true } -tokio-tungstenite = { version = "0.24", optional = true } +tokio-tungstenite = { version = "0.26", optional = true } httparse = { version = "1.9", optional = true } futures-timer = { version = "3.0.2", optional = true } async-std = { version = "1", features = ["unstable"], optional = true } diff --git a/tentacle/src/transports/browser.rs b/tentacle/src/transports/browser.rs index 46f0ba2c..4820be01 100644 --- a/tentacle/src/transports/browser.rs +++ b/tentacle/src/transports/browser.rs @@ -44,9 +44,23 @@ use crate::{ use futures::FutureExt; use wasm_bindgen::JsCast; -async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, BrowserStream)> { +async fn connect( + addr: Multiaddr, + timeout: Duration, + ty: TransportType, +) -> Result<(Multiaddr, BrowserStream)> { + let schema = match ty { + TransportType::Ws => "ws", + TransportType::Wss => "wss", + _ => unreachable!(), + }; let url = match multiaddr_to_socketaddr(&addr) { - Some(socket_address) => format!("ws://{}:{}", socket_address.ip(), socket_address.port()), + Some(socket_address) => format!( + "{}://{}:{}", + schema, + socket_address.ip(), + socket_address.port() + ), None => { let mut iter = addr.iter().peekable(); @@ -72,10 +86,10 @@ async fn connect(addr: Multiaddr, timeout: Duration) -> Result<(Multiaddr, Brows match (proto1, proto2) { (Protocol::Dns4(domain), Protocol::Tcp(port)) => { - break format!("ws://{}:{}", domain, port) + break format!("{}://{}:{}", schema, domain, port) } (Protocol::Dns6(domain), Protocol::Tcp(port)) => { - break format!("ws://{}:{}", domain, port) + break format!("{}://{}:{}", schema, domain, port) } _ => return Err(TransportErrorKind::NotSupported(addr.clone())), } @@ -127,14 +141,24 @@ impl TransportDial for BrowserTransport { type DialFuture = BrowserDialFuture; fn dial(self, address: Multiaddr) -> Result { - if !matches!(find_type(&address), TransportType::Ws) { - return Err(TransportErrorKind::NotSupported(address)); - } - let dial = crate::runtime::spawn(connect(address, self.timeout)); + match find_type(&address) { + TransportType::Ws => { + let dial = crate::runtime::spawn(connect(address, self.timeout, TransportType::Ws)); - Ok(TransportFuture::new(Box::pin(async { - dial.await.expect("oneshot channel panic") - }))) + Ok(TransportFuture::new(Box::pin(async { + dial.await.expect("oneshot channel panic") + }))) + } + TransportType::Wss => { + let dial = + crate::runtime::spawn(connect(address, self.timeout, TransportType::Wss)); + + Ok(TransportFuture::new(Box::pin(async { + dial.await.expect("oneshot channel panic") + }))) + } + _ => Err(TransportErrorKind::NotSupported(address)), + } } } diff --git a/tentacle/src/transports/ws.rs b/tentacle/src/transports/ws.rs index 9c2cbf09..084035e0 100644 --- a/tentacle/src/transports/ws.rs +++ b/tentacle/src/transports/ws.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use futures::{future::ok, Sink, StreamExt, TryFutureExt}; use log::debug; use std::{ @@ -101,7 +102,7 @@ impl TransportDial for WsTransport { pub struct WsStream { inner: WebSocketStream, recv_buf: Vec, - pending_ping: Option>, + pending_ping: Option, already_send_close: bool, } @@ -182,7 +183,7 @@ impl AsyncRead for WsStream { match self.inner.poll_next_unpin(cx) { Poll::Ready(Some(Ok(t))) => { let data = match t { - Message::Binary(data) => data, + Message::Binary(data) => data.to_vec(), Message::Close(_) => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())), Message::Ping(data) => { self.pending_ping = Some(data); @@ -239,7 +240,7 @@ impl AsyncWrite for WsStream { match sink.as_mut().poll_ready(cx) { Poll::Ready(Ok(_)) => { sink.as_mut() - .start_send(Message::Binary(buf.to_vec())) + .start_send(Message::Binary(buf.to_vec().into())) .map_err::(|_| Into::into(io::ErrorKind::BrokenPipe))?; let _ignore = sink .as_mut()