fix websockets

This commit is contained in:
John Smith 2022-01-05 16:58:18 -05:00
parent b66aca0ce0
commit ea2300c32b
5 changed files with 24 additions and 23 deletions

View File

@ -34,7 +34,7 @@ impl ConnectionTable {
assert!(res.is_none()); assert!(res.is_none());
let conns = self.conns_by_remote.entry(descriptor.remote).or_default(); let conns = self.conns_by_remote.entry(descriptor.remote).or_default();
warn!("add_connection: {:?}", conn); //warn!("add_connection: {:?}", conn);
conns.push(conn); conns.push(conn);
Ok(()) Ok(())
@ -42,7 +42,7 @@ impl ConnectionTable {
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<NetworkConnection> { pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<NetworkConnection> {
let out = self.conn_by_descriptor.get(&descriptor).cloned(); let out = self.conn_by_descriptor.get(&descriptor).cloned();
warn!("get_connection: {:?} -> {:?}", descriptor, out); //warn!("get_connection: {:?} -> {:?}", descriptor, out);
out out
} }
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<NetworkConnection> { pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<NetworkConnection> {
@ -50,7 +50,7 @@ impl ConnectionTable {
.conns_by_remote .conns_by_remote
.get(&remote) .get(&remote)
.map(|v| v[(v.len() - 1)].clone()); .map(|v| v[(v.len() - 1)].clone());
warn!("get_last_connection_by_remote: {:?} -> {:?}", remote, out); //warn!("get_last_connection_by_remote: {:?} -> {:?}", remote, out);
out out
} }
@ -62,7 +62,7 @@ impl ConnectionTable {
&mut self, &mut self,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
) -> Result<NetworkConnection, String> { ) -> Result<NetworkConnection, String> {
warn!("remove_connection: {:?}", descriptor); //warn!("remove_connection: {:?}", descriptor);
let out = self let out = self
.conn_by_descriptor .conn_by_descriptor
.remove(&descriptor) .remove(&descriptor)

View File

@ -124,7 +124,7 @@ impl RawTcpProtocolHandler {
ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)), ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(stream)),
); );
warn!("on_accept_async from: {}", socket_addr); log_net!("on_accept_async from: {}", socket_addr);
Ok(Some(conn)) Ok(Some(conn))
} }

View File

@ -22,7 +22,6 @@ pub struct WebsocketNetworkConnection<T>
where where
T: io::Read + io::Write + Send + Unpin + 'static, T: io::Read + io::Write + Send + Unpin + 'static,
{ {
tls: bool,
ws_stream: CloneStream<WebSocketStream<T>>, ws_stream: CloneStream<WebSocketStream<T>>,
} }
@ -39,9 +38,8 @@ impl<T> WebsocketNetworkConnection<T>
where where
T: io::Read + io::Write + Send + Unpin + 'static, T: io::Read + io::Write + Send + Unpin + 'static,
{ {
pub fn new(tls: bool, ws_stream: WebSocketStream<T>) -> Self { pub fn new(ws_stream: WebSocketStream<T>) -> Self {
Self { Self {
tls,
ws_stream: CloneStream::new(ws_stream), ws_stream: CloneStream::new(ws_stream),
} }
} }
@ -103,9 +101,9 @@ impl WebsocketProtocolHandler {
pub fn new(config: VeilidConfig, tls: bool, local_address: SocketAddr) -> Self { pub fn new(config: VeilidConfig, tls: bool, local_address: SocketAddr) -> Self {
let c = config.get(); let c = config.get();
let path = if tls { let path = if tls {
format!("GET {}", c.network.protocol.ws.path.trim_end_matches('/')) format!("GET /{}", c.network.protocol.ws.path.trim_end_matches('/'))
} else { } else {
format!("GET {}", c.network.protocol.wss.path.trim_end_matches('/')) format!("GET /{}", c.network.protocol.wss.path.trim_end_matches('/'))
}; };
let connection_initial_timeout = if tls { let connection_initial_timeout = if tls {
c.network.tls.connection_initial_timeout c.network.tls.connection_initial_timeout
@ -151,7 +149,11 @@ impl WebsocketProtocolHandler {
&& peekbuf[request_path_len - 1] == b' ')); && peekbuf[request_path_len - 1] == b' '));
if !matches_path { if !matches_path {
log_net!("not websocket"); log_net!(
"not websocket: request_path: {} peekbuf:{}",
std::str::from_utf8(&self.arc.request_path).unwrap(),
std::str::from_utf8(&peekbuf).unwrap()
);
return Ok(None); return Ok(None);
} }
log_net!("found websocket"); log_net!("found websocket");
@ -176,10 +178,7 @@ impl WebsocketProtocolHandler {
peer_addr, peer_addr,
SocketAddress::from_socket_addr(self.arc.local_address), SocketAddress::from_socket_addr(self.arc.local_address),
), ),
ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new( ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new(ws_stream)),
self.arc.tls,
ws_stream,
)),
); );
Ok(Some(conn)) Ok(Some(conn))
@ -246,7 +245,7 @@ impl WebsocketProtocolHandler {
Ok(NetworkConnection::from_protocol( Ok(NetworkConnection::from_protocol(
descriptor, descriptor,
ProtocolNetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream)), ProtocolNetworkConnection::Wss(WebsocketNetworkConnection::new(ws_stream)),
)) ))
} else { } else {
let (ws_stream, _response) = client_async(request, tcp_stream) let (ws_stream, _response) = client_async(request, tcp_stream)
@ -255,7 +254,7 @@ impl WebsocketProtocolHandler {
.map_err(logthru_net!(error))?; .map_err(logthru_net!(error))?;
Ok(NetworkConnection::from_protocol( Ok(NetworkConnection::from_protocol(
descriptor, descriptor,
ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream)), ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(ws_stream)),
)) ))
} }
} }

