fix wasm and finish refactor
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
mod protocol;
|
||||
|
||||
use crate::intf::*;
|
||||
use crate::connection_manager::*;
|
||||
use crate::network_manager::*;
|
||||
use crate::routing_table::*;
|
||||
use crate::intf::*;
|
||||
use crate::*;
|
||||
use protocol::ws::WebsocketProtocolHandler;
|
||||
pub use protocol::*;
|
||||
@@ -42,11 +43,15 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_manager(&self) -> ConnectionManager {
|
||||
self.inner.lock().network_manager.connection_manager()
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
||||
async fn send_data_to_existing_connection(
|
||||
&self,
|
||||
descriptor: &ConnectionDescriptor,
|
||||
descriptor: ConnectionDescriptor,
|
||||
data: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, String> {
|
||||
match descriptor.protocol_type() {
|
||||
@@ -56,79 +61,70 @@ impl Network {
|
||||
ProtocolType::TCP => {
|
||||
return Err("no support for tcp protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
// find an existing connection in the connection table if one exists
|
||||
let network_manager = self.inner.lock().network_manager.clone();
|
||||
if let Some(entry) = network_manager
|
||||
.connection_table()
|
||||
.get_connection(&descriptor)
|
||||
{
|
||||
// connection exists, send over it
|
||||
entry.conn.send(data).await.map_err(logthru_net!())?;
|
||||
// Data was consumed
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Handle connection-oriented protocols
|
||||
if let Some(conn) = self.connection_manager().get_connection(descriptor).await {
|
||||
// connection exists, send over it
|
||||
conn.send(data).await.map_err(logthru_net!())?;
|
||||
|
||||
// Data was consumed
|
||||
Ok(None)
|
||||
} else {
|
||||
// Connection or didn't exist
|
||||
// Pass the data back out so we don't own it any more
|
||||
Ok(Some(data))
|
||||
}
|
||||
// connection or local socket didn't exist, we'll need to use dialinfo to create one
|
||||
// Pass the data back out so we don't own it any more
|
||||
Ok(Some(data))
|
||||
}
|
||||
|
||||
pub async fn send_data_unbound_to_dial_info(
|
||||
&self,
|
||||
dial_info: &DialInfo,
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
let network_manager = self.inner.lock().network_manager.clone();
|
||||
|
||||
match &dial_info {
|
||||
DialInfo::UDP(_) => {
|
||||
match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
DialInfo::TCP(_) => {
|
||||
ProtocolType::TCP => {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
DialInfo::WS(_) => Err("WS protocol does not support unbound messages".to_owned())
|
||||
.map_err(logthru_net!(error)),
|
||||
DialInfo::WSS(_) => Err("WSS protocol does not support unbound messages".to_owned())
|
||||
.map_err(logthru_net!(error)),
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::send_unbound_message(dial_info, data)
|
||||
.await
|
||||
.map_err(logthru_net!())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_data_to_dial_info(
|
||||
&self,
|
||||
dial_info: &DialInfo,
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
let network_manager = self.inner.lock().network_manager.clone();
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
if dial_info.protocol_type() == ProtocolType::TCP {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
|
||||
let conn = match &dial_info {
|
||||
DialInfo::UDP(_) => {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
DialInfo::TCP(_) => {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
|
||||
.await
|
||||
.map_err(logthru_net!(error))?,
|
||||
DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
|
||||
.await
|
||||
.map_err(logthru_net!(error))?,
|
||||
};
|
||||
// Handle connection-oriented protocols
|
||||
let conn = self
|
||||
.connection_manager()
|
||||
.get_or_create_connection(None, dial_info)
|
||||
.await?;
|
||||
|
||||
conn.send(data).await.map_err(logthru_net!(error))
|
||||
}
|
||||
|
||||
pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> {
|
||||
let dial_info = node_ref.best_dial_info();
|
||||
let descriptor = node_ref.last_connection();
|
||||
|
||||
// First try to send data to the last socket we've seen this peer on
|
||||
let di_data = if let Some(descriptor) = descriptor {
|
||||
let data = if let Some(descriptor) = node_ref.last_connection() {
|
||||
match self
|
||||
.clone()
|
||||
.send_data_to_existing_connection(&descriptor, data)
|
||||
.send_data_to_existing_connection(descriptor, data)
|
||||
.await?
|
||||
{
|
||||
None => {
|
||||
@@ -141,15 +137,11 @@ impl Network {
|
||||
};
|
||||
|
||||
// If that fails, try to make a connection or reach out to the peer via its dial info
|
||||
if let Some(di) = dial_info {
|
||||
self.clone()
|
||||
.send_data_to_dial_info(&di, di_data)
|
||||
.await
|
||||
.map_err(logthru_net!(error))
|
||||
} else {
|
||||
Err("couldn't send data, no dial info or peer address".to_owned())
|
||||
.map_err(logthru_net!(error))
|
||||
}
|
||||
let dial_info = node_ref
|
||||
.best_dial_info()
|
||||
.ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?;
|
||||
|
||||
self.send_data_to_dial_info(dial_info, data).await
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
@@ -1,9 +1,9 @@
|
||||
pub mod wrtc;
|
||||
pub mod ws;
|
||||
|
||||
use crate::connection_manager::*;
|
||||
use crate::veilid_api::ProtocolType;
|
||||
use crate::network_connection::*;
|
||||
use crate::xx::*;
|
||||
use crate::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ProtocolNetworkConnection {
|
||||
@@ -31,7 +31,7 @@ impl ProtocolNetworkConnection {
|
||||
}
|
||||
|
||||
pub async fn send_unbound_message(
|
||||
dial_info: &DialInfo,
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
match dial_info.protocol_type() {
|
||||
|
@@ -1,10 +1,10 @@
|
||||
use crate::intf::*;
|
||||
use crate::network_manager::{NetworkManager, MAX_MESSAGE_SIZE};
|
||||
use crate::network_connection::*;
|
||||
use crate::network_manager::MAX_MESSAGE_SIZE;
|
||||
use crate::*;
|
||||
use alloc::fmt;
|
||||
use futures_util::stream::StreamExt;
|
||||
use web_sys::WebSocket;
|
||||
use ws_stream_wasm::*;
|
||||
use futures_util::{StreamExt, SinkExt};
|
||||
|
||||
struct WebsocketNetworkConnectionInner {
|
||||
ws_meta: WsMeta,
|
||||
@@ -27,7 +27,7 @@ impl WebsocketNetworkConnection {
|
||||
pub fn new(tls: bool, ws_meta: WsMeta, ws_stream: WsStream) -> Self {
|
||||
Self {
|
||||
tls,
|
||||
inner: Arc::new(Mutex::new(WebsocketNetworkConnectionInner {
|
||||
inner: Arc::new(AsyncMutex::new(WebsocketNetworkConnectionInner {
|
||||
ws_meta,
|
||||
ws_stream,
|
||||
})),
|
||||
@@ -36,7 +36,7 @@ impl WebsocketNetworkConnection {
|
||||
|
||||
pub async fn close(&self) -> Result<(), String> {
|
||||
let inner = self.inner.lock().await;
|
||||
inner.ws_meta.close().await;
|
||||
inner.ws_meta.close().await.map_err(map_to_string).map(drop)
|
||||
}
|
||||
|
||||
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
|
||||
@@ -77,46 +77,37 @@ pub struct WebsocketProtocolHandler {}
|
||||
impl WebsocketProtocolHandler {
|
||||
pub async fn connect(
|
||||
local_address: Option<SocketAddr>,
|
||||
dial_info: &DialInfo,
|
||||
dial_info: DialInfo,
|
||||
) -> Result<NetworkConnection, String> {
|
||||
let url = dial_info
|
||||
.request()
|
||||
.ok_or_else(|| format!("missing url in websocket dialinfo: {:?}", dial_info))?;
|
||||
let split_url = SplitUrl::from_str(&url)?;
|
||||
let tls = match dial_info {
|
||||
DialInfo::WS(ws) => {
|
||||
if split_url.scheme.to_ascii_lowercase() != "ws" {
|
||||
return Err(format!("wrong scheme for WS websocket url: {}", url));
|
||||
}
|
||||
false
|
||||
}
|
||||
DialInfo::WSS(wss) => {
|
||||
if split_url.scheme.to_ascii_lowercase() != "wss" {
|
||||
return Err(format!("wrong scheme for WSS websocket url: {}", url));
|
||||
}
|
||||
true
|
||||
}
|
||||
_ => {
|
||||
return Err("wrong protocol for WebsocketProtocolHandler".to_owned())
|
||||
.map_err(logthru_net!(error))
|
||||
}
|
||||
};
|
||||
|
||||
assert!(local_address.is_none());
|
||||
|
||||
let (_, wsio) = WsMeta::connect(url, None)
|
||||
// Split dial info up
|
||||
let (tls, scheme) = match &dial_info {
|
||||
DialInfo::WS(_) => (false, "ws"),
|
||||
DialInfo::WSS(_) => (true, "wss"),
|
||||
_ => panic!("invalid dialinfo for WS/WSS protocol"),
|
||||
};
|
||||
let request = dial_info.request().unwrap();
|
||||
let split_url = SplitUrl::from_str(&request)?;
|
||||
if split_url.scheme != scheme {
|
||||
return Err("invalid websocket url scheme".to_string());
|
||||
}
|
||||
|
||||
let (wsmeta, wsio) = WsMeta::connect(request, None)
|
||||
.await
|
||||
.map_err(map_to_string)
|
||||
.map_err(logthru_net!(error))?;
|
||||
|
||||
// Make our connection descriptor
|
||||
let connection_descriptor = ConnectionDescriptor {
|
||||
|
||||
Ok(NetworkConnection::from_protocol(ConnectionDescriptor {
|
||||
local: None,
|
||||
remote: dial_info.to_peer_address(),
|
||||
};
|
||||
|
||||
Ok(NetworkConnection::from_protocol(descriptor,ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, wsio))))
|
||||
},ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, wsmeta, wsio))))
|
||||
}
|
||||
|
||||
pub async fn send_unbound_message(dial_info: &DialInfo, data: Vec<u8>) -> Result<(), String> {
|
||||
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> {
|
||||
if data.len() > MAX_MESSAGE_SIZE {
|
||||
return Err("sending too large unbound WS message".to_owned());
|
||||
}
|
||||
@@ -126,7 +117,7 @@ impl WebsocketProtocolHandler {
|
||||
dial_info,
|
||||
);
|
||||
|
||||
let conn = Self::connect(None, dial_info.clone())
|
||||
let conn = Self::connect(None, dial_info)
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||
|
||||
|
Reference in New Issue
Block a user