From 922470365ae693b8a25081f9237604e3da713e1e Mon Sep 17 00:00:00 2001 From: John Smith Date: Thu, 23 Dec 2021 20:34:52 -0500 Subject: [PATCH] more refactor --- veilid-core/src/intf/native/network/mod.rs | 156 +++++++++--------- .../network/public_dialinfo_discovery.rs | 3 +- veilid-core/src/network_manager.rs | 19 +++ veilid-core/src/routing_table/debug.rs | 8 +- veilid-core/src/routing_table/find_nodes.rs | 50 ++---- veilid-core/src/routing_table/mod.rs | 140 ++++++---------- veilid-core/src/routing_table/node_ref.rs | 79 +++++---- veilid-core/src/rpc_processor/mod.rs | 46 ++++-- veilid-core/src/veilid_api/mod.rs | 134 +++++++++++++-- 9 files changed, 354 insertions(+), 281 deletions(-) diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index bccc9ce1..20d2eb13 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -394,13 +394,14 @@ impl Network { ///////////////////////////////////////////////////////////////// + // TCP listener that multiplexes ports so multiple protocols can exist on a single port async fn start_tcp_listener( &self, address: String, is_tls: bool, new_tcp_protocol_handler: Box, - ) -> Result, String> { - let mut out = Vec::<(Address, u16)>::new(); + ) -> Result, String> { + let mut out = Vec::::new(); // convert to socketaddrs let mut sockaddrs = address .to_socket_addrs() @@ -442,7 +443,7 @@ impl Network { // Return local dial infos we listen on for ldi_addr in ldi_addrs { - out.push((Address::from_socket_addr(ldi_addr), ldi_addr.port())); + out.push(SocketAddress::from_socket_addr(ldi_addr)); } } @@ -613,27 +614,6 @@ impl Network { ///////////////////////////////////////////////////////////////// - fn match_socket_addr( - inner: &NetworkInner, - listen_socket_addr: &SocketAddr, - peer_socket_addr: &SocketAddr, - ) -> bool { - let ldi_addrs = Self::translate_unspecified_address(inner, listen_socket_addr); - // xxx POSSIBLE CONCERN (verify this?) - // xxx will need to be reworked to search routing table information if we - // xxx allow systems to be dual homed with multiple interfaces eventually - // xxx to ensure the socket on the appropriate interface is chosen - // xxx this may not be necessary if the kernel automatically picks the right interface - // xxx it may do that. need to verify that though - for local_addr in &ldi_addrs { - if mem::discriminant(local_addr) == mem::discriminant(peer_socket_addr) { - return true; - } - } - - false - } - fn find_best_udp_protocol_handler( &self, peer_socket_addr: &SocketAddr, @@ -785,7 +765,7 @@ impl Network { } pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { - let dial_info = node_ref.dial_info(); + let dial_info = node_ref.best_dial_info(); let descriptor = node_ref.last_connection(); // First try to send data to the last socket we've seen this peer on @@ -880,7 +860,7 @@ impl Network { ) }; trace!("WS: starting listener at {:?}", listen_address); - let addresses = self + let socket_addresses = self .start_tcp_listener( listen_address.clone(), false, @@ -890,33 +870,44 @@ impl Network { trace!("WS: listener started"); let mut dial_infos: Vec = Vec::new(); - for (a, p) in addresses { + for socket_address in socket_addresses { // Pick out WS port for outbound connections (they will all be the same) - self.inner.lock().ws_port = p; -xxx continue here - let di = DialInfo::try_ws(a.address_string(), p, path.clone()); + self.inner.lock().ws_port = socket_address.port(); + // Build local dial info request url + let local_url = format!("ws://{}/{}", socket_address, path); + // Create local dial info + let di = DialInfo::try_ws(socket_address, local_url) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; dial_infos.push(di.clone()); routing_table.register_local_dial_info(di, DialInfoOrigin::Static); } // Add static public dialinfo if it's configured if let Some(url) = url.as_ref() { - let split_url = SplitUrl::from_str(url)?; + let mut split_url = SplitUrl::from_str(url)?; if split_url.scheme.to_ascii_lowercase() != "ws" { return Err("WS URL must use 'ws://' scheme".to_owned()); } - routing_table.register_global_dial_info( - DialInfo::ws( - split_url.host, - split_url.port.unwrap_or(80), - split_url - .path - .map(|p| p.to_string()) - .unwrap_or_else(|| "/".to_string()), - ), - Some(NetworkClass::Server), - DialInfoOrigin::Static, - ); + split_url.scheme = "ws".to_owned(); + + // Resolve static public hostnames + let global_socket_addrs = split_url + .host + .to_socket_addrs() + .await + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + for gsa in global_socket_addrs { + routing_table.register_global_dial_info( + DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) + .map_err(map_to_string) + .map_err(logthru_net!(error))?, + Some(NetworkClass::Server), + DialInfoOrigin::Static, + ); + } } Ok(()) @@ -932,7 +923,7 @@ xxx continue here ) }; trace!("WSS: starting listener at {}", listen_address); - let addresses = self + let socket_addresses = self .start_tcp_listener( listen_address.clone(), true, @@ -947,33 +938,40 @@ xxx continue here // This is not the case with unencrypted websockets, which can be specified solely by an IP address // // let mut dial_infos: Vec = Vec::new(); - for (_, p) in addresses { - // Pick out WS port for outbound connections (they will all be the same) - self.inner.lock().wss_port = p; + for socket_address in socket_addresses { + // Pick out WSS port for outbound connections (they will all be the same) + self.inner.lock().wss_port = socket_address.port(); - // let di = DialInfo::wss(a.address_string(), p, path.clone()); - // dial_infos.push(di.clone()); - // routing_table.register_local_dial_info(di, DialInfoOrigin::Static); + // Don't register local dial info because TLS won't allow that anyway without a local CA + // and we aren't doing that yet at all today. } // Add static public dialinfo if it's configured if let Some(url) = url.as_ref() { - let split_url = SplitUrl::from_str(url)?; + // Add static public dialinfo if it's configured + let mut split_url = SplitUrl::from_str(url)?; if split_url.scheme.to_ascii_lowercase() != "wss" { return Err("WSS URL must use 'wss://' scheme".to_owned()); } - routing_table.register_global_dial_info( - DialInfo::wss( - split_url.host, - split_url.port.unwrap_or(443), - split_url - .path - .map(|p| p.to_string()) - .unwrap_or_else(|| "/".to_string()), - ), - Some(NetworkClass::Server), - DialInfoOrigin::Static, - ); + split_url.scheme = "wss".to_owned(); + + // Resolve static public hostnames + let global_socket_addrs = split_url + .host + .to_socket_addrs() + .await + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + + for gsa in global_socket_addrs { + routing_table.register_global_dial_info( + DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) + .map_err(map_to_string) + .map_err(logthru_net!(error))?, + Some(NetworkClass::Server), + DialInfoOrigin::Static, + ); + } } else { return Err("WSS URL must be specified due to TLS requirements".to_owned()); } @@ -991,7 +989,7 @@ xxx continue here ) }; trace!("TCP: starting listener at {}", &listen_address); - let addresses = self + let socket_addresses = self .start_tcp_listener( listen_address.clone(), false, @@ -1001,11 +999,11 @@ xxx continue here trace!("TCP: listener started"); let mut dial_infos: Vec = Vec::new(); - for (a, p) in addresses { + for socket_address in socket_addresses { // Pick out TCP port for outbound connections (they will all be the same) - self.inner.lock().tcp_port = p; + self.inner.lock().tcp_port = socket_address.port(); - let di = DialInfo::tcp(a.to_canonical(), p); + let di = DialInfo::tcp(socket_address); dial_infos.push(di.clone()); routing_table.register_local_dial_info(di, DialInfoOrigin::Static); } @@ -1128,10 +1126,10 @@ xxx continue here return inner.network_class; } - // Go through our public dialinfo and see what our best network class is + // Go through our global dialinfo and see what our best network class is let mut network_class = NetworkClass::Invalid; - for x in routing_table.global_dial_info() { - if let Some(nc) = x.network_class { + for did in routing_table.global_dial_info_details() { + if let Some(nc) = did.network_class { if nc < network_class { network_class = nc; } @@ -1172,9 +1170,13 @@ xxx continue here && !udp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { + let filter = DialInfoFilter::with_protocol_type_and_address_type( + ProtocolType::UDP, + AddressType::IPV4, + ); let need_udpv4_dialinfo = routing_table - .global_dial_info_for_protocol_address_type(ProtocolAddressType::UDPv4) - .is_empty(); + .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter)) + .is_none(); if need_udpv4_dialinfo { // If we have no public UDPv4 dialinfo, then we need to run a NAT check // ensure the singlefuture is running for this @@ -1186,13 +1188,17 @@ xxx continue here } // Same but for TCPv4 - if protocol_config.tcp_enabled + if protocol_config.tcp_listen && !tcp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { + let filter = DialInfoFilter::with_protocol_type_and_address_type( + ProtocolType::TCP, + AddressType::IPV4, + ); let need_tcpv4_dialinfo = routing_table - .global_dial_info_for_protocol_address_type(ProtocolAddressType::TCPv4) - .is_empty(); + .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter)) + .is_none(); if need_tcpv4_dialinfo { // If we have no public TCPv4 dialinfo, then we need to run a NAT check // ensure the singlefuture is running for this diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index d9de185b..c78e8473 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -9,7 +9,7 @@ use async_std::net::*; impl Network { // Ask for a public address check from a particular noderef - async fn request_public_address(&self, node_ref: NodeRef) -> Option { + async fn request_public_address(&self, node_ref: NodeRef) -> Option { let routing_table = self.routing_table(); let rpc = routing_table.rpc_processor(); rpc.rpc_call_info(node_ref.clone()) @@ -22,6 +22,7 @@ impl Network { .unwrap_or(None) } + xxx convert to filter // find fast peers with a particular address type, and ask them to tell us what our external address is async fn discover_external_address( &self, diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index a488fcf7..1d52e919 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -53,6 +53,25 @@ pub struct ProtocolConfig { pub wss_listen: bool, } +impl ProtocolConfig { + pub fn is_protocol_type_connect_enabled(&self, protocol_type: ProtocolType) -> bool { + match protocol_type { + ProtocolType::UDP => self.udp_enabled, + ProtocolType::TCP => self.tcp_connect, + ProtocolType::WS => self.ws_connect, + ProtocolType::WSS => self.wss_connect, + } + } + pub fn is_protocol_type_listen_enabled(&self, protocol_type: ProtocolType) -> bool { + match protocol_type { + ProtocolType::UDP => self.udp_enabled, + ProtocolType::TCP => self.tcp_listen, + ProtocolType::WS => self.ws_listen, + ProtocolType::WSS => self.wss_listen, + } + } +} + // Things we get when we start up and go away when we shut down // Routing table is not in here because we want it to survive a network shutdown/startup restart #[derive(Clone)] diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 041a76ed..62e8a4dd 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -13,15 +13,15 @@ impl RoutingTable { out } pub fn debug_info_dialinfo(&self) -> String { - let ldis = self.local_dial_info(); - let gdis = self.global_dial_info(); + let ldis = self.local_dial_info_details(); + let gdis = self.global_dial_info_details(); let mut out = String::new(); - out += "Local Dial Info:\n"; + out += "Local Dial Info Details:\n"; for (n, ldi) in ldis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, ldi); } - out += "Global Dial Info:\n"; + out += "Global Dial Info Details:\n"; for (n, gdi) in gdis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, gdi); } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 34626cfc..7ffa4865 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -10,42 +10,18 @@ pub type FilterType = Box)) -> bool>; impl RoutingTable { // Retrieve the fastest nodes in the routing table with a particular kind of protocol address type // Returns noderefs are are scoped to that address type only - pub fn get_fast_nodes_of_type( - &self, - protocol_address_type: ProtocolAddressType, - ) -> Vec { + pub fn get_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec { + let dial_info_filter = dial_info_filter.clone(); self.find_fastest_nodes( // filter Some(Box::new( move |params: &(&DHTKey, Option<&mut BucketEntry>)| { - // Only interested in nodes with node info - if let Some(node_info) = ¶ms.1.as_ref().unwrap().peer_stats().node_info { - // Will the node validate dial info? - // and does it have a UDPv4, public scope, dial info? - if node_info.will_validate_dial_info - && params - .1 - .as_ref() - .unwrap() - .dial_info_entries_as_ref() - .iter() - .find_map(|die| { - if die.matches_peer_scope(PeerScope::Global) - && die.dial_info().protocol_address_type() - == protocol_address_type - { - Some(()) - } else { - None - } - }) - .is_some() - { - // If so return true and include this node - return true; - } - } - false + params + .1 + .as_ref() + .unwrap() + .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter)) + .is_some() }, )), // transform @@ -54,7 +30,7 @@ impl RoutingTable { self.clone(), *e.0, e.1.as_mut().unwrap(), - protocol_address_type, + dial_info_filter.clone(), ) }, ) @@ -63,13 +39,13 @@ impl RoutingTable { pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo { let dial_infos = match scope { PeerScope::All => { - let mut divec = self.global_dial_info(); - divec.append(&mut self.local_dial_info()); + let mut divec = self.global_dial_info_details(); + divec.append(&mut self.local_dial_info_details()); divec.dedup(); divec } - PeerScope::Global => self.global_dial_info(), - PeerScope::Local => self.local_dial_info(), + PeerScope::Global => self.global_dial_info_details(), + PeerScope::Local => self.local_dial_info_details(), }; PeerInfo { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index de7b64f7..0bd5aa00 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1,7 +1,6 @@ mod bucket; mod bucket_entry; mod debug; -mod dial_info_entry; mod find_nodes; mod node_ref; mod stats_accounting; @@ -12,12 +11,10 @@ use crate::network_manager::*; use crate::rpc_processor::*; use crate::xx::*; use crate::*; -use alloc::collections::VecDeque; use alloc::str::FromStr; use bucket::*; pub use bucket_entry::*; pub use debug::*; -pub use dial_info_entry::*; pub use find_nodes::*; use futures_util::stream::{FuturesUnordered, StreamExt}; pub use node_ref::*; @@ -157,42 +154,35 @@ impl RoutingTable { !inner.local_dial_info.is_empty() } - pub fn local_dial_info(&self) -> Vec { + pub fn local_dial_info_details(&self) -> Vec { let inner = self.inner.lock(); inner.local_dial_info.clone() } - pub fn local_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec { + pub fn first_filtered_local_dial_info_details(&self, filter: F) -> Option + where + F: Fn(&DialInfoDetail) -> bool, + { let inner = self.inner.lock(); - inner - .local_dial_info - .iter() - .filter_map(|di| { - if di.dial_info.protocol_type() != protocol_type { - None - } else { - Some(di.clone()) - } - }) - .collect() + for did in &inner.local_dial_info { + if filter(did) { + return Some(did.clone()); + } + } + None } - - pub fn local_dial_info_for_protocol_address_type( - &self, - protocol_address_type: ProtocolAddressType, - ) -> Vec { + pub fn all_filtered_local_dial_info_details(&self, filter: F) -> Vec + where + F: Fn(&DialInfoDetail) -> bool, + { let inner = self.inner.lock(); - inner - .local_dial_info - .iter() - .filter_map(|di| { - if di.dial_info.protocol_address_type() != protocol_address_type { - None - } else { - Some(di.clone()) - } - }) - .collect() + let ret = Vec::new(); + for did in &inner.local_dial_info { + if filter(did) { + ret.push(did.clone()); + } + } + ret } pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { @@ -230,44 +220,35 @@ impl RoutingTable { !inner.global_dial_info.is_empty() } - pub fn global_dial_info(&self) -> Vec { + pub fn global_dial_info_details(&self) -> Vec { let inner = self.inner.lock(); inner.global_dial_info.clone() } - pub fn global_dial_info_for_protocol( - &self, - protocol_type: ProtocolType, - ) -> Vec { + pub fn first_filtered_global_dial_info_details(&self, filter: F) -> Option + where + F: Fn(&DialInfoDetail) -> bool, + { let inner = self.inner.lock(); - inner - .global_dial_info - .iter() - .filter_map(|di| { - if di.dial_info.protocol_type() != protocol_type { - None - } else { - Some(di.clone()) - } - }) - .collect() + for did in &inner.global_dial_info { + if filter(did) { + return Some(did.clone()); + } + } + None } - pub fn global_dial_info_for_protocol_address_type( - &self, - protocol_address_type: ProtocolAddressType, - ) -> Vec { + pub fn all_filtered_global_dial_info_details(&self, filter: F) -> Vec + where + F: Fn(&DialInfoDetail) -> bool, + { let inner = self.inner.lock(); - inner - .global_dial_info - .iter() - .filter_map(|di| { - if di.dial_info.protocol_address_type() != protocol_address_type { - None - } else { - Some(di.clone()) - } - }) - .collect() + let ret = Vec::new(); + for did in &inner.global_dial_info { + if filter(did) { + ret.push(did.clone()); + } + } + ret } pub fn register_global_dial_info( @@ -287,7 +268,7 @@ impl RoutingTable { }); info!( - "Public Dial Info: {}", + "Global Dial Info: {}", NodeDialInfoSingle { node_id: NodeId::new(inner.node_id), dial_info @@ -356,34 +337,6 @@ impl RoutingTable { *self.inner.lock() = Self::new_inner(self.network_manager()); } - // Just match address and port to help sort dialinfoentries for buckets - // because inbound connections will not have dialinfo associated with them - // but should have ip addresses if they have changed - fn dial_info_peer_address_match(dial_info: &DialInfo, peer_addr: &PeerAddress) -> bool { - match dial_info { - DialInfo::UDP(_) => { - peer_addr.protocol_type == ProtocolType::UDP - && peer_addr.port == dial_info.port() - && peer_addr.address.address_string() == dial_info.address_string() - } - DialInfo::TCP(_) => { - peer_addr.protocol_type == ProtocolType::TCP - && peer_addr.port == dial_info.port() - && peer_addr.address.address_string() == dial_info.address_string() - } - DialInfo::WS(_) => { - peer_addr.protocol_type == ProtocolType::WS - && peer_addr.port == dial_info.port() - && peer_addr.address.address_string() == dial_info.address_string() - } - DialInfo::WSS(_) => { - peer_addr.protocol_type == ProtocolType::WSS - && peer_addr.port == dial_info.port() - && peer_addr.address.address_string() == dial_info.address_string() - } - } - } - // Attempt to settle buckets and remove entries down to the desired number // which may not be possible due extant NodeRefs fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) { @@ -482,7 +435,7 @@ impl RoutingTable { ) -> Result { let nr = self.create_node_ref(node_id)?; nr.operate(move |e| -> Result<(), String> { - e.update_dial_info(dial_infos); + e.update_dial_infos(dial_infos); Ok(()) })?; @@ -604,6 +557,7 @@ impl RoutingTable { let mut bsmap: BTreeMap> = BTreeMap::new(); for b in bootstrap { let ndis = NodeDialInfoSingle::from_str(b.as_str()) + .map_err(map_to_string) .map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?; let node_id = ndis.node_id.key; bsmap diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index c7c670f0..7bea1c13 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -5,9 +5,7 @@ use alloc::fmt; pub struct NodeRef { routing_table: RoutingTable, node_id: DHTKey, - // Filters - protocol_type: Option, - address_type: Option, + dial_info_filter: DialInfoFilter, } impl NodeRef { @@ -16,23 +14,20 @@ impl NodeRef { Self { routing_table, node_id: key, - protocol_type: None, - address_type: None, + dial_info_filter: DialInfoFilter::default(), } } pub fn new_filtered( routing_table: RoutingTable, key: DHTKey, entry: &mut BucketEntry, - protocol_type: Option, - address_type: Option, + dial_info_filter: DialInfoFilter, ) -> Self { entry.ref_count += 1; Self { routing_table, node_id: key, - protocol_type, - address_type, + dial_info_filter, } } @@ -40,20 +35,8 @@ impl NodeRef { self.node_id } - pub fn protocol_type(&self) -> Option { - self.protocol_type - } - - pub fn set_protocol_type(&mut self, protocol_type: Option) { - self.protocol_type = protocol_type; - } - - pub fn address_type(&self) -> Option { - self.address_type - } - - pub fn set_address_type(&mut self, address_type: Option) { - self.address_type = address_type; + pub fn dial_info_filter(&self) -> DialInfoFilter { + self.dial_info_filter.clone() } pub fn operate(&self, f: F) -> T @@ -63,28 +46,39 @@ impl NodeRef { self.routing_table.operate_on_bucket_entry(self.node_id, f) } - xxx fix the notion of 'best dial info' to sort by capability and udp/tcp/ws/wss preference order - pub fn dial_info(&self) -> Option { - if self.protocol_type || self. { - None => self.operate(|e| e.best_dial_info()), - Some(pat) => self.operate(|e| { - e.filtered_dial_info(|die| die.dial_info().protocol_address_type() == pat) - }), + // Returns the best dial info to attempt a connection to this node + pub fn best_dial_info(&self) -> Option { + let nm = self.routing_table.network_manager(); + let protocol_config = nm.get_protocol_config(); + if protocol_config.is_none() { + return None; } + let protocol_config = protocol_config.unwrap(); + + self.operate(|e| { + e.first_filtered_dial_info(|di| { + // Does it match the dial info filter + if !di.matches_filter(&self.dial_info_filter) { + return false; + } + // Filter out dial infos that don't match our protocol config + // for outbound connections. This routine filters on 'connect' settings + // to ensure we connect using only the protocols we have enabled. + protocol_config.is_protocol_type_connect_enabled(di.protocol_type()) + }) + }) } pub fn last_connection(&self) -> Option { match self.operate(|e| e.last_connection()) { None => None, Some(c) => { - if let Some(protocol_address_type) = self.protocol_address_type { - if c.remote.protocol_address_type() == protocol_address_type { - Some(c) - } else { - None - } - } else { - Some(c) + if !c.matches_filter(&self.dial_info_filter) { + return None; } + // We don't filter this out by protocol config because if a connection + // succeeded, it's allowed to persist and be used for communication + // regardless of any other configuration + Some(c) } } } @@ -98,17 +92,18 @@ impl Clone for NodeRef { Self { routing_table: self.routing_table.clone(), node_id: self.node_id, - protocol_address_type: self.protocol_address_type, + dial_info_filter: self.dial_info_filter.clone(), } } } impl fmt::Debug for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.protocol_address_type { - None => write!(f, "{}", self.node_id.encode()), - Some(pat) => write!(f, "{}#{:?}", self.node_id.encode(), pat), + let out = format!("{}", self.node_id.encode()); + if !self.dial_info_filter.is_empty() { + out += &format!("{:?}", self.dial_info_filter); } + write!(f, "{}", out) } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 8786c147..97176cbd 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -219,7 +219,7 @@ impl RPCProcessor { if let Some(nr) = routing_table.lookup_node_ref(node_id) { // ensure we have dial_info for the entry already, // if not, we should do the find_node anyway - if nr.operate(|e| e.best_dial_info().is_some()) { + if !nr.operate(|e| e.dial_infos().is_empty()) { return Ok(nr); } } @@ -712,16 +712,20 @@ impl RPCProcessor { fn can_validate_dial_info(&self) -> bool { let nman = self.network_manager(); - match nman.get_network_class() { - NetworkClass::Server => true, - NetworkClass::Mapped => true, - NetworkClass::FullNAT => true, - NetworkClass::AddressRestrictedNAT => false, - NetworkClass::PortRestrictedNAT => false, - NetworkClass::OutboundOnly => false, - NetworkClass::WebApp => false, - NetworkClass::TorWebApp => false, - NetworkClass::Invalid => false, + if let Some(nc) = nman.get_network_class() { + match nc { + NetworkClass::Server => true, + NetworkClass::Mapped => true, + NetworkClass::FullNAT => true, + NetworkClass::AddressRestrictedNAT => false, + NetworkClass::PortRestrictedNAT => false, + NetworkClass::OutboundOnly => false, + NetworkClass::WebApp => false, + NetworkClass::TorWebApp => false, + NetworkClass::Invalid => false, + } + } else { + false } } @@ -779,7 +783,7 @@ impl RPCProcessor { .peer_noderef .operate(|entry| match entry.last_connection() { None => None, - Some(c) => Some(c.remote.to_socket_addr()), + Some(c) => Some(c.remote.socket_address), }); SenderInfo { socket_address } } @@ -862,16 +866,26 @@ impl RPCProcessor { // Redirect this request if we are asked to if redirect { let routing_table = self.routing_table(); - let protocol_address_type = dial_info.protocol_address_type(); - let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); + let filter = dial_info.make_filter(true); + let peers = routing_table.get_fast_nodes_filtered(&filter); if peers.is_empty() { return Err(rpc_error_internal(format!( - "no peers of type '{:?}'", - protocol_address_type + "no peers matching filter '{:?}'", + filter ))); } for peer in peers { + // See if this peer will validate dial info + if !peer.operate(|e| { + if let Some(ni) = &e.peer_stats().node_info { + ni.will_validate_dial_info + } else { + true + } + }) { + continue; + } // Make a copy of the request, without the redirect flag let vdi_msg_reader = { let mut vdi_msg = ::capnp::message::Builder::new_default(); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 152b5726..533967a7 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -311,24 +311,71 @@ impl SocketAddress { impl fmt::Display for SocketAddress { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(f, "{}:{}", self.to_ip_addr(), self.port) + write!(f, "{}", self.to_socket_addr()) } } impl FromStr for SocketAddress { type Err = VeilidAPIError; fn from_str(s: &str) -> Result { - let split = s.rsplit_once(':').ok_or_else(|| { - parse_error!("SocketAddress::from_str missing colon port separator", s) - })?; - let address = Address::from_str(split.0)?; - let port = u16::from_str(split.1).map_err(|e| { - parse_error!( - format!("SocketAddress::from_str failed parting port: {}", e), - s - ) - })?; - Ok(SocketAddress { address, port }) + let sa = SocketAddr::from_str(s) + .map_err(|e| parse_error!("Failed to parse SocketAddress", e))?; + Ok(SocketAddress::from_socket_addr(sa)) + } +} + +////////////////////////////////////////////////////////////////// + +#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct DialInfoFilter { + pub peer_scope: PeerScope, + pub protocol_type: Option, + pub address_type: Option, +} + +impl DialInfoFilter { + pub fn new_empty() -> Self { + Self { + peer_scope: PeerScope::All, + protocol_type: None, + address_type: None, + } + } + pub fn with_protocol_type(protocol_type: ProtocolType) -> Self { + Self { + peer_scope: PeerScope::All, + protocol_type: Some(protocol_type), + address_type: None, + } + } + pub fn with_protocol_type_and_address_type( + protocol_type: ProtocolType, + address_type: AddressType, + ) -> Self { + Self { + peer_scope: PeerScope::All, + protocol_type: Some(protocol_type), + address_type: Some(address_type), + } + } + pub fn is_empty(&self) -> bool { + self.peer_scope == PeerScope::All + && self.protocol_type.is_none() + && self.address_type.is_none() + } +} + +impl fmt::Debug for DialInfoFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + let mut out = String::new(); + out += &format!("{:?}", self.peer_scope); + if let Some(pt) = self.protocol_type { + out += &format!("+{:?}", pt); + } + if let Some(at) = self.address_type { + out += &format!("+{:?}", at); + } + write!(f, "[{}]", out) } } @@ -552,16 +599,54 @@ impl DialInfo { PeerScope::Local => self.is_local(), } } + pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool { + if !self.matches_peer_scope(filter.peer_scope) { + return false; + } + if let Some(pt) = filter.protocol_type { + if self.protocol_type() != pt { + return false; + } + } + if let Some(at) = filter.address_type { + if self.address_type() != at { + return false; + } + } + true + } + pub fn make_filter(&self, scoped: bool) -> DialInfoFilter { + DialInfoFilter { + peer_scope: if scoped { + if self.is_global() { + PeerScope::Global + } else if self.is_local() { + PeerScope::Local + } else { + PeerScope::All + } + } else { + PeerScope::All + }, + protocol_type: Some(self.protocol_type()), + address_type: Some(self.address_type()), + } + } } ////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)] pub enum PeerScope { All, Global, Local, } +impl Default for PeerScope { + fn default() -> Self { + PeerScope::All + } +} #[derive(Clone, Debug, Default)] pub struct PeerInfo { @@ -617,6 +702,29 @@ impl ConnectionDescriptor { pub fn address_type(&self) -> AddressType { self.remote.address_type() } + pub fn matches_peer_scope(&self, scope: PeerScope) -> bool { + match scope { + PeerScope::All => true, + PeerScope::Global => self.remote.socket_address.address().is_global(), + PeerScope::Local => self.remote.socket_address.address().is_local(), + } + } + pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool { + if !self.matches_peer_scope(filter.peer_scope) { + return false; + } + if let Some(pt) = filter.protocol_type { + if self.protocol_type() != pt { + return false; + } + } + if let Some(at) = filter.address_type { + if self.address_type() != at { + return false; + } + } + true + } } //////////////////////////////////////////////////////////////////////////