View File

@ -96,9 +96,11 @@ impl TableStore {
let cfg = DatabaseConfig::with_columns(column_count); let cfg = DatabaseConfig::with_columns(column_count);
let db = let db =
Database::open(&dbpath, cfg).map_err(|e| format!("failed to open tabledb: {}", e))?; Database::open(&dbpath, cfg).map_err(|e| format!("failed to open tabledb: {}", e))?;
info!( trace!(
"opened table store '{}' at path '{:?}' with {} columns", "opened table store '{}' at path '{:?}' with {} columns",
name, dbpath, column_count name,
dbpath,
column_count
); );
let table_db = TableDB::new(table_name.clone(), self.clone(), db); let table_db = TableDB::new(table_name.clone(), self.clone(), db);

View File

@ -461,18 +461,18 @@ impl FromStr for DialInfo {
Ok(DialInfo::tcp(socket_address)) Ok(DialInfo::tcp(socket_address))
} }
"ws" => { "ws" => {
let (sa, rest) = s.split_once('|').ok_or_else(|| { let (sa, rest) = rest.split_once('|').ok_or_else(|| {
parse_error!("DialInfo::from_str missing socket address '|' separator", s) parse_error!("DialInfo::from_str missing socket address '|' separator", s)
})?; })?;
let socket_address = SocketAddress::from_str(sa)?; let socket_address = SocketAddress::from_str(sa)?;
DialInfo::try_ws(socket_address, rest.to_string()) DialInfo::try_ws(socket_address, format!("ws://{}", rest))
} }
"wss" => { "wss" => {
let (sa, rest) = s.split_once('|').ok_or_else(|| { let (sa, rest) = rest.split_once('|').ok_or_else(|| {
parse_error!("DialInfo::from_str missing socket address '|' separator", s) parse_error!("DialInfo::from_str missing socket address '|' separator", s)
})?; })?;
let socket_address = SocketAddress::from_str(sa)?; let socket_address = SocketAddress::from_str(sa)?;
DialInfo::try_wss(socket_address, rest.to_string()) DialInfo::try_wss(socket_address, format!("wss://{}", rest))
} }
_ => Err(parse_error!("DialInfo::from_str has invalid scheme", s)), _ => Err(parse_error!("DialInfo::from_str has invalid scheme", s)),
} }