From 23abaa3c99ab559625e1d71f1ff463d633f70f30 Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 24 Dec 2021 18:02:53 -0500 Subject: [PATCH] refactor done for native --- veilid-core/src/intf/native/network/mod.rs | 167 +++++++++--------- .../src/intf/native/network/protocol/mod.rs | 14 +- .../src/intf/native/network/protocol/tcp.rs | 4 - .../src/intf/native/network/protocol/ws.rs | 14 +- .../network/public_dialinfo_discovery.rs | 79 ++++----- veilid-core/src/intf/wasm/network/mod.rs | 3 +- .../src/intf/wasm/network/protocol/mod.rs | 6 - .../src/intf/wasm/network/protocol/ws.rs | 7 - veilid-core/src/routing_table/bucket_entry.rs | 4 +- veilid-core/src/routing_table/find_nodes.rs | 28 ++- veilid-core/src/routing_table/mod.rs | 147 +++++---------- veilid-core/src/routing_table/node_ref.rs | 9 +- veilid-core/src/rpc_processor/mod.rs | 8 +- veilid-core/src/veilid_api/mod.rs | 126 +++++++------ 14 files changed, 266 insertions(+), 350 deletions(-) diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 20d2eb13..94e361cb 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -18,13 +18,11 @@ use utils::network_interfaces::*; use async_std::io; use async_std::net::*; use async_tls::TlsAcceptor; -use cfg_if::*; use futures_util::StreamExt; // xxx: rustls ^0.20 //use rustls::{server::NoClientAuth, Certificate, PrivateKey, ServerConfig}; use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; -use socket2::{Domain, Protocol, Socket, Type}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; @@ -44,6 +42,7 @@ struct NetworkInner { protocol_config: Option, udp_static_public_dialinfo: bool, tcp_static_public_dialinfo: bool, + ws_static_public_dialinfo: bool, network_class: Option, join_handles: Vec>, listener_states: BTreeMap>>, @@ -55,9 +54,6 @@ struct NetworkInner { wss_port: u16, outbound_udpv4_protocol_handler: Option, outbound_udpv6_protocol_handler: Option, - outbound_tcp_protocol_handler: Option, - outbound_ws_protocol_handler: Option, - outbound_wss_protocol_handler: Option, interfaces: NetworkInterfaces, } @@ -84,6 +80,7 @@ impl Network { protocol_config: None, udp_static_public_dialinfo: false, tcp_static_public_dialinfo: false, + ws_static_public_dialinfo: false, network_class: None, join_handles: Vec::new(), listener_states: BTreeMap::new(), @@ -95,9 +92,6 @@ impl Network { wss_port: 0u16, outbound_udpv4_protocol_handler: None, outbound_udpv6_protocol_handler: None, - outbound_tcp_protocol_handler: None, - outbound_ws_protocol_handler: None, - outbound_wss_protocol_handler: None, interfaces: NetworkInterfaces::new(), } } @@ -464,7 +458,7 @@ impl Network { .local_addr() .map_err(map_to_string)? .as_socket_ipv4() - .ok_or("expected ipv4 address type".to_owned())? + .ok_or_else(|| "expected ipv4 address type".to_owned())? .port(); // Make an async UdpSocket from the socket2 socket @@ -639,7 +633,6 @@ impl Network { local_port: u16, peer_socket_addr: &SocketAddr, ) -> SocketAddr { - let inner = self.inner.lock(); match peer_socket_addr { SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), local_port), SocketAddr::V6(_) => SocketAddr::new( @@ -807,15 +800,27 @@ impl Network { }; info!("UDP: starting listener at {:?}", listen_address); let dial_infos = self.start_udp_handler(listen_address.clone()).await?; - for x in &dial_infos { + let mut static_public = false; + for di in &dial_infos { // Pick out UDP port for outbound connections (they will all be the same) - self.inner.lock().udp_port = x.port(); - // Register local dial info - routing_table.register_local_dial_info(x.clone(), DialInfoOrigin::Static); + self.inner.lock().udp_port = di.port(); + + // Register local dial info only here if we specify a public address + if public_address.is_none() && di.is_global() { + // Register global dial info if no public address is specified + routing_table.register_dial_info( + di.clone(), + DialInfoOrigin::Static, + Some(NetworkClass::Server), + ); + static_public = true; + } else if di.is_local() { + // Register local dial info + routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None); + } } // Add static public dialinfo if it's configured - let mut static_public = false; if let Some(public_address) = public_address.as_ref() { // Resolve statically configured public dialinfo let mut public_sockaddrs = public_address @@ -825,26 +830,15 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_global_dial_info( + routing_table.register_dial_info( DialInfo::udp_from_socketaddr(pdi_addr), - Some(NetworkClass::Server), DialInfoOrigin::Static, + Some(NetworkClass::Server), ); static_public = true; } - } else { - // Register local dial info as public if it is publicly routable - for x in &dial_infos { - if x.is_global() { - routing_table.register_global_dial_info( - x.clone(), - Some(NetworkClass::Server), - DialInfoOrigin::Static, - ); - static_public = true; - } - } } + self.inner.lock().udp_static_public_dialinfo = static_public; Ok(()) } @@ -869,18 +863,35 @@ impl Network { .await?; trace!("WS: listener started"); - let mut dial_infos: Vec = Vec::new(); + let mut static_public = false; for socket_address in socket_addresses { // Pick out WS port for outbound connections (they will all be the same) self.inner.lock().ws_port = socket_address.port(); - // Build local dial info request url - let local_url = format!("ws://{}/{}", socket_address, path); - // Create local dial info - let di = DialInfo::try_ws(socket_address, local_url) - .map_err(map_to_string) - .map_err(logthru_net!(error))?; - dial_infos.push(di.clone()); - routing_table.register_local_dial_info(di, DialInfoOrigin::Static); + + if url.is_none() && socket_address.address().is_global() { + // Build global dial info request url + let global_url = format!("ws://{}/{}", socket_address, path); + + // Create global dial info + let di = DialInfo::try_ws(socket_address, global_url) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + routing_table.register_dial_info( + di, + DialInfoOrigin::Static, + Some(NetworkClass::Server), + ); + static_public = true; + } else if socket_address.address().is_local() { + // Build local dial info request url + let local_url = format!("ws://{}/{}", socket_address, path); + + // Create local dial info + let di = DialInfo::try_ws(socket_address, local_url) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + routing_table.register_dial_info(di, DialInfoOrigin::Static, None); + } } // Add static public dialinfo if it's configured @@ -900,15 +911,17 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_global_dial_info( + routing_table.register_dial_info( DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) .map_err(map_to_string) .map_err(logthru_net!(error))?, - Some(NetworkClass::Server), DialInfoOrigin::Static, + Some(NetworkClass::Server), ); } + static_public = true; } + self.inner.lock().ws_static_public_dialinfo = static_public; Ok(()) } @@ -937,13 +950,9 @@ impl Network { // is specified, then TLS won't validate, so no local dialinfo is possible. // This is not the case with unencrypted websockets, which can be specified solely by an IP address // - // let mut dial_infos: Vec = Vec::new(); - for socket_address in socket_addresses { + if let Some(socket_address) = socket_addresses.first() { // Pick out WSS port for outbound connections (they will all be the same) self.inner.lock().wss_port = socket_address.port(); - - // Don't register local dial info because TLS won't allow that anyway without a local CA - // and we aren't doing that yet at all today. } // Add static public dialinfo if it's configured @@ -964,12 +973,12 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_global_dial_info( + routing_table.register_dial_info( DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) .map_err(map_to_string) .map_err(logthru_net!(error))?, - Some(NetworkClass::Server), DialInfoOrigin::Static, + Some(NetworkClass::Server), ); } } else { @@ -998,18 +1007,29 @@ impl Network { .await?; trace!("TCP: listener started"); - let mut dial_infos: Vec = Vec::new(); + let mut static_public = false; for socket_address in socket_addresses { // Pick out TCP port for outbound connections (they will all be the same) self.inner.lock().tcp_port = socket_address.port(); let di = DialInfo::tcp(socket_address); - dial_infos.push(di.clone()); - routing_table.register_local_dial_info(di, DialInfoOrigin::Static); + + // Register local dial info only here if we specify a public address + if public_address.is_none() && di.is_global() { + // Register global dial info if no public address is specified + routing_table.register_dial_info( + di.clone(), + DialInfoOrigin::Static, + Some(NetworkClass::Server), + ); + static_public = true; + } else if di.is_local() { + // Register local dial info + routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None); + } } // Add static public dialinfo if it's configured - let mut static_public = false; if let Some(public_address) = public_address.as_ref() { // Resolve statically configured public dialinfo let mut public_sockaddrs = public_address @@ -1019,25 +1039,13 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_global_dial_info( + routing_table.register_dial_info( DialInfo::tcp_from_socketaddr(pdi_addr), - None, DialInfoOrigin::Static, + None, ); static_public = true; } - } else { - // Register local dial info as public if it is publicly routable - for x in &dial_infos { - if x.is_global() { - routing_table.register_global_dial_info( - x.clone(), - Some(NetworkClass::Server), - DialInfoOrigin::Static, - ); - static_public = true; - } - } } self.inner.lock().tcp_static_public_dialinfo = static_public; @@ -1046,12 +1054,11 @@ impl Network { } pub fn get_protocol_config(&self) -> Option { - self.inner.lock().protocol_config.clone() + self.inner.lock().protocol_config } pub async fn startup(&self) -> Result<(), String> { info!("starting network"); - let network_manager = self.inner.lock().network_manager.clone(); // initialize interfaces self.inner.lock().interfaces.refresh()?; @@ -1103,8 +1110,7 @@ impl Network { let routing_table = network_manager.routing_table(); // Drop all dial info - routing_table.clear_local_dial_info(); - routing_table.clear_global_dial_info(); + routing_table.clear_dial_info_details(); // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(network_manager); @@ -1151,10 +1157,7 @@ impl Network { let inner = self.inner.lock(); ( inner.network_manager.routing_table(), - inner - .protocol_config - .clone() - .unwrap_or_else(|| ProtocolConfig::default()), + inner.protocol_config.unwrap_or_default(), inner.udp_static_public_dialinfo, inner.tcp_static_public_dialinfo, inner.network_class.unwrap_or(NetworkClass::Invalid), @@ -1170,12 +1173,11 @@ impl Network { && !udp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { - let filter = DialInfoFilter::with_protocol_type_and_address_type( - ProtocolType::UDP, - AddressType::IPV4, - ); + let filter = DialInfoFilter::global() + .with_protocol_type(ProtocolType::UDP) + .with_address_type(AddressType::IPV4); let need_udpv4_dialinfo = routing_table - .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter)) + .first_filtered_dial_info_detail(&filter) .is_none(); if need_udpv4_dialinfo { // If we have no public UDPv4 dialinfo, then we need to run a NAT check @@ -1192,12 +1194,11 @@ impl Network { && !tcp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { - let filter = DialInfoFilter::with_protocol_type_and_address_type( - ProtocolType::TCP, - AddressType::IPV4, - ); + let filter = DialInfoFilter::global() + .with_protocol_type(ProtocolType::TCP) + .with_address_type(AddressType::IPV4); let need_tcpv4_dialinfo = routing_table - .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter)) + .first_filtered_dial_info_detail(&filter) .is_none(); if need_tcpv4_dialinfo { // If we have no public TCPv4 dialinfo, then we need to run a NAT check diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index 9a9b2a60..00464fde 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -4,17 +4,14 @@ pub mod wrtc; pub mod ws; use super::listener_state::*; -use crate::veilid_api::ProtocolType; use crate::xx::*; +use crate::*; use socket2::{Domain, Protocol, Socket, Type}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct DummyNetworkConnection {} impl DummyNetworkConnection { - pub fn protocol_type(&self) -> ProtocolType { - ProtocolType::UDP - } pub fn send(&self, _message: Vec) -> SystemPinBoxFuture> { Box::pin(async { Ok(()) }) } @@ -34,15 +31,6 @@ pub enum NetworkConnection { } impl NetworkConnection { - pub fn protocol_type(&self) -> ProtocolType { - match self { - Self::Dummy(d) => d.protocol_type(), - Self::RawTcp(t) => t.protocol_type(), - Self::WsAccepted(w) => w.protocol_type(), - Self::Ws(w) => w.protocol_type(), - Self::Wss(w) => w.protocol_type(), - } - } pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { match self { Self::Dummy(d) => d.send(message), diff --git a/veilid-core/src/intf/native/network/protocol/tcp.rs b/veilid-core/src/intf/native/network/protocol/tcp.rs index 01074d86..e7774790 100644 --- a/veilid-core/src/intf/native/network/protocol/tcp.rs +++ b/veilid-core/src/intf/native/network/protocol/tcp.rs @@ -44,10 +44,6 @@ impl RawTcpNetworkConnection { } impl RawTcpNetworkConnection { - pub fn protocol_type(&self) -> ProtocolType { - ProtocolType::TCP - } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); diff --git a/veilid-core/src/intf/native/network/protocol/ws.rs b/veilid-core/src/intf/native/network/protocol/ws.rs index 7959043e..e055a9f2 100644 --- a/veilid-core/src/intf/native/network/protocol/ws.rs +++ b/veilid-core/src/intf/native/network/protocol/ws.rs @@ -80,14 +80,6 @@ where } } - pub fn protocol_type(&self) -> ProtocolType { - if self.tls { - ProtocolType::WSS - } else { - ProtocolType::WS - } - } - pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); @@ -248,9 +240,9 @@ impl WebsocketProtocolHandler { dial_info: &DialInfo, ) -> Result { // Split dial info up - let (tls, protocol_type, scheme) = match &dial_info { - DialInfo::WS(_) => (false, ProtocolType::WS, "ws"), - DialInfo::WSS(_) => (true, ProtocolType::WSS, "wss"), + let (tls, scheme) = match &dial_info { + DialInfo::WS(_) => (false, "ws"), + DialInfo::WSS(_) => (true, "wss"), _ => panic!("invalid dialinfo for WS/WSS protocol"), }; let request = dial_info.request().unwrap(); diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index c78e8473..a0ea6112 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -5,8 +5,6 @@ use crate::network_manager::*; use crate::routing_table::*; use crate::*; -use async_std::net::*; - impl Network { // Ask for a public address check from a particular noderef async fn request_public_address(&self, node_ref: NodeRef) -> Option { @@ -22,17 +20,20 @@ impl Network { .unwrap_or(None) } - xxx convert to filter // find fast peers with a particular address type, and ask them to tell us what our external address is async fn discover_external_address( &self, - protocol_address_type: ProtocolAddressType, + protocol_type: ProtocolType, + address_type: AddressType, ignore_node: Option, - ) -> Option<(SocketAddr, NodeRef)> { + ) -> Option<(SocketAddress, NodeRef)> { let routing_table = self.routing_table(); - let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); + let filter = DialInfoFilter::global() + .with_protocol_type(protocol_type) + .with_address_type(address_type); + let peers = routing_table.find_fast_nodes_filtered(&filter); if peers.is_empty() { - log_net!("no peers of type '{:?}'", protocol_address_type); + log_net!("no peers of type '{:?}'", filter); return None; } for peer in peers { @@ -49,24 +50,20 @@ impl Network { None } - fn get_interface_addresses( + fn get_local_addresses( &self, - protocol_address_type: ProtocolAddressType, - ) -> Vec { + protocol_type: ProtocolType, + address_type: AddressType, + ) -> Vec { let routing_table = self.routing_table(); + let filter = DialInfoFilter::local() + .with_protocol_type(protocol_type) + .with_address_type(address_type); routing_table - .get_own_peer_info(PeerScope::Local) - .dial_infos + .all_filtered_dial_info_details(&filter) .iter() - .filter_map(|di| { - if di.protocol_address_type() == protocol_address_type { - if let Ok(addr) = di.to_socket_addr() { - return Some(addr); - } - } - None - }) + .map(|did| did.dial_info.socket_address()) .collect() } @@ -88,11 +85,12 @@ impl Network { .unwrap_or(false) } - async fn try_port_mapping>( + async fn try_port_mapping>( &self, _intf_addrs: I, - _protocol_address_type: ProtocolAddressType, - ) -> Option { + _protocol_type: ProtocolType, + _address_type: AddressType, + ) -> Option { //xxx None } @@ -107,13 +105,13 @@ impl Network { }; // Get our interface addresses - let intf_addrs = self.get_interface_addresses(ProtocolAddressType::UDPv4); + let intf_addrs = self.get_local_addresses(ProtocolType::UDP, AddressType::IPV4); // Loop for restricted NAT retries loop { // Get our external address from some fast node, call it node B let (external1, node_b) = match self - .discover_external_address(ProtocolAddressType::UDPv4, None) + .discover_external_address(ProtocolType::UDP, AddressType::IPV4, None) .await { None => { @@ -122,7 +120,7 @@ impl Network { } Some(v) => v, }; - let external1_dial_info = DialInfo::udp_from_socketaddr(external1); + let external1_dial_info = DialInfo::udp(external1); // If our local interface list contains external1 then there is no NAT in place if intf_addrs.contains(&external1) { @@ -133,10 +131,10 @@ impl Network { .await { // Add public dial info with Server network class - routing_table.register_global_dial_info( + routing_table.register_dial_info( external1_dial_info, - Some(NetworkClass::Server), DialInfoOrigin::Discovered, + Some(NetworkClass::Server), ); // No more retries @@ -149,15 +147,15 @@ impl Network { // There is -some NAT- // Attempt a UDP port mapping via all available and enabled mechanisms if let Some(external_mapped) = self - .try_port_mapping(&intf_addrs, ProtocolAddressType::UDPv4) + .try_port_mapping(&intf_addrs, ProtocolType::UDP, AddressType::IPV4) .await { // Got a port mapping, let's use it - let external_mapped_dial_info = DialInfo::udp_from_socketaddr(external_mapped); - routing_table.register_global_dial_info( + let external_mapped_dial_info = DialInfo::udp(external_mapped); + routing_table.register_dial_info( external_mapped_dial_info, - Some(NetworkClass::Mapped), DialInfoOrigin::Mapped, + Some(NetworkClass::Mapped), ); // No more retries @@ -177,10 +175,10 @@ impl Network { { // Yes, another machine can use the dial info directly, so Full Cone // Add public dial info with full cone NAT network class - routing_table.register_global_dial_info( + routing_table.register_dial_info( external1_dial_info, - Some(NetworkClass::FullNAT), DialInfoOrigin::Discovered, + Some(NetworkClass::FullNAT), ); // No more retries @@ -191,7 +189,8 @@ impl Network { // Get our external address from some fast node, that is not node B, call it node D let (external2, node_d) = match self .discover_external_address( - ProtocolAddressType::UDPv4, + ProtocolType::UDP, + AddressType::IPV4, Some(node_b.node_id()), ) .await @@ -214,7 +213,7 @@ impl Network { // we should go through our retries before we assign a dial info if retry_count == 0 { // Address is the same, so it's address or port restricted - let external2_dial_info = DialInfo::udp_from_socketaddr(external2); + let external2_dial_info = DialInfo::udp(external2); // Do a validate_dial_info on the external address from a routed node if self .validate_dial_info( @@ -226,17 +225,17 @@ impl Network { .await { // Got a reply from a non-default port, which means we're only address restricted - routing_table.register_global_dial_info( + routing_table.register_dial_info( external1_dial_info, - Some(NetworkClass::AddressRestrictedNAT), DialInfoOrigin::Discovered, + Some(NetworkClass::AddressRestrictedNAT), ); } else { // Didn't get a reply from a non-default port, which means we are also port restricted - routing_table.register_global_dial_info( + routing_table.register_dial_info( external1_dial_info, - Some(NetworkClass::PortRestrictedNAT), DialInfoOrigin::Discovered, + Some(NetworkClass::PortRestrictedNAT), ); } } diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 8ccae1c2..157a1be6 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -185,8 +185,7 @@ impl Network { let routing_table = network_manager.routing_table(); // Drop all dial info - routing_table.clear_local_dial_info(); - routing_table.clear_global_dial_info(); + routing_table.clear_dial_info_details(); // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(network_manager); diff --git a/veilid-core/src/intf/wasm/network/protocol/mod.rs b/veilid-core/src/intf/wasm/network/protocol/mod.rs index 890b8d9b..91c5fb96 100644 --- a/veilid-core/src/intf/wasm/network/protocol/mod.rs +++ b/veilid-core/src/intf/wasm/network/protocol/mod.rs @@ -27,12 +27,6 @@ pub enum NetworkConnection { } impl NetworkConnection { - pub fn protocol_type(&self) -> ProtocolType { - match self { - Self::Dummy(d) => d.protocol_type(), - Self::WS(w) => w.protocol_type(), - } - } pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { match self { Self::Dummy(d) => d.send(message), diff --git a/veilid-core/src/intf/wasm/network/protocol/ws.rs b/veilid-core/src/intf/wasm/network/protocol/ws.rs index 9147df7b..7499e960 100644 --- a/veilid-core/src/intf/wasm/network/protocol/ws.rs +++ b/veilid-core/src/intf/wasm/network/protocol/ws.rs @@ -45,13 +45,6 @@ impl WebsocketNetworkConnection { } impl WebsocketNetworkConnection { - pub fn protocol_type(&self) -> ProtocolType { - if self.tls { - ProtocolType::WSS - } else { - ProtocolType::WS - } - } pub fn send(&self, message: Vec) -> SystemPinBoxFuture> { let inner = self.inner.clone(); Box::pin(async move { diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index af0c38f5..39621aa2 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -76,7 +76,7 @@ impl BucketEntry { where F: Fn(&DialInfo) -> bool, { - let ret = Vec::new(); + let mut ret = Vec::new(); for di in &self.dial_infos { if filter(di) { ret.push(di.clone()); @@ -86,7 +86,7 @@ impl BucketEntry { } pub fn dial_infos(&self) -> &[DialInfo] { - &self.dial_infos.clone() + &self.dial_infos } pub fn get_peer_info(&self, key: DHTKey, scope: PeerScope) -> PeerInfo { diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 7ffa4865..b779639a 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -8,10 +8,11 @@ use crate::*; pub type FilterType = Box)) -> bool>; impl RoutingTable { - // Retrieve the fastest nodes in the routing table with a particular kind of protocol address type + // Retrieve the fastest nodes in the routing table with a particular kind of protocol and address type // Returns noderefs are are scoped to that address type only - pub fn get_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec { - let dial_info_filter = dial_info_filter.clone(); + pub fn find_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec { + let dial_info_filter1 = dial_info_filter.clone(); + let dial_info_filter2 = dial_info_filter.clone(); self.find_fastest_nodes( // filter Some(Box::new( @@ -20,7 +21,7 @@ impl RoutingTable { .1 .as_ref() .unwrap() - .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter)) + .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter1)) .is_some() }, )), @@ -30,27 +31,22 @@ impl RoutingTable { self.clone(), *e.0, e.1.as_mut().unwrap(), - dial_info_filter.clone(), + dial_info_filter2.clone(), ) }, ) } pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo { - let dial_infos = match scope { - PeerScope::All => { - let mut divec = self.global_dial_info_details(); - divec.append(&mut self.local_dial_info_details()); - divec.dedup(); - divec - } - PeerScope::Global => self.global_dial_info_details(), - PeerScope::Local => self.local_dial_info_details(), - }; + let filter = DialInfoFilter::scoped(scope); PeerInfo { node_id: NodeId::new(self.node_id()), - dial_infos: dial_infos.iter().map(|x| x.dial_info.clone()).collect(), + dial_infos: self + .all_filtered_dial_info_details(&filter) + .iter() + .map(|did| did.dial_info.clone()) + .collect(), } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0bd5aa00..4948a41d 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -37,13 +37,18 @@ pub struct DialInfoDetail { pub timestamp: u64, } +impl MatchesDialInfoFilter for DialInfoDetail { + fn matches_filter(&self, filter: &DialInfoFilter) -> bool { + self.dial_info.matches_filter(filter) + } +} + struct RoutingTableInner { network_manager: NetworkManager, node_id: DHTKey, node_id_secret: DHTKeySecret, buckets: Vec, - local_dial_info: Vec, - global_dial_info: Vec, + dial_info_details: Vec, bucket_entry_count: usize, // Waiters eventual_changed_dial_info: Eventual, @@ -75,8 +80,7 @@ impl RoutingTable { node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), - local_dial_info: Vec::new(), - global_dial_info: Vec::new(), + dial_info_details: Vec::new(), bucket_entry_count: 0, eventual_changed_dial_info: Eventual::new(), stats_accounting: StatsAccounting::new(), @@ -150,125 +154,71 @@ impl RoutingTable { } pub fn has_local_dial_info(&self) -> bool { - let inner = self.inner.lock(); - !inner.local_dial_info.is_empty() + self.first_filtered_dial_info_detail(&DialInfoFilter::local()) + .is_some() + } + + pub fn has_global_dial_info(&self) -> bool { + self.first_filtered_dial_info_detail(&DialInfoFilter::global()) + .is_some() + } + + pub fn global_dial_info_details(&self) -> Vec { + self.all_filtered_dial_info_details(&DialInfoFilter::global()) } pub fn local_dial_info_details(&self) -> Vec { - let inner = self.inner.lock(); - inner.local_dial_info.clone() + self.all_filtered_dial_info_details(&DialInfoFilter::local()) } - pub fn first_filtered_local_dial_info_details(&self, filter: F) -> Option - where - F: Fn(&DialInfoDetail) -> bool, - { + pub fn first_filtered_dial_info_detail( + &self, + filter: &DialInfoFilter, + ) -> Option { let inner = self.inner.lock(); - for did in &inner.local_dial_info { - if filter(did) { + for did in &inner.dial_info_details { + if did.matches_filter(filter) { return Some(did.clone()); } } None } - pub fn all_filtered_local_dial_info_details(&self, filter: F) -> Vec - where - F: Fn(&DialInfoDetail) -> bool, - { + pub fn all_filtered_dial_info_details(&self, filter: &DialInfoFilter) -> Vec { let inner = self.inner.lock(); - let ret = Vec::new(); - for did in &inner.local_dial_info { - if filter(did) { + let mut ret = Vec::new(); + for did in &inner.dial_info_details { + if did.matches_filter(filter) { ret.push(did.clone()); } } ret } - pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { + pub fn register_dial_info( + &self, + dial_info: DialInfo, + origin: DialInfoOrigin, + network_class: Option, + ) { let timestamp = get_timestamp(); let mut inner = self.inner.lock(); - inner.local_dial_info.push(DialInfoDetail { + inner.dial_info_details.push(DialInfoDetail { dial_info: dial_info.clone(), origin, - network_class: None, + network_class, timestamp, }); info!( - "Local Dial Info: {}", - NodeDialInfoSingle { - node_id: NodeId::new(inner.node_id), - dial_info - } - .to_string(), - ); - debug!(" Origin: {:?}", origin); - - Self::trigger_changed_dial_info(&mut *inner); - } - - pub fn clear_local_dial_info(&self) { - let mut inner = self.inner.lock(); - inner.local_dial_info.clear(); - Self::trigger_changed_dial_info(&mut *inner); - } - - pub fn has_global_dial_info(&self) -> bool { - let inner = self.inner.lock(); - !inner.global_dial_info.is_empty() - } - - pub fn global_dial_info_details(&self) -> Vec { - let inner = self.inner.lock(); - inner.global_dial_info.clone() - } - - pub fn first_filtered_global_dial_info_details(&self, filter: F) -> Option - where - F: Fn(&DialInfoDetail) -> bool, - { - let inner = self.inner.lock(); - for did in &inner.global_dial_info { - if filter(did) { - return Some(did.clone()); - } - } - None - } - pub fn all_filtered_global_dial_info_details(&self, filter: F) -> Vec - where - F: Fn(&DialInfoDetail) -> bool, - { - let inner = self.inner.lock(); - let ret = Vec::new(); - for did in &inner.global_dial_info { - if filter(did) { - ret.push(did.clone()); - } - } - ret - } - - pub fn register_global_dial_info( - &self, - dial_info: DialInfo, - network_class: Option, - origin: DialInfoOrigin, - ) { - let ts = get_timestamp(); - let mut inner = self.inner.lock(); - - inner.global_dial_info.push(DialInfoDetail { - dial_info: dial_info.clone(), - origin, - network_class, - timestamp: ts, - }); - - info!( - "Global Dial Info: {}", + "{}Dial Info: {}", + if dial_info.is_local() { + "Local " + } else if dial_info.is_global() { + "Global " + } else { + "Other " + }, NodeDialInfoSingle { node_id: NodeId::new(inner.node_id), dial_info @@ -277,12 +227,13 @@ impl RoutingTable { ); debug!(" Origin: {:?}", origin); debug!(" Network Class: {:?}", network_class); + Self::trigger_changed_dial_info(&mut *inner); } - pub fn clear_global_dial_info(&self) { + pub fn clear_dial_info_details(&self) { let mut inner = self.inner.lock(); - inner.global_dial_info.clear(); + inner.dial_info_details.clear(); Self::trigger_changed_dial_info(&mut *inner); } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 7bea1c13..70cb61d3 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -49,12 +49,7 @@ impl NodeRef { // Returns the best dial info to attempt a connection to this node pub fn best_dial_info(&self) -> Option { let nm = self.routing_table.network_manager(); - let protocol_config = nm.get_protocol_config(); - if protocol_config.is_none() { - return None; - } - let protocol_config = protocol_config.unwrap(); - + let protocol_config = nm.get_protocol_config()?; self.operate(|e| { e.first_filtered_dial_info(|di| { // Does it match the dial info filter @@ -99,7 +94,7 @@ impl Clone for NodeRef { impl fmt::Debug for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let out = format!("{}", self.node_id.encode()); + let mut out = self.node_id.encode(); if !self.dial_info_filter.is_empty() { out += &format!("{:?}", self.dial_info_filter); } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 97176cbd..d707ded3 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -867,23 +867,23 @@ impl RPCProcessor { if redirect { let routing_table = self.routing_table(); let filter = dial_info.make_filter(true); - let peers = routing_table.get_fast_nodes_filtered(&filter); + let peers = routing_table.find_fast_nodes_filtered(&filter); if peers.is_empty() { return Err(rpc_error_internal(format!( "no peers matching filter '{:?}'", filter ))); } - for peer in peers { // See if this peer will validate dial info - if !peer.operate(|e| { + let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| { if let Some(ni) = &e.peer_stats().node_info { ni.will_validate_dial_info } else { true } - }) { + }); + if !will_validate_dial_info { continue; } // Make a copy of the request, without the redirect flag diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 533967a7..31dca076 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + mod debug; pub use debug::*; @@ -209,8 +211,8 @@ impl Address { } pub fn address_type(&self) -> AddressType { match self { - Address::IPV4(v4) => AddressType::IPV4, - Address::IPV6(v6) => AddressType::IPV6, + Address::IPV4(_) => AddressType::IPV4, + Address::IPV6(_) => AddressType::IPV6, } } pub fn address_string(&self) -> String { @@ -227,14 +229,14 @@ impl Address { } pub fn is_global(&self) -> bool { match self { - Address::IPV4(v4) => ipv4addr_is_global(&v4), - Address::IPV6(v6) => ipv6addr_is_global(&v6), + Address::IPV4(v4) => ipv4addr_is_global(v4), + Address::IPV6(v6) => ipv6addr_is_global(v6), } } pub fn is_local(&self) -> bool { match self { - Address::IPV4(v4) => ipv4addr_is_private(&v4), - Address::IPV6(v6) => ipv6addr_is_unicast_site_local(&v6), + Address::IPV4(v4) => ipv4addr_is_private(v4), + Address::IPV6(v6) => ipv6addr_is_unicast_site_local(v6), } } pub fn to_ip_addr(&self) -> IpAddr { @@ -334,30 +336,42 @@ pub struct DialInfoFilter { } impl DialInfoFilter { - pub fn new_empty() -> Self { + pub fn all() -> Self { Self { peer_scope: PeerScope::All, protocol_type: None, address_type: None, } } - pub fn with_protocol_type(protocol_type: ProtocolType) -> Self { + pub fn global() -> Self { Self { - peer_scope: PeerScope::All, - protocol_type: Some(protocol_type), + peer_scope: PeerScope::Global, + protocol_type: None, address_type: None, } } - pub fn with_protocol_type_and_address_type( - protocol_type: ProtocolType, - address_type: AddressType, - ) -> Self { + pub fn local() -> Self { Self { - peer_scope: PeerScope::All, - protocol_type: Some(protocol_type), - address_type: Some(address_type), + peer_scope: PeerScope::Local, + protocol_type: None, + address_type: None, } } + pub fn scoped(peer_scope: PeerScope) -> Self { + Self { + peer_scope, + protocol_type: None, + address_type: None, + } + } + pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self { + self.protocol_type = Some(protocol_type); + self + } + pub fn with_address_type(mut self, address_type: AddressType) -> Self { + self.address_type = Some(address_type); + self + } pub fn is_empty(&self) -> bool { self.peer_scope == PeerScope::All && self.protocol_type.is_none() @@ -379,6 +393,10 @@ impl fmt::Debug for DialInfoFilter { } } +pub trait MatchesDialInfoFilter { + fn matches_filter(&self, filter: &DialInfoFilter) -> bool; +} + #[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq)] pub struct DialInfoUDP { pub socket_address: SocketAddress, @@ -430,9 +448,9 @@ impl fmt::Display for DialInfo { impl FromStr for DialInfo { type Err = VeilidAPIError; fn from_str(s: &str) -> Result { - let (proto, rest) = s.split_once('|').ok_or_else(|| { - parse_error!("SocketAddress::from_str missing protocol '|' separator", s) - })?; + let (proto, rest) = s + .split_once('|') + .ok_or_else(|| parse_error!("DialInfo::from_str missing protocol '|' separator", s))?; match proto { "udp" => { let socket_address = SocketAddress::from_str(rest)?; @@ -444,24 +462,19 @@ impl FromStr for DialInfo { } "ws" => { let (sa, rest) = s.split_once('|').ok_or_else(|| { - parse_error!( - "SocketAddress::from_str missing socket address '|' separator", - s - ) + parse_error!("DialInfo::from_str missing socket address '|' separator", s) })?; let socket_address = SocketAddress::from_str(sa)?; DialInfo::try_ws(socket_address, rest.to_string()) } "wss" => { let (sa, rest) = s.split_once('|').ok_or_else(|| { - parse_error!( - "SocketAddress::from_str missing socket address '|' separator", - s - ) + parse_error!("DialInfo::from_str missing socket address '|' separator", s) })?; let socket_address = SocketAddress::from_str(sa)?; DialInfo::try_wss(socket_address, rest.to_string()) } + _ => Err(parse_error!("DialInfo::from_str has invalid scheme", s)), } } } @@ -518,7 +531,7 @@ impl DialInfo { url )); } - if !Address::from_str(&split_url.host).is_err() { + if Address::from_str(&split_url.host).is_ok() { return Err(parse_error!( "WSS url can not use address format, only hostname format", url @@ -599,22 +612,6 @@ impl DialInfo { PeerScope::Local => self.is_local(), } } - pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool { - if !self.matches_peer_scope(filter.peer_scope) { - return false; - } - if let Some(pt) = filter.protocol_type { - if self.protocol_type() != pt { - return false; - } - } - if let Some(at) = filter.address_type { - if self.address_type() != at { - return false; - } - } - true - } pub fn make_filter(&self, scoped: bool) -> DialInfoFilter { DialInfoFilter { peer_scope: if scoped { @@ -634,6 +631,25 @@ impl DialInfo { } } +impl MatchesDialInfoFilter for DialInfo { + fn matches_filter(&self, filter: &DialInfoFilter) -> bool { + if !self.matches_peer_scope(filter.peer_scope) { + return false; + } + if let Some(pt) = filter.protocol_type { + if self.protocol_type() != pt { + return false; + } + } + if let Some(at) = filter.address_type { + if self.address_type() != at { + return false; + } + } + true + } +} + ////////////////////////////////////////////////////////////////////////// #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] @@ -709,7 +725,10 @@ impl ConnectionDescriptor { PeerScope::Local => self.remote.socket_address.address().is_local(), } } - pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool { +} + +impl MatchesDialInfoFilter for ConnectionDescriptor { + fn matches_filter(&self, filter: &DialInfoFilter) -> bool { if !self.matches_peer_scope(filter.peer_scope) { return false; } @@ -1093,18 +1112,11 @@ impl VeilidAPI { } // wait for state change - // xxx: this should not use 'sleep', perhaps this function should be eliminated anyway - // xxx: it should really only be used for test anyway, and there is probably a better way to do this regardless - // xxx: that doesn't wait forever and can time out + // xxx: should have optional timeout pub async fn wait_for_state(&self, state: VeilidState) -> Result<(), VeilidAPIError> { - loop { - intf::sleep(500).await; - match state { - VeilidState::Attachment(cs) => { - if self.attachment_manager()?.get_state() == cs { - break; - } - } + match state { + VeilidState::Attachment(cs) => { + self.attachment_manager()?.wait_for_state(cs).await; } } Ok(())