From 2eb598e68c89a9125dc3099199bfb4829ee64dc7 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 5 Jan 2022 19:15:45 -0500 Subject: [PATCH] fix wasm websocket --- .../src/intf/wasm/network/protocol/ws.rs | 25 ++++++++----------- .../tests/native/test_async_peek_stream.rs | 11 ++++---- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 59890165..aa2a5bc8 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -8,13 +8,12 @@ use futures_util::{StreamExt, SinkExt}; struct WebsocketNetworkConnectionInner { ws_meta: WsMeta, - ws_stream: WsStream, + ws_stream: CloneStream, } #[derive(Clone)] pub struct WebsocketNetworkConnection { - tls: bool, - inner: Arc>, + inner: Arc, } impl fmt::Debug for WebsocketNetworkConnection { @@ -24,34 +23,30 @@ impl fmt::Debug for WebsocketNetworkConnection { } impl WebsocketNetworkConnection { - pub fn new(tls: bool, ws_meta: WsMeta, ws_stream: WsStream) -> Self { + pub fn new(ws_meta: WsMeta, ws_stream: WsStream) -> Self { Self { - tls, - inner: Arc::new(AsyncMutex::new(WebsocketNetworkConnectionInner { + inner: Arc::new(WebsocketNetworkConnectionInner { ws_meta, - ws_stream, - })), + ws_stream: CloneStream::new(ws_stream), + }), } } pub async fn close(&self) -> Result<(), String> { - let inner = self.inner.lock().await; - inner.ws_meta.close().await.map_err(map_to_string).map(drop) + self.inner.ws_meta.close().await.map_err(map_to_string).map(drop) } pub async fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)); } - let mut inner = self.inner.lock().await; - inner.ws_stream + self.inner.ws_stream.clone() .send(WsMessage::Binary(message)).await .map_err(|_| "failed to send to websocket".to_owned()) .map_err(logthru_net!(error)) } pub async fn recv(&self) -> Result, String> { - let mut inner = self.inner.lock().await; - let out = match inner.ws_stream.next().await { + let out = match self.inner.ws_stream.clone().next().await { Some(WsMessage::Binary(v)) => v, Some(_) => { return Err("Unexpected WS message type".to_owned()) @@ -104,7 +99,7 @@ impl WebsocketProtocolHandler { Ok(NetworkConnection::from_protocol(ConnectionDescriptor { local: None, remote: dial_info.to_peer_address(), - },ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, wsmeta, wsio)))) + },ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(wsmeta, wsio)))) } pub async fn send_unbound_message(dial_info: DialInfo, data: Vec) -> Result<(), String> { diff --git a/veilid-core/src/tests/native/test_async_peek_stream.rs b/veilid-core/src/tests/native/test_async_peek_stream.rs index ad76b295..6f7d9f69 100644 --- a/veilid-core/src/tests/native/test_async_peek_stream.rs +++ b/veilid-core/src/tests/native/test_async_peek_stream.rs @@ -1,11 +1,12 @@ use crate::xx::*; +use core::time::Duration; + +static MESSAGE: &[u8; 62] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + use async_std::io; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task; -use std::time::Duration; - -static MESSAGE: &[u8; 62] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> { let listener = TcpListener::bind("127.0.0.1:0").await?; @@ -35,13 +36,13 @@ async fn make_async_peek_stream_loopback() -> (AsyncPeekStream, AsyncPeekStream) (aps_a, aps_c) } -async fn make_tcpstream_loopback() -> (TcpStream, TcpStream) { +async fn make_stream_loopback() -> (TcpStream, TcpStream) { make_tcp_loopback().await.unwrap() } pub async fn test_nothing() { info!("test_nothing"); - let (mut a, mut c) = make_tcpstream_loopback().await; + let (mut a, mut c) = make_stream_loopback().await; let outbuf = MESSAGE.to_vec(); a.write_all(&outbuf).await.unwrap();