checkpoint
This commit is contained in:
@@ -34,14 +34,17 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(network_manager: NetworkManager) -> Self {
|
||||
pub fn new(
|
||||
network_manager: NetworkManager,
|
||||
routing_table: RoutingTable,
|
||||
connection_manager: ConnectionManager,
|
||||
) -> Self {
|
||||
Self {
|
||||
config: network_manager.config(),
|
||||
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn network_manager(&self) -> NetworkManager {
|
||||
self.inner.lock().network_manager.clone()
|
||||
}
|
||||
@@ -61,14 +64,15 @@ impl Network {
|
||||
|
||||
let res = match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data)
|
||||
.await
|
||||
.wrap_err("failed to send unbound message")
|
||||
}
|
||||
};
|
||||
if res.is_ok() {
|
||||
@@ -94,10 +98,10 @@ impl Network {
|
||||
let data_len = data.len();
|
||||
let out = match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::send_recv_unbound_message(
|
||||
@@ -128,14 +132,14 @@ impl Network {
|
||||
let data_len = data.len();
|
||||
match descriptor.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for udp protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for UDP protocol")
|
||||
}
|
||||
ProtocolType::TCP => {
|
||||
return Err("no support for tcp protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for TCP protocol")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
|
||||
// Handle connection-oriented protocols
|
||||
|
||||
// Try to send to the exact existing connection if one exists
|
||||
@@ -164,10 +168,10 @@ impl Network {
|
||||
) -> EyreResult<()> {
|
||||
let data_len = data.len();
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for UDP protocol");
|
||||
}
|
||||
if dial_info.protocol_type() == ProtocolType::TCP {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
bail!("no support for TCP protocol");
|
||||
}
|
||||
|
||||
// Handle connection-oriented protocols
|
||||
@@ -176,7 +180,7 @@ impl Network {
|
||||
.get_or_create_connection(None, dial_info.clone())
|
||||
.await?;
|
||||
|
||||
let res = conn.send_async(data).await.map_err(logthru_net!(error));
|
||||
let res = conn.send_async(data).await;
|
||||
if res.is_ok() {
|
||||
// Network accounting
|
||||
self.network_manager()
|
||||
@@ -215,7 +219,7 @@ impl Network {
|
||||
pub fn is_started(&self) -> bool {
|
||||
self.inner.lock().network_started
|
||||
}
|
||||
|
||||
|
||||
pub fn restart_network(&self) {
|
||||
self.inner.lock().network_needs_restart = true;
|
||||
}
|
||||
@@ -247,8 +251,7 @@ impl Network {
|
||||
pub async fn check_interface_addresses(&self) -> Result<bool, String> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
|
||||
|
||||
//////////////////////////////////////////
|
||||
pub fn get_network_class(&self) -> Option<NetworkClass> {
|
||||
// xxx eventually detect tor browser?
|
||||
|
@@ -3,6 +3,7 @@ pub mod ws;
|
||||
|
||||
use super::*;
|
||||
use crate::xx::*;
|
||||
use std::io;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ProtocolNetworkConnection {
|
||||
|
@@ -1,7 +1,7 @@
|
||||
use super::*;
|
||||
use ws_stream_wasm::*;
|
||||
use futures_util::{StreamExt, SinkExt};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::io;
|
||||
use ws_stream_wasm::*;
|
||||
|
||||
struct WebsocketNetworkConnectionInner {
|
||||
ws_meta: WsMeta,
|
||||
@@ -9,9 +9,7 @@ struct WebsocketNetworkConnectionInner {
|
||||
}
|
||||
|
||||
fn to_io(err: WsErr) -> io::Error {
|
||||
let kind = match err {
|
||||
WsErr::InvalidWsState {supplied:_} => io::ErrorKind::
|
||||
}
|
||||
io::Error::new(io::ErrorKind::Other, err.to_string())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -27,9 +25,7 @@ impl fmt::Debug for WebsocketNetworkConnection {
|
||||
}
|
||||
|
||||
impl WebsocketNetworkConnection {
|
||||
pub fn new(
|
||||
descriptor: ConnectionDescriptor,
|
||||
ws_meta: WsMeta, ws_stream: WsStream) -> Self {
|
||||
pub fn new(descriptor: ConnectionDescriptor, ws_meta: WsMeta, ws_stream: WsStream) -> Self {
|
||||
Self {
|
||||
descriptor,
|
||||
inner: Arc::new(WebsocketNetworkConnectionInner {
|
||||
@@ -53,28 +49,29 @@ impl WebsocketNetworkConnection {
|
||||
if message.len() > MAX_MESSAGE_SIZE {
|
||||
bail_io_error_other!("sending too large WS message");
|
||||
}
|
||||
self.inner.ws_stream.clone()
|
||||
.send(WsMessage::Binary(message)).await
|
||||
.map_err(|_| "failed to send to websocket".to_owned())
|
||||
.map_err(logthru_net!(error))
|
||||
self.inner
|
||||
.ws_stream
|
||||
.clone()
|
||||
.send(WsMessage::Binary(message))
|
||||
.await
|
||||
.map_err(to_io)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", err, skip(self), fields(ret.len))]
|
||||
pub async fn recv(&self) -> Result<Vec<u8>, String> {
|
||||
pub async fn recv(&self) -> io::Result<Vec<u8>> {
|
||||
let out = match self.inner.ws_stream.clone().next().await {
|
||||
Some(WsMessage::Binary(v)) => v,
|
||||
Some(x) => {
|
||||
return Err(format!("Unexpected WS message type: {:?}", x));
|
||||
bail_io_error_other!("Unexpected WS message type");
|
||||
}
|
||||
None => {
|
||||
return Err("WS stream closed".to_owned()).map_err(logthru_net!(error));
|
||||
bail_io_error_other!("WS stream closed");
|
||||
}
|
||||
};
|
||||
if out.len() > MAX_MESSAGE_SIZE {
|
||||
Err("sending too large WS message".to_owned()).map_err(logthru_net!(error))
|
||||
} else {
|
||||
Ok(out)
|
||||
bail_io_error_other!("sending too large WS message")
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,8 +85,7 @@ impl WebsocketProtocolHandler {
|
||||
pub async fn connect(
|
||||
local_address: Option<SocketAddr>,
|
||||
dial_info: DialInfo,
|
||||
) -> Result<ProtocolNetworkConnection, String> {
|
||||
|
||||
) -> io::Result<ProtocolNetworkConnection> {
|
||||
assert!(local_address.is_none());
|
||||
|
||||
// Split dial info up
|
||||
@@ -99,34 +95,33 @@ impl WebsocketProtocolHandler {
|
||||
_ => panic!("invalid dialinfo for WS/WSS protocol"),
|
||||
};
|
||||
let request = dial_info.request().unwrap();
|
||||
let split_url = SplitUrl::from_str(&request)?;
|
||||
let split_url = SplitUrl::from_str(&request).map_err(to_io_error_other)?;
|
||||
if split_url.scheme != scheme {
|
||||
return Err("invalid websocket url scheme".to_string());
|
||||
bail_io_error_other!("invalid websocket url scheme");
|
||||
}
|
||||
|
||||
let (wsmeta, wsio) = WsMeta::connect(request, None)
|
||||
.await
|
||||
.map_err(map_to_string)
|
||||
.map_err(logthru_net!(error))?;
|
||||
|
||||
let (wsmeta, wsio) = WsMeta::connect(request, None).await.map_err(to_io)?;
|
||||
|
||||
// Make our connection descriptor
|
||||
Ok(ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(ConnectionDescriptor::new_no_local(
|
||||
dial_info.to_peer_address(),
|
||||
), wsmeta, wsio)))
|
||||
Ok(ProtocolNetworkConnection::Ws(
|
||||
WebsocketNetworkConnection::new(
|
||||
ConnectionDescriptor::new_no_local(dial_info.to_peer_address()),
|
||||
wsmeta,
|
||||
wsio,
|
||||
),
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))]
|
||||
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> {
|
||||
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> io::Result<()> {
|
||||
if data.len() > MAX_MESSAGE_SIZE {
|
||||
return Err("sending too large unbound WS message".to_owned());
|
||||
bail_io_error_other!("sending too large unbound WS message");
|
||||
}
|
||||
|
||||
// Make the real connection
|
||||
let conn = Self::connect(None, dial_info)
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||
|
||||
conn.send(data).await
|
||||
// Make the real connection
|
||||
let conn = Self::connect(None, dial_info).await?;
|
||||
|
||||
conn.send(data).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))]
|
||||
@@ -134,23 +129,19 @@ impl WebsocketProtocolHandler {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
timeout_ms: u32,
|
||||
) -> Result<Vec<u8>, String> {
|
||||
) -> io::Result<Vec<u8>> {
|
||||
if data.len() > MAX_MESSAGE_SIZE {
|
||||
return Err("sending too large unbound WS message".to_owned());
|
||||
bail_io_error_other!("sending too large unbound WS message");
|
||||
}
|
||||
|
||||
let conn = Self::connect(None, dial_info.clone())
|
||||
.await
|
||||
.map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?;
|
||||
let conn = Self::connect(None, dial_info.clone()).await?;
|
||||
|
||||
conn.send(data).await?;
|
||||
let out = timeout(timeout_ms, conn.recv())
|
||||
.await
|
||||
.map_err(map_to_string)??;
|
||||
.map_err(|e| e.to_io())??;
|
||||
|
||||
tracing::Span::current().record("ret.len", &out.len());
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user