start of refactoring veilid_api

This commit is contained in:
John Smith
2021-12-20 19:12:30 -05:00
parent 971fa94751
commit c0a42ac90c
8 changed files with 647 additions and 565 deletions

View File

@@ -51,6 +51,15 @@ struct NetworkInner {
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
tls_acceptor: Option<TlsAcceptor>,
udp_port: u16,
tcp_port: u16,
ws_port: u16,
wss_port: u16,
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
outbound_tcp_protocol_handler: Option<RawTcpProtocolHandler>,
outbound_ws_protocol_handler: Option<WebsocketProtocolHandler>,
outbound_wss_protocol_handler: Option<WebsocketProtocolHandler>,
interfaces: NetworkInterfaces,
}
@@ -84,6 +93,15 @@ impl Network {
listener_states: BTreeMap::new(),
udp_protocol_handlers: BTreeMap::new(),
tls_acceptor: None,
udp_port: 0u16,
tcp_port: 0u16,
ws_port: 0u16,
wss_port: 0u16,
outbound_udpv4_protocol_handler: None,
outbound_udpv6_protocol_handler: None,
outbound_tcp_protocol_handler: None,
outbound_ws_protocol_handler: None,
outbound_wss_protocol_handler: None,
interfaces: NetworkInterfaces::new(),
}
}
@@ -267,34 +285,9 @@ impl Network {
c.network.tls.connection_initial_timeout,
)
};
// Create a reusable socket with no linger time, and no delay
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(|e| format!("Couldn't create TCP socket: {}", e))?;
if let Err(e) = socket.set_linger(None) {
warn!("Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) {
warn!("Couldn't set TCP nodelay: {}", e);
}
if let Err(e) = socket.set_reuse_address(true) {
warn!("Couldn't set reuse address: {}", e);
}
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
warn!("Couldn't set reuse port: {}", e);
}
}
}
// Bind a listener and stash it with the sockaddr in a table
trace!("spawn_socket_listener: binding to {}", addr);
let socket2_addr = socket2::SockAddr::from(addr);
socket
.bind(&socket2_addr)
.map_err(|e| format!("failed to bind TCP socket: {}", e))?;
let socket = new_shared_tcp_socket(addr)?;
// Listen on the socket
socket
.listen(128)
@@ -461,30 +454,59 @@ impl Network {
}
////////////////////////////////////////////////////////////
async fn spawn_udp_socket(&self, addr: SocketAddr) -> Result<(), String> {
trace!("spawn_udp_socket on {:?}", &addr);
async fn create_udp_outbound_sockets(&self) -> Result<(), String> {
let mut inner = self.inner.lock();
let mut port = inner.udp_port;
// v4
let socket_addr_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
if let Ok(socket) = new_shared_udp_socket(socket_addr_v4) {
log_net!("created udpv4 outbound socket on {:?}", &socket_addr_v4);
// Pull the port if we randomly bound, so v6 can be on the same port
port = socket
.local_addr()
.map_err(map_to_string)?
.as_socket_ipv4()
.ok_or("expected ipv4 address type".to_owned())?
.port();
// Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into();
let udp_socket = UdpSocket::from(std_udp_socket);
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv4_handler =
RawUdpProtocolHandler::new(inner.network_manager.clone(), socket_arc.clone());
inner.outbound_udpv4_protocol_handler = Some(udpv4_handler);
}
//v6
let socket_addr_v6 =
SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), port);
if let Ok(socket) = new_shared_udp_socket(socket_addr_v6) {
log_net!("created udpv6 outbound socket on {:?}", &socket_addr_v6);
// Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into();
let udp_socket = UdpSocket::from(std_udp_socket);
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv6_handler =
RawUdpProtocolHandler::new(inner.network_manager.clone(), socket_arc.clone());
inner.outbound_udpv6_protocol_handler = Some(udpv6_handler);
}
Ok(())
}
async fn spawn_udp_inbound_socket(&self, addr: SocketAddr) -> Result<(), String> {
log_net!("spawn_udp_inbound_socket on {:?}", &addr);
// Create a reusable socket
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| format!("Couldn't create UDP socket: {}", e))?;
if let Err(e) = socket.set_reuse_address(true) {
warn!("Couldn't set reuse address: {}", e);
}
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
warn!("Couldn't set reuse port: {}", e);
}
}
}
// Bind a listener and stash it with the sockaddr in a table
trace!("spawn_udp_socket: binding to {}", addr);
let socket2_addr = socket2::SockAddr::from(addr);
socket
.bind(&socket2_addr)
.map_err(|e| format!("failed to bind UDP socket: {}", e))?;
let socket = new_shared_udp_socket(addr)?;
// Make an async UdpSocket from the socket2 socket
let std_udp_socket: std::net::UdpSocket = socket.into();
@@ -582,7 +604,7 @@ impl Network {
if !self.inner.lock().udp_protocol_handlers.contains_key(&addr) {
let ldi_addrs = Self::translate_unspecified_address(&*self.inner.lock(), &addr);
self.clone().spawn_udp_socket(addr).await?;
self.clone().spawn_udp_inbound_socket(addr).await?;
// Return local dial infos we listen on
for ldi_addr in ldi_addrs {
@@ -627,29 +649,28 @@ impl Network {
return Some(ph.clone());
}
}
// otherwise find the first udp protocol handler that matches the ip protocol version of the peer addr
// otherwise find the outbound udp protocol handler that matches the ip protocol version of the peer addr
let inner = self.inner.lock();
for (local_addr, ph) in &inner.udp_protocol_handlers {
if Self::match_socket_addr(&*inner, local_addr, peer_socket_addr) {
return Some(ph.clone());
}
match peer_socket_addr {
SocketAddr::V4(_) => inner.outbound_udpv4_protocol_handler.clone(),
SocketAddr::V6(_) => inner.outbound_udpv6_protocol_handler.clone(),
}
None
}
fn find_best_tcp_local_address(&self, peer_socket_addr: &SocketAddr) -> Option<SocketAddr> {
// Find a matching listening local tcp socket address if we can
let routing_table = self.routing_table();
let dids = routing_table.local_dial_info_for_protocol(ProtocolType::TCP);
fn get_preferred_local_address(
&self,
local_port: u16,
peer_socket_addr: &SocketAddr,
) -> SocketAddr {
let inner = self.inner.lock();
for did in dids {
if let Ok(local_addr) = did.dial_info.to_socket_addr() {
if Self::match_socket_addr(&*inner, &local_addr, peer_socket_addr) {
return Some(local_addr);
}
}
match peer_socket_addr {
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), local_port),
SocketAddr::V6(_) => SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
local_port,
),
}
None
}
async fn send_data_to_existing_connection(
@@ -744,17 +765,30 @@ impl Network {
}
DialInfo::TCP(_) => {
let peer_socket_addr = dial_info.to_socket_addr().map_err(logthru_net!())?;
let some_local_addr = self.find_best_tcp_local_address(&peer_socket_addr);
RawTcpProtocolHandler::connect(network_manager, some_local_addr, peer_socket_addr)
let local_addr =
self.get_preferred_local_address(self.inner.lock().tcp_port, &peer_socket_addr);
RawTcpProtocolHandler::connect(network_manager, local_addr, peer_socket_addr)
.await
.map_err(logthru_net!())?
}
DialInfo::WS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
.await
.map_err(logthru_net!(error))?,
DialInfo::WSS(_) => WebsocketProtocolHandler::connect(network_manager, dial_info)
DialInfo::WS(_) => {
let remote_ip_addr = dial_info.resolve()?;
let local_addr =
self.get_preferred_local_address(self.inner.lock().ws_port, &peer_socket_addr);
WebsocketProtocolHandler::connect(network_manager, local_addr, dial_info)
.await
.map_err(logthru_net!(error))?
}
DialInfo::WSS(_) => {
let local_addr =
self.get_preferred_local_address(self.inner.lock().ws_port, &peer_socket_addr);
WebsocketProtocolHandler::connect(network_manager, dial_info)
.await
.map_err(logthru_net!(error))?,
}
};
conn.send(data).await.map_err(logthru_net!(error))
@@ -801,11 +835,12 @@ impl Network {
c.network.protocol.udp.public_address.clone(),
)
};
trace!("UDP: starting listener at {:?}", listen_address);
info!("UDP: starting listener at {:?}", listen_address);
let dial_infos = self.start_udp_handler(listen_address.clone()).await?;
trace!("UDP: listener started");
for x in &dial_infos {
// Pick out UDP port for outbound connections (they will all be the same)
self.inner.lock().udp_port = x.port();
// Register local dial info
routing_table.register_local_dial_info(x.clone(), DialInfoOrigin::Static);
}
@@ -867,6 +902,9 @@ impl Network {
let mut dial_infos: Vec<DialInfo> = Vec::new();
for (a, p) in addresses {
// Pick out WS port for outbound connections (they will all be the same)
self.inner.lock().ws_port = p;
let di = DialInfo::ws(a.address_string(), p, path.clone());
dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
@@ -906,7 +944,7 @@ impl Network {
)
};
trace!("WSS: starting listener at {}", listen_address);
let _ = self
let addresses = self
.start_tcp_listener(
listen_address.clone(),
true,
@@ -921,11 +959,14 @@ impl Network {
// This is not the case with unencrypted websockets, which can be specified solely by an IP address
//
// let mut dial_infos: Vec<DialInfo> = Vec::new();
// for (a, p) in addresses {
// let di = DialInfo::wss(a.address_string(), p, path.clone());
// dial_infos.push(di.clone());
// routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
// }
for (_, p) in addresses {
// Pick out WS port for outbound connections (they will all be the same)
self.inner.lock().wss_port = p;
// let di = DialInfo::wss(a.address_string(), p, path.clone());
// dial_infos.push(di.clone());
// routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
}
// Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() {
@@ -974,6 +1015,9 @@ impl Network {
let mut dial_infos: Vec<DialInfo> = Vec::new();
for (a, p) in addresses {
// Pick out TCP port for outbound connections (they will all be the same)
self.inner.lock().tcp_port = p;
let di = DialInfo::tcp(a.to_canonical(), p);
dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
@@ -1019,24 +1063,29 @@ impl Network {
pub async fn startup(&self) -> Result<(), String> {
info!("starting network");
let network_manager = self.inner.lock().network_manager.clone();
// initialize interfaces
self.inner.lock().interfaces.refresh()?;
// get listen config
let (listen_udp, listen_tcp, listen_ws, listen_wss) = {
// get network config
let (enabled_udp, connect_tcp, listen_tcp, connect_ws, listen_ws, connect_wss, listen_wss) = {
let c = self.config.get();
(
c.network.protocol.udp.enabled && c.capabilities.protocol_udp,
c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp,
c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp,
c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws,
c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws,
c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss,
c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss,
)
};
// start listeners
if listen_udp {
if enabled_udp {
self.start_udp_listeners().await?;
self.create_udp_outbound_sockets().await?;
}
if listen_ws {
self.start_ws_listeners().await?;

View File

@@ -6,6 +6,7 @@ pub mod ws;
use super::listener_state::*;
use crate::veilid_api::ProtocolType;
use crate::xx::*;
use socket2::{Domain, Protocol, Socket, Type};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DummyNetworkConnection {}
@@ -61,3 +62,57 @@ impl NetworkConnection {
}
}
}
pub fn new_shared_udp_socket(local_address: SocketAddr) -> Result<socket2::Socket, String> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| format!("Couldn't create UDP socket: {}", e))?;
if let Err(e) = socket.set_reuse_address(true) {
log_net!(error "Couldn't set reuse address: {}", e);
}
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
log_net!(error "Couldn't set reuse port: {}", e);
}
}
}
let socket2_addr = socket2::SockAddr::from(local_address);
socket
.bind(&socket2_addr)
.map_err(|e| format!("failed to bind UDP socket: {}", e))?;
Ok(socket)
}
pub fn new_shared_tcp_socket(local_address: SocketAddr) -> Result<socket2::Socket, String> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(map_to_string)
.map_err(logthru_net!())?;
if let Err(e) = socket.set_linger(None) {
log_net!(error "Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e);
}
if let Err(e) = socket.set_reuse_address(true) {
log_net!(error "Couldn't set reuse address: {}", e);
}
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
log_net!(error "Couldn't set reuse port: {}", e);
}
}
}
let socket2_addr = socket2::SockAddr::from(local_address);
if let Err(e) = socket.bind(&socket2_addr) {
log_net!(error "failed to bind TCP socket: {}", e);
}
Ok(socket)
}

