diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index ac7c2af3..97059d32 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -487,11 +487,11 @@ impl Network { ////////////////////////////////////////// pub fn get_network_class(&self) -> Option { let inner = self.inner.lock(); - return inner.network_class; + inner.network_class } pub fn reset_network_class(&self) { - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); inner.network_class = None; } diff --git a/veilid-core/src/intf/native/network/network_class_discovery.rs b/veilid-core/src/intf/native/network/network_class_discovery.rs index 2389f178..ff14b486 100644 --- a/veilid-core/src/intf/native/network/network_class_discovery.rs +++ b/veilid-core/src/intf/native/network/network_class_discovery.rs @@ -16,7 +16,7 @@ struct DiscoveryContextInner { node_b: Option, } -struct DiscoveryContext { +pub struct DiscoveryContext { routing_table: RoutingTable, net: Network, inner: Arc>, @@ -43,10 +43,10 @@ impl DiscoveryContext { /////// // Utilities -xxxx continue converting to async safe inner + // Pick the best network class we have seen so far pub fn upgrade_network_class(&self, network_class: NetworkClass) { - let inner = self.inner.lock(); + let mut inner = self.inner.lock(); if let Some(old_nc) = inner.network_class { if network_class < old_nc { @@ -137,7 +137,7 @@ xxxx continue converting to async safe inner .unwrap_or(false) } - async fn try_port_mapping(&mut self) -> Option { + async fn try_port_mapping(&self) -> Option { //xxx None } @@ -161,25 +161,30 @@ xxxx continue converting to async safe inner /////// // Per-protocol discovery routines - pub fn protocol_begin(&mut self, protocol_type: ProtocolType, address_type: AddressType) { + pub fn protocol_begin(&self, protocol_type: ProtocolType, address_type: AddressType) { // Get our interface addresses - self.intf_addrs = Some(self.get_local_addresses(protocol_type, address_type)); - self.protocol_type = Some(protocol_type); - self.address_type = Some(address_type); - self.low_level_protocol_type = Some(match protocol_type { + let intf_addrs = self.get_local_addresses(protocol_type, address_type); + + let mut inner = self.inner.lock(); + inner.intf_addrs = Some(intf_addrs); + inner.protocol_type = Some(protocol_type); + inner.address_type = Some(address_type); + inner.low_level_protocol_type = Some(match protocol_type { ProtocolType::UDP => ProtocolType::UDP, ProtocolType::TCP => ProtocolType::TCP, ProtocolType::WS => ProtocolType::TCP, ProtocolType::WSS => ProtocolType::TCP, }); - self.external1_dial_info = None; - self.external1 = None; - self.node_b = None; + inner.external1_dial_info = None; + inner.external1 = None; + inner.node_b = None; } - pub async fn protocol_get_external_address_1(&mut self) -> bool { - let protocol_type = self.protocol_type.unwrap(); - let address_type = self.address_type.unwrap(); + pub async fn protocol_get_external_address_1(&self) -> bool { + let (protocol_type, address_type) = { + let inner = self.inner.lock(); + (inner.protocol_type.unwrap(), inner.address_type.unwrap()) + }; // Get our external address from some fast node, call it node B let (external1, node_b) = match self @@ -194,20 +199,22 @@ xxxx continue converting to async safe inner }; let external1_dial_info = self.make_dial_info(external1, protocol_type); - self.external1_dial_info = Some(external1_dial_info); - self.external1 = Some(external1); - self.node_b = Some(node_b); + let mut inner = self.inner.lock(); + inner.external1_dial_info = Some(external1_dial_info); + inner.external1 = Some(external1); + inner.node_b = Some(node_b); true } - pub async fn protocol_process_no_nat(&mut self) { - let node_b = self.node_b.as_ref().unwrap().clone(); - let external1_dial_info = self.external1_dial_info.as_ref().unwrap().clone(); - let external1 = self.external1.unwrap(); - let protocol_type = self.protocol_type.unwrap(); - let address_type = self.address_type.unwrap(); - let intf_addrs = self.intf_addrs.as_ref().unwrap(); + pub async fn protocol_process_no_nat(&self) { + let (node_b, external1_dial_info) = { + let inner = self.inner.lock(); + ( + inner.node_b.as_ref().unwrap().clone(), + inner.external1_dial_info.as_ref().unwrap().clone(), + ) + }; // Do a validate_dial_info on the external address from a redirected node if self @@ -240,13 +247,17 @@ xxxx continue converting to async safe inner self.upgrade_network_class(NetworkClass::InboundCapable); } - pub async fn protocol_process_nat(&mut self) -> bool { - let node_b = self.node_b.as_ref().unwrap().clone(); - let external1_dial_info = self.external1_dial_info.as_ref().unwrap().clone(); - let external1 = self.external1.unwrap(); - let protocol_type = self.protocol_type.unwrap(); - let address_type = self.address_type.unwrap(); - let intf_addrs = self.intf_addrs.as_ref().unwrap(); + pub async fn protocol_process_nat(&self) -> bool { + let (node_b, external1_dial_info, external1, protocol_type, address_type) = { + let inner = self.inner.lock(); + ( + inner.node_b.as_ref().unwrap().clone(), + inner.external1_dial_info.as_ref().unwrap().clone(), + inner.external1.unwrap(), + inner.protocol_type.unwrap(), + inner.address_type.unwrap(), + ) + }; // Attempt a UDP port mapping via all available and enabled mechanisms if let Some(external_mapped_dial_info) = self.try_port_mapping().await { @@ -337,7 +348,7 @@ xxxx continue converting to async safe inner impl Network { pub async fn update_ipv4_protocol_dialinfo( &self, - context: &mut DiscoveryContext, + context: &DiscoveryContext, protocol_type: ProtocolType, ) -> Result<(), String> { let mut retry_count = { @@ -357,17 +368,22 @@ impl Network { } // If our local interface list contains external1 then there is no NAT in place - if context - .intf_addrs - .as_ref() - .unwrap() - .contains(&context.external1.as_ref().unwrap()) { - // No NAT - context.protocol_process_no_nat().await; + let res = { + let inner = context.inner.lock(); + inner + .intf_addrs + .as_ref() + .unwrap() + .contains(inner.external1.as_ref().unwrap()) + }; + if res { + // No NAT + context.protocol_process_no_nat().await; - // No more retries - break; + // No more retries + break; + } } // There is -some NAT- @@ -388,7 +404,7 @@ impl Network { pub async fn update_ipv6_protocol_dialinfo( &self, - context: &mut DiscoveryContext, + context: &DiscoveryContext, protocol_type: ProtocolType, ) -> Result<(), String> { // Start doing ipv6 protocol @@ -401,18 +417,21 @@ impl Network { } // If our local interface list doesn't contain external1 then there is an Ipv6 NAT in place - if !context - .intf_addrs - .as_ref() - .unwrap() - .contains(&context.external1.as_ref().unwrap()) { - // IPv6 NAT is not supported today - log_net!(warn - "IPv6 NAT is not supported for external address: {}", - context.external1.unwrap() - ); - return Ok(()); + let inner = context.inner.lock(); + if !inner + .intf_addrs + .as_ref() + .unwrap() + .contains(inner.external1.as_ref().unwrap()) + { + // IPv6 NAT is not supported today + log_net!(warn + "IPv6 NAT is not supported for external address: {}", + inner.external1.unwrap() + ); + return Ok(()); + } } // No NAT @@ -424,37 +443,32 @@ impl Network { pub async fn update_network_class_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { log_net!("updating network class"); - let protocol_config = self - .inner - .lock() - .protocol_config - .clone() - .unwrap_or_default(); + let protocol_config = self.inner.lock().protocol_config.unwrap_or_default(); let context = DiscoveryContext::new(self.routing_table(), self.clone()); if protocol_config.inbound.contains(ProtocolType::UDP) { - self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::UDP) + self.update_ipv4_protocol_dialinfo(&context, ProtocolType::UDP) .await?; - self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::UDP) + self.update_ipv6_protocol_dialinfo(&context, ProtocolType::UDP) .await?; } if protocol_config.inbound.contains(ProtocolType::TCP) { - self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::TCP) + self.update_ipv4_protocol_dialinfo(&context, ProtocolType::TCP) .await?; - self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::TCP) + self.update_ipv6_protocol_dialinfo(&context, ProtocolType::TCP) .await?; } if protocol_config.inbound.contains(ProtocolType::WS) { - self.update_ipv4_protocol_dialinfo(&mut context, ProtocolType::WS) + self.update_ipv4_protocol_dialinfo(&context, ProtocolType::WS) .await?; - self.update_ipv6_protocol_dialinfo(&mut context, ProtocolType::WS) + self.update_ipv6_protocol_dialinfo(&context, ProtocolType::WS) .await?; } - self.inner.lock().network_class = context.network_class; + self.inner.lock().network_class = context.inner.lock().network_class; Ok(()) } diff --git a/veilid-core/src/intf/native/network/start_protocols.rs b/veilid-core/src/intf/native/network/start_protocols.rs index bc31464b..4e5c6536 100644 --- a/veilid-core/src/intf/native/network/start_protocols.rs +++ b/veilid-core/src/intf/native/network/start_protocols.rs @@ -331,17 +331,16 @@ impl Network { DialInfoClass::Direct, ); - // See if this public address is also a local interface address - if !local_dial_info_list.contains(&pdi) - && self.with_interface_addresses(|ip_addrs| { - for ip_addr in ip_addrs { - if pdi_addr.ip() == *ip_addr { - return true; - } + // See if this public address is also a local interface address we haven't registered yet + let is_interface_address = self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if pdi_addr.ip() == *ip_addr { + return true; } - false - }) - { + } + false + }); + if !local_dial_info_list.contains(&pdi) && is_interface_address { routing_table.register_dial_info( RoutingDomain::LocalNetwork, DialInfo::udp_from_socketaddr(pdi_addr), @@ -432,16 +431,15 @@ impl Network { static_public = true; // See if this public address is also a local interface address - if !registered_addresses.contains(&gsa.ip()) - && self.with_interface_addresses(|ip_addrs| { - for ip_addr in ip_addrs { - if gsa.ip() == *ip_addr { - return true; - } + let is_interface_address = self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if gsa.ip() == *ip_addr { + return true; } - false - }) - { + } + false + }); + if !registered_addresses.contains(&gsa.ip()) && is_interface_address { routing_table.register_dial_info( RoutingDomain::LocalNetwork, pdi, @@ -496,12 +494,11 @@ impl Network { trace!("starting wss listeners"); let routing_table = self.routing_table(); - let (listen_address, url, enable_local_peer_scope) = { + let (listen_address, url) = { let c = self.config.get(); ( c.network.protocol.wss.listen_address.clone(), c.network.protocol.wss.url.clone(), - c.network.enable_local_peer_scope, ) }; @@ -566,16 +563,15 @@ impl Network { static_public = true; // See if this public address is also a local interface address - if !registered_addresses.contains(&gsa.ip()) - && self.with_interface_addresses(|ip_addrs| { - for ip_addr in ip_addrs { - if gsa.ip() == *ip_addr { - return true; - } + let is_interface_address = self.with_interface_addresses(|ip_addrs| { + for ip_addr in ip_addrs { + if gsa.ip() == *ip_addr { + return true; } - false - }) - { + } + false + }); + if !registered_addresses.contains(&gsa.ip()) && is_interface_address { routing_table.register_dial_info( RoutingDomain::LocalNetwork, pdi, @@ -683,22 +679,21 @@ impl Network { static_public = true; // See if this public address is also a local interface address - if self.with_interface_addresses(|ip_addrs| { + let is_interface_address = self.with_interface_addresses(|ip_addrs| { for ip_addr in ip_addrs { if pdi_addr.ip() == *ip_addr { return true; } } false - }) { + }); + if is_interface_address { routing_table.register_dial_info( RoutingDomain::LocalNetwork, pdi, DialInfoClass::Direct, ); } - - static_public = true; } } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index dadf1692..ca7f1828 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -599,7 +599,7 @@ impl NetworkManager { if envelope_node_id != via_node_id { return Ok(SendDataKind::GlobalIndirect); } - return Ok(send_data_kind); + Ok(send_data_kind) } // Called by the RPC handler when we want to issue an direct receipt @@ -626,7 +626,7 @@ impl NetworkManager { } // Figure out how to reach a node - fn get_contact_method(&self, target_node_ref: NodeRef) -> Result { + fn get_contact_method(&self, mut target_node_ref: NodeRef) -> Result { let routing_table = self.routing_table(); // Get our network class and protocol config @@ -639,105 +639,94 @@ impl NetworkManager { } // Get the best matching local direct dial info if we have it - let opt_local_did = + let opt_target_local_did = target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::LocalNetwork)); - if let Some(local_did) = opt_local_did { - return Ok(ContactMethod::Direct(local_did.dial_info)); + if let Some(target_local_did) = opt_target_local_did { + return Ok(ContactMethod::Direct(target_local_did.dial_info)); } // Get the best match internet dial info if we have it - let opt_public_did = + let opt_target_public_did = target_node_ref.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)); - - // Can the target node do inbound? - let target_network_class = target_node_ref.network_class(); - //if matches!(target_network_class, NetworkClass::InboundCapable) { - if let Some(public_did) = opt_public_did { + if let Some(target_public_did) = opt_target_public_did { // Do we need to signal before going inbound? - if public_did.class.requires_signal() { - // Get the target's inbound relay, it must have one or it is not reachable - if let Some(inbound_relay_nr) = target_node_ref.relay() { - // Can we reach the inbound relay? - if inbound_relay_nr - .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) - .is_some() - { - // Can we receive anything inbound ever? - if matches!(our_network_class, NetworkClass::InboundCapable) { - // Get the best match dial info for an reverse inbound connection - let reverse_dif = DialInfoFilter::global() - .with_protocol_set(target_node_ref.outbound_protocols()); - if let Some(reverse_did) = routing_table - .first_filtered_dial_info_detail( - RoutingDomain::PublicInternet, - &reverse_dif, - ) - { - // Can we receive a direct reverse connection? - if !reverse_did.class.requires_signal() { - return Ok(ContactMethod::SignalReverse( - inbound_relay_nr, - target_node_ref, - )); - } - } - - // Does we and the target have outbound protocols to hole-punch? - if our_protocol_config.outbound.contains(ProtocolType::UDP) - && target_node_ref - .outbound_protocols() - .contains(ProtocolType::UDP) - { - // Do the target and self nodes have a direct udp dialinfo - let udp_dif = - DialInfoFilter::global().with_protocol_type(ProtocolType::UDP); - let udp_target_nr = target_node_ref.clone(); - udp_target_nr - .filter_protocols(ProtocolSet::only(ProtocolType::UDP)); - let target_has_udp_dialinfo = target_node_ref - .first_filtered_dial_info_detail(Some( - RoutingDomain::PublicInternet, - )) - .is_some(); - let self_has_udp_dialinfo = routing_table - .first_filtered_dial_info_detail( - RoutingDomain::PublicInternet, - &udp_dif, - ) - .is_some(); - if target_has_udp_dialinfo && self_has_udp_dialinfo { - return Ok(ContactMethod::SignalHolePunch( - inbound_relay_nr, - udp_target_nr, - )); - } - } - // Otherwise we have to inbound relay - } - - return Ok(ContactMethod::InboundRelay(inbound_relay_nr)); - } - } + if !target_public_did.class.requires_signal() { + // Go direct without signaling + return Ok(ContactMethod::Direct(target_public_did.dial_info)); } - // Go direct without signaling - else { - // If we have direct dial info we can use, do it - if let Some(did) = opt_public_did { - return Ok(ContactMethod::Direct(did.dial_info)); - } - } - } else { - // If the other node is not inbound capable at all, it is using a full relay - if let Some(target_inbound_relay_nr) = target_node_ref.relay() { - // Can we reach the full relay? - if target_inbound_relay_nr + + // Get the target's inbound relay, it must have one or it is not reachable + if let Some(inbound_relay_nr) = target_node_ref.relay() { + // Can we reach the inbound relay? + if inbound_relay_nr .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) .is_some() { - return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); + // Can we receive anything inbound ever? + if matches!(our_network_class, NetworkClass::InboundCapable) { + // Get the best match dial info for an reverse inbound connection + let reverse_dif = DialInfoFilter::global() + .with_protocol_set(target_node_ref.outbound_protocols()); + if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail( + Some(RoutingDomain::PublicInternet), + &reverse_dif, + ) { + // Can we receive a direct reverse connection? + if !reverse_did.class.requires_signal() { + return Ok(ContactMethod::SignalReverse( + inbound_relay_nr, + target_node_ref, + )); + } + } + + // Does we and the target have outbound protocols to hole-punch? + if our_protocol_config.outbound.contains(ProtocolType::UDP) + && target_node_ref + .outbound_protocols() + .contains(ProtocolType::UDP) + { + // Do the target and self nodes have a direct udp dialinfo + let udp_dif = + DialInfoFilter::global().with_protocol_type(ProtocolType::UDP); + let mut udp_target_nr = target_node_ref.clone(); + udp_target_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP)); + let target_has_udp_dialinfo = target_node_ref + .first_filtered_dial_info_detail(Some( + RoutingDomain::PublicInternet, + )) + .is_some(); + let self_has_udp_dialinfo = routing_table + .first_filtered_dial_info_detail( + Some(RoutingDomain::PublicInternet), + &udp_dif, + ) + .is_some(); + if target_has_udp_dialinfo && self_has_udp_dialinfo { + return Ok(ContactMethod::SignalHolePunch( + inbound_relay_nr, + udp_target_nr, + )); + } + } + // Otherwise we have to inbound relay + } + + return Ok(ContactMethod::InboundRelay(inbound_relay_nr)); } } } + // If the other node is not inbound capable at all, it is using a full relay + else if let Some(target_inbound_relay_nr) = target_node_ref.relay() { + // Can we reach the full relay? + if target_inbound_relay_nr + .first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet)) + .is_some() + { + return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); + } + } + // If we can't reach the node by other means, try our outbound relay if we have one if let Some(relay_node) = self.relay_node() { return Ok(ContactMethod::OutboundRelay(relay_node)); @@ -868,23 +857,17 @@ impl NetworkManager { let inbound_nr = match eventual_value.await { ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { - return Err(format!( - "reverse connect receipt expired from {:?}", - target_nr - )); + return Err(format!("hole punch receipt expired from {:?}", target_nr)); } ReceiptEvent::Cancelled => { - return Err(format!( - "reverse connect receipt cancelled from {:?}", - target_nr - )); + return Err(format!("hole punch receipt cancelled from {:?}", target_nr)); } }; // We expect the inbound noderef to be the same as the target noderef // if they aren't the same, we should error on this and figure out what then hell is up if target_nr != inbound_nr { - error!("unexpected noderef mismatch on reverse connect"); + error!("unexpected noderef mismatch on hole punch"); } // And now use the existing connection to send over @@ -896,10 +879,10 @@ impl NetworkManager { .map_err(logthru_net!())? { None => Ok(()), - Some(_) => Err("unable to send over reverse connection".to_owned()), + Some(_) => Err("unable to send over hole punch".to_owned()), } } else { - Err("no reverse connection available".to_owned()) + Err("no hole punch available".to_owned()) } } @@ -947,17 +930,17 @@ impl NetworkManager { .await .map(|_| SendDataKind::GlobalIndirect) } - ContactMethod::Direct(dial_info) => this - .net() - .send_data_to_dial_info(dial_info, data) - .await - .map(|_| { - if dial_info.is_local() { - SendDataKind::LocalDirect - } else { - SendDataKind::GlobalDirect - } - }), + ContactMethod::Direct(dial_info) => { + let send_data_kind = if dial_info.is_local() { + SendDataKind::LocalDirect + } else { + SendDataKind::GlobalDirect + }; + this.net() + .send_data_to_dial_info(dial_info, data) + .await + .map(|_| send_data_kind) + } ContactMethod::SignalReverse(relay_nr, target_node_ref) => this .do_reverse_connect(relay_nr, target_node_ref, data) .await diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 9395c74f..4f3e1e86 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -50,7 +50,6 @@ impl RoutingTable { // Get our own node's peer info (public node info) so we can share it with other nodes pub fn get_own_peer_info(&self) -> PeerInfo { let netman = self.network_manager(); - let enable_local_peer_scope = netman.config().get().network.enable_local_peer_scope; let relay_node = netman.relay_node(); PeerInfo { node_id: NodeId::new(self.node_id()), diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index b98a298b..4b0e25f1 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -187,17 +187,36 @@ impl RoutingTable { pub fn first_filtered_dial_info_detail( &self, - domain: RoutingDomain, + domain: Option, filter: &DialInfoFilter, ) -> Option { let inner = self.inner.lock(); - Self::with_routing_domain(&*inner, domain, |rd| { - for did in rd.dial_info_details { - if did.matches_filter(filter) { - return Some(did.clone()); + // Prefer local network first if it isn't filtered out + if domain == None || domain == Some(RoutingDomain::LocalNetwork) { + Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { + for did in &rd.dial_info_details { + if did.matches_filter(filter) { + return Some(did.clone()); + } } - } + None + }) + } else { None + } + .or_else(|| { + if domain == None || domain == Some(RoutingDomain::PublicInternet) { + Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { + for did in &rd.dial_info_details { + if did.matches_filter(filter) { + return Some(did.clone()); + } + } + None + }) + } else { + None + } }) } @@ -211,7 +230,7 @@ impl RoutingTable { if domain == None || domain == Some(RoutingDomain::LocalNetwork) { Self::with_routing_domain(&*inner, RoutingDomain::LocalNetwork, |rd| { - for did in rd.dial_info_details { + for did in &rd.dial_info_details { if did.matches_filter(filter) { ret.push(did.clone()); } @@ -220,7 +239,7 @@ impl RoutingTable { } if domain == None || domain == Some(RoutingDomain::PublicInternet) { Self::with_routing_domain(&*inner, RoutingDomain::PublicInternet, |rd| { - for did in rd.dial_info_details { + for did in &rd.dial_info_details { if did.matches_filter(filter) { ret.push(did.clone()); } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index e5bb9c99..2d9e8bf4 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -46,11 +46,12 @@ impl NodeRef { // Returns true if some protocols can still pass the filter and false if no protocols remain pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool { if protocol_set != ProtocolSet::all() { - let mut dif = self.filter.unwrap_or_default(); + let mut dif = self.filter.clone().unwrap_or_default(); dif.protocol_set &= protocol_set; self.filter = Some(dif); } self.filter + .as_ref() .map(|f| !f.protocol_set.is_empty()) .unwrap_or(true) } @@ -79,13 +80,12 @@ impl NodeRef { self.operate(|e| e.node_info().outbound_protocols) } pub fn relay(&self) -> Option { - let target_rpi = self.operate(|e| e.node_info().relay_peer_info)?; - + let target_rpi = self.operate(|e| e.node_info().relay_peer_info.clone())?; self.routing_table .register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info) .map_err(logthru_rtab!(error)) .ok() - .map(|nr| { + .map(|mut nr| { nr.set_filter(self.filter_ref().cloned()); nr }) @@ -95,16 +95,20 @@ impl NodeRef { routing_domain: Option, ) -> Option { self.operate(|e| { + // Prefer local dial info first unless it is filtered out if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork)) && matches!( - self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), + self.filter + .as_ref() + .map(|f| f.peer_scope) + .unwrap_or(PeerScope::All), PeerScope::All | PeerScope::Local ) { e.local_node_info() .first_filtered_dial_info(|di| { - if let Some(filter) = self.filter { - di.matches_filter(&filter) + if let Some(filter) = self.filter.as_ref() { + di.matches_filter(filter) } else { true } @@ -119,13 +123,16 @@ impl NodeRef { .or_else(|| { if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet)) && matches!( - self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), + self.filter + .as_ref() + .map(|f| f.peer_scope) + .unwrap_or(PeerScope::All), PeerScope::All | PeerScope::Global ) { e.node_info().first_filtered_dial_info_detail(|did| { - if let Some(filter) = self.filter { - did.matches_filter(&filter) + if let Some(filter) = self.filter.as_ref() { + did.matches_filter(filter) } else { true } @@ -143,15 +150,19 @@ impl NodeRef { ) -> Vec { let mut out = Vec::new(); self.operate(|e| { + // Prefer local dial info first unless it is filtered out if (routing_domain == None || routing_domain == Some(RoutingDomain::LocalNetwork)) && matches!( - self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), + self.filter + .as_ref() + .map(|f| f.peer_scope) + .unwrap_or(PeerScope::All), PeerScope::All | PeerScope::Local ) { for di in e.local_node_info().all_filtered_dial_info(|di| { - if let Some(filter) = self.filter { - di.matches_filter(&filter) + if let Some(filter) = self.filter.as_ref() { + di.matches_filter(filter) } else { true } @@ -164,13 +175,16 @@ impl NodeRef { } if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet)) && matches!( - self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), + self.filter + .as_ref() + .map(|f| f.peer_scope) + .unwrap_or(PeerScope::All), PeerScope::All | PeerScope::Global ) { out.append(&mut e.node_info().all_filtered_dial_info_details(|did| { - if let Some(filter) = self.filter { - did.matches_filter(&filter) + if let Some(filter) = self.filter.as_ref() { + did.matches_filter(filter) } else { true } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 2a50ff78..935e2e23 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1378,49 +1378,52 @@ impl RPCProcessor { // Wait for reply let (rpcreader, latency) = self.wait_for_reply(waitable_reply).await?; - let response_operation = rpcreader - .reader - .get_root::() - .map_err(map_error_capnp_error!()) - .map_err(logthru_rpc!())?; - let info_a = match response_operation - .get_detail() - .which() - .map_err(map_error_capnp_notinschema!()) - .map_err(logthru_rpc!())? - { - veilid_capnp::operation::detail::InfoA(a) => { - a.map_err(map_error_internal!("Invalid InfoA"))? + let (sender_info, node_status) = { + let response_operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + let info_a = match response_operation + .get_detail() + .which() + .map_err(map_error_capnp_notinschema!()) + .map_err(logthru_rpc!())? + { + veilid_capnp::operation::detail::InfoA(a) => { + a.map_err(map_error_internal!("Invalid InfoA"))? + } + _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), + }; + + // Decode node info + if !info_a.has_node_status() { + return Err(rpc_error_internal("Missing node status")); } - _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), + let nsr = info_a + .get_node_status() + .map_err(map_error_internal!("Broken node status"))?; + let node_status = decode_node_status(&nsr)?; + + // Decode sender info + let sender_info = if info_a.has_sender_info() { + let sir = info_a + .get_sender_info() + .map_err(map_error_internal!("Broken sender info"))?; + decode_sender_info(&sir)? + } else { + SenderInfo::default() + }; + + // Update latest node status in routing table + peer.operate(|e| { + e.update_node_status(node_status.clone()); + }); + + (sender_info, node_status) }; - // Decode node info - if !info_a.has_node_status() { - return Err(rpc_error_internal("Missing node status")); - } - let nsr = info_a - .get_node_status() - .map_err(map_error_internal!("Broken node status"))?; - let node_status = decode_node_status(&nsr)?; - - // Decode sender info - let sender_info = if info_a.has_sender_info() { - let sir = info_a - .get_sender_info() - .map_err(map_error_internal!("Broken sender info"))?; - decode_sender_info(&sir)? - } else { - SenderInfo::default() - }; - - // Update latest node status in routing table - peer.operate(|e| { - e.update_node_status(node_status.clone()); - }); - // Report sender_info IP addresses to network manager - if let Some(socket_address) = sender_info.socket_address { match send_data_kind { SendDataKind::LocalDirect => { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 7d67f858..0dbc0903 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -326,7 +326,7 @@ impl NodeInfo { F: Fn(&DialInfoDetail) -> bool, { for did in &self.dial_info_detail_list { - if filter(&did) { + if filter(did) { return Some(did.clone()); } } @@ -340,7 +340,7 @@ impl NodeInfo { let mut dial_info_detail_list = Vec::new(); for did in &self.dial_info_detail_list { - if filter(&did) { + if filter(did) { dial_info_detail_list.push(did.clone()); } } @@ -446,6 +446,7 @@ impl LocalNodeInfo { } } +#[allow(clippy::derive_hash_xor_eq)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] // Keep member order appropriate for sorting < preference // Must match DialInfo order