View File

@@ -6,7 +6,6 @@ use crate::*;
use async_std::net::*;
use async_std::prelude::*;
use async_std::sync::Mutex as AsyncMutex;
use socket2::{Domain, Protocol, Socket, Type};
use std::fmt;
struct RawTcpNetworkConnectionInner {
@@ -170,40 +169,11 @@ impl RawTcpProtocolHandler {
pub async fn connect(
network_manager: NetworkManager,
preferred_local_address: Option<SocketAddr>,
local_address: SocketAddr,
remote_socket_addr: SocketAddr,
) -> Result<NetworkConnection, String> {
// Make a low level socket that can connect to the remote socket address
// and attempt to reuse the local address that our listening socket uses
// for hole-punch compatibility
let domain = Domain::for_address(remote_socket_addr);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(map_to_string)
.map_err(logthru_net!())?;
if let Err(e) = socket.set_linger(None) {
log_net!("Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) {
log_net!("Couldn't set TCP nodelay: {}", e);
}
if let Err(e) = socket.set_reuse_address(true) {
log_net!("Couldn't set reuse address: {}", e);
}
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
log_net!("Couldn't set reuse port: {}", e);
}
}
}
// Try to bind it to the preferred local address
if let Some(some_local_addr) = preferred_local_address {
let socket2_addr = socket2::SockAddr::from(some_local_addr);
if let Err(e) = socket.bind(&socket2_addr) {
log_net!(error "failed to bind TCP socket: {}", e);
}
}
// Make a shared socket
let socket = new_shared_tcp_socket(local_address)?;
// Connect to the remote address
let remote_socket2_addr = socket2::SockAddr::from(remote_socket_addr);

View File

@@ -244,19 +244,21 @@ impl WebsocketProtocolHandler {
pub async fn connect(
network_manager: NetworkManager,
local_address: SocketAddr,
dial_info: &DialInfo,
) -> Result<NetworkConnection, String> {
// Split dial info up
let (tls, request, domain, port, protocol_type) = match &dial_info {
DialInfo::WS(di) => (
false,
di.path.clone(),
format!("ws://{}:{}/{}", di.host, di.port, di.path),
di.host.clone(),
di.port,
ProtocolType::WS,
),
DialInfo::WSS(di) => (
true,
di.path.clone(),
format!("wss://{}:{}/{}", di.host, di.port, di.path),
di.host.clone(),
di.port,
ProtocolType::WSS,
@@ -264,24 +266,29 @@ impl WebsocketProtocolHandler {
_ => panic!("invalid dialinfo for WS/WSS protocol"),
};
let tcp_stream = TcpStream::connect(format!("{}:{}", &domain, &port))
.await
// Resolve remote address
let remote_ip_addr = dial_info.resolve()?;
let remote_socket_addr = SocketAddr::new(remote_ip_addr, port);
// Make a shared socket
let socket = new_shared_tcp_socket(local_address)?;
// Connect to the remote address
let remote_socket2_addr = socket2::SockAddr::from(remote_socket_addr);
socket
.connect(&remote_socket2_addr)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
let local_addr = tcp_stream
.map_err(logthru_net!(error "addr={}", remote_socket_addr))?;
let std_stream: std::net::TcpStream = socket.into();
let tcp_stream = TcpStream::from(std_stream);
// See what local address we ended up with
let actual_local_addr = tcp_stream
.local_addr()
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
let peer_socket_addr = tcp_stream
.peer_addr()
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
let peer_addr = PeerAddress::new(
Address::from_socket_addr(peer_socket_addr),
peer_socket_addr.port(),
protocol_type,
);
.map_err(logthru_net!())?;
// Negotiate TLS if this is WSS
if tls {
let connector = TlsConnector::default();
let tls_stream = connector
@@ -294,9 +301,18 @@ impl WebsocketProtocolHandler {
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
let conn = NetworkConnection::Wss(WebsocketNetworkConnection::new(tls, ws_stream));
// Make the connection descriptor peer address
let peer_addr = PeerAddress::new(
Address::from_socket_addr(remote_socket_addr),
port,
ProtocolType::WSS,
);
// Register the WSS connection
network_manager
.on_new_connection(
ConnectionDescriptor::new(peer_addr, local_addr),
ConnectionDescriptor::new(peer_addr, actual_local_addr),
conn.clone(),
)
.await?;
@@ -307,9 +323,18 @@ impl WebsocketProtocolHandler {
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
let conn = NetworkConnection::Ws(WebsocketNetworkConnection::new(tls, ws_stream));
// Make the connection descriptor peer address
let peer_addr = PeerAddress::new(
Address::from_socket_addr(remote_socket_addr),
port,
ProtocolType::WS,
);
// Register the WS connection
network_manager
.on_new_connection(
ConnectionDescriptor::new(peer_addr, local_addr),
ConnectionDescriptor::new(peer_addr, actual_local_addr),
conn.clone(),
)
.await?;