From 5b0ade9f49eb1df5a360ea9afd627716fb4008ac Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 20 Apr 2022 20:49:16 -0400 Subject: [PATCH] refactor checkpoint --- Cargo.lock | 2 + veilid-core/Cargo.toml | 1 + veilid-core/src/network_manager.rs | 146 +++++++++--------- veilid-core/src/routing_table/node_ref.rs | 50 +++++- .../src/rpc_processor/coders/protocol_set.rs | 28 ++-- veilid-core/src/veilid_api/mod.rs | 73 ++++----- 6 files changed, 168 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd891801..dfac024d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1366,6 +1366,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6216d2c19a6fb5f29d1ada1dc7bc4367a8cbf0fa4af5cf12e07b5bbdde6b5b2c" dependencies = [ "enumset_derive", + "serde 1.0.136", ] [[package]] @@ -4196,6 +4197,7 @@ dependencies = [ "digest 0.9.0", "directories", "ed25519-dalek", + "enumset", "flume", "futures-util", "generic-array 0.14.5", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index a8584793..1e4208d3 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -35,6 +35,7 @@ directories = "^4" once_cell = "^1" json = "^0" flume = { version = "^0", features = ["async"] } +enumset = { version= "^1", features = ["serde"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index c31d4dbc..b9adebc7 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -70,12 +70,18 @@ struct ClientWhitelistEntry { // Mechanism required to contact another node enum ContactMethod { - Unreachable, // Node is not reachable by any means - Direct(DialInfo), // Contact the node directly - SignalReverse(NodeRef), // Request via signal the node connect back directly - SignalHolePunch(NodeRef), // Request via signal the node negotiate a hole punch - InboundRelay(NodeRef), // Must use an inbound relay to reach the node - OutboundRelay(NodeRef), // Must use outbound relay to reach the node + Unreachable, // Node is not reachable by any means + Direct(DialInfo), // Contact the node directly + SignalReverse(NodeRef, NodeRef), // Request via signal the node connect back directly + SignalHolePunch(NodeRef, NodeRef), // Request via signal the node negotiate a hole punch + InboundRelay(NodeRef), // Must use an inbound relay to reach the node + OutboundRelay(NodeRef), // Must use outbound relay to reach the node +} + +#[derive(Copy, Clone, Debug)] +pub enum SendDataKind { + Direct, + Indirect, } // The mutable state of the network manager @@ -488,18 +494,14 @@ impl NetworkManager { let routing_table = self.routing_table(); // Add the peer info to our routing table - let peer_nr = routing_table + let mut peer_nr = routing_table .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; // Get the udp direct dialinfo for the hole punch - let hole_punch_dial_info = if let Some(hpdi) = peer_nr - .node_info() - .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) - { - hpdi - } else { - return Err("No hole punch capable dialinfo found for node".to_owned()); - }; + peer_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP)); + let hole_punch_dial_info = peer_nr + .first_filtered_dial_info() + .ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?; // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole @@ -609,60 +611,65 @@ impl NetworkManager { } // Figure out how to reach a node - fn get_contact_method(&self, node_ref: &NodeRef) -> Result { + fn get_contact_method(&self, node_ref: NodeRef) -> Result { // Get our network class and protocol config let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); let our_protocol_config = self.get_protocol_config().unwrap(); - // See if this is a local node reachable directly - let local_node_info = node_ref.local_node_info(); - if let Some(local_direct_dial_info) = local_node_info - .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)) - { - return Ok(ContactMethod::Direct(local_direct_dial_info)); + // Scope noderef down to protocols we can do outbound + if !node_ref.filter_protocols(our_protocol_config.outbound) { + return Ok(ContactMethod::Unreachable); } // Get the best matching direct dial info if we have it - let target_node_info = node_ref.node_info(); - let opt_direct_dial_info = target_node_info - .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)); + let opt_direct_dial_info = node_ref.first_filtered_dial_info(); + + // See if this is a local node reachable directly + if let Some(direct_dial_info) = opt_direct_dial_info { + if direct_dial_info.is_local() { + return Ok(ContactMethod::Direct(direct_dial_info)); + } + } // Can the target node do inbound? - if target_node_info.network_class.inbound_capable() { + let target_network_class = node_ref.network_class(); + if target_network_class.inbound_capable() { // Do we need to signal before going inbound? - if target_node_info.network_class.inbound_requires_signal() { + if target_network_class.inbound_requires_signal() { // Get the target's inbound relay, it must have one or it is not reachable - if let Some(target_rpi) = target_node_info.relay_peer_info { + if let Some(inbound_relay_nr) = node_ref.relay() { // Can we reach the inbound relay? - if target_rpi - .node_info - .first_filtered_dial_info(|di| { - our_protocol_config.outbound.filter_dial_info(di) - }) - .is_some() - { - let target_inbound_relay_nr = - self.routing_table().register_node_with_node_info( - target_rpi.node_id.key, - target_rpi.node_info, - )?; - + if inbound_relay_nr.first_filtered_dial_info().is_some() { // Can we receive anything inbound ever? if our_network_class.inbound_capable() { // Can we receive a direct reverse connection? if !our_network_class.inbound_requires_signal() { - return Ok(ContactMethod::SignalReverse(target_inbound_relay_nr)); + return Ok(ContactMethod::SignalReverse( + inbound_relay_nr, + node_ref, + )); } // Can we hole-punch? - else if our_protocol_config.inbound.udp - && target_node_info.outbound_protocols.udp + else if our_protocol_config.inbound.contains(ProtocolType::UDP) + && node_ref.outbound_protocols().contains(ProtocolType::UDP) { - return Ok(ContactMethod::SignalHolePunch(target_inbound_relay_nr)); + let udp_inbound_relay_nr = inbound_relay_nr.clone(); + let udp_target_nr = node_ref.clone(); + let can_reach_inbound_relay = udp_inbound_relay_nr + .filter_protocols(ProtocolSet::only(ProtocolType::UDP)); + let can_reach_target = udp_target_nr + .filter_protocols(ProtocolSet::only(ProtocolType::UDP)); + if can_reach_inbound_relay && can_reach_target { + return Ok(ContactMethod::SignalHolePunch( + udp_inbound_relay_nr, + udp_target_nr, + )); + } } // Otherwise we have to inbound relay } - return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); + return Ok(ContactMethod::InboundRelay(inbound_relay_nr)); } } } @@ -675,20 +682,9 @@ impl NetworkManager { } } else { // If the other node is not inbound capable at all, it is using a full relay - if let Some(target_rpi) = target_node_info.relay_peer_info { + if let Some(target_inbound_relay_nr) = node_ref.relay() { // Can we reach the full relay? - if target_rpi - .node_info - .first_filtered_dial_info(|di| { - our_protocol_config.outbound.filter_dial_info(di) - }) - .is_some() - { - let target_inbound_relay_nr = - self.routing_table().register_node_with_node_info( - target_rpi.node_id.key, - target_rpi.node_info, - )?; + if target_inbound_relay_nr.first_filtered_dial_info().is_some() { return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); } } @@ -777,6 +773,16 @@ impl NetworkManager { target_nr: NodeRef, data: Vec, ) -> Result<(), String> { + // Ensure we are filtered down to UDP (the only hole punch protocol supported today) + assert!(relay_nr + .filter_ref() + .map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP)) + .unwrap_or_default()); + assert!(target_nr + .filter_ref() + .map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP)) + .unwrap_or_default()); + // Build a return receipt for the signal let receipt_timeout = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); @@ -788,14 +794,9 @@ impl NetworkManager { let peer_info = self.routing_table().get_own_peer_info(); // Get the udp direct dialinfo for the hole punch - let hole_punch_dial_info = if let Some(hpdi) = target_nr - .node_info() - .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) - { - hpdi - } else { - return Err("No hole punch capable dialinfo found for node".to_owned()); - }; + let hole_punch_dial_info = target_nr + .first_filtered_dial_info() + .ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?; // Do our half of the hole punch by sending an empty packet // Both sides will do this and then the receipt will get sent over the punched hole @@ -887,18 +888,19 @@ impl NetworkManager { }; // If we don't have last_connection, try to reach out to the peer via its dial info - match this.get_contact_method(&node_ref).map_err(logthru_net!())? { + match this.get_contact_method(node_ref).map_err(logthru_net!())? { ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { this.send_data(relay_nr, data).await } ContactMethod::Direct(dial_info) => { this.net().send_data_to_dial_info(dial_info, data).await } - ContactMethod::SignalReverse(relay_nr) => { - this.do_reverse_connect(relay_nr, node_ref, data).await + ContactMethod::SignalReverse(relay_nr, target_node_ref) => { + this.do_reverse_connect(relay_nr, target_node_ref, data) + .await } - ContactMethod::SignalHolePunch(relay_nr) => { - this.do_hole_punch(relay_nr, node_ref, data).await + ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => { + this.do_hole_punch(relay_nr, target_node_ref, data).await } ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()), } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 35344762..ecf8040f 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -31,6 +31,30 @@ impl NodeRef { self.node_id } + pub fn filter_ref(&self) -> Option<&DialInfoFilter> { + self.filter.as_ref() + } + + pub fn take_filter(&mut self) -> Option { + self.filter.take() + } + + pub fn set_filter(&mut self, filter: Option) { + self.filter = filter + } + + // Returns true if some protocols can still pass the filter and false if no protocols remain + pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool { + if protocol_set != ProtocolSet::all() { + let mut dif = self.filter.unwrap_or_default(); + dif.protocol_set &= protocol_set; + self.filter = Some(dif); + } + self.filter + .map(|f| !f.protocol_set.is_empty()) + .unwrap_or(true) + } + pub fn operate(&self, f: F) -> T where F: FnOnce(&mut BucketEntry) -> T, @@ -48,13 +72,31 @@ impl NodeRef { self.operate(|e| e.set_seen_our_node_info(true)); } + pub fn network_class(&self) -> NetworkClass { + self.operate(|e| e.node_info().network_class) + } + pub fn outbound_protocols(&self) -> ProtocolSet { + self.operate(|e| e.node_info().outbound_protocols) + } + pub fn relay(&self) -> Option { + let target_rpi = self.operate(|e| e.node_info().relay_peer_info)?; + + self.routing_table + .register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info) + .map_err(logthru_rtab!(error)) + .ok() + .map(|nr| { + nr.set_filter(self.filter_ref().cloned()); + nr + }) + } pub fn first_filtered_dial_info(&self) -> Option { self.operate(|e| { if matches!( self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), - PeerScope::All | PeerScope::Global + PeerScope::All | PeerScope::Local ) { - e.node_info().first_filtered_dial_info(|di| { + e.local_node_info().first_filtered_dial_info(|di| { if let Some(filter) = self.filter { di.matches_filter(&filter) } else { @@ -67,9 +109,9 @@ impl NodeRef { .or_else(|| { if matches!( self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All), - PeerScope::All | PeerScope::Local + PeerScope::All | PeerScope::Global ) { - e.local_node_info().first_filtered_dial_info(|di| { + e.node_info().first_filtered_dial_info(|di| { if let Some(filter) = self.filter { di.matches_filter(&filter) } else { diff --git a/veilid-core/src/rpc_processor/coders/protocol_set.rs b/veilid-core/src/rpc_processor/coders/protocol_set.rs index 545d776d..0fd3dc02 100644 --- a/veilid-core/src/rpc_processor/coders/protocol_set.rs +++ b/veilid-core/src/rpc_processor/coders/protocol_set.rs @@ -5,10 +5,10 @@ pub fn encode_protocol_set( protocol_set: &ProtocolSet, builder: &mut veilid_capnp::protocol_set::Builder, ) -> Result<(), RPCError> { - builder.set_udp(protocol_set.udp); - builder.set_tcp(protocol_set.tcp); - builder.set_ws(protocol_set.ws); - builder.set_wss(protocol_set.wss); + builder.set_udp(protocol_set.contains(ProtocolType::UDP)); + builder.set_tcp(protocol_set.contains(ProtocolType::TCP)); + builder.set_ws(protocol_set.contains(ProtocolType::WS)); + builder.set_wss(protocol_set.contains(ProtocolType::WSS)); Ok(()) } @@ -16,10 +16,18 @@ pub fn encode_protocol_set( pub fn decode_protocol_set( reader: &veilid_capnp::protocol_set::Reader, ) -> Result { - Ok(ProtocolSet { - udp: reader.reborrow().get_udp(), - tcp: reader.reborrow().get_tcp(), - ws: reader.reborrow().get_ws(), - wss: reader.reborrow().get_wss(), - }) + let mut out = ProtocolSet::new(); + if reader.reborrow().get_udp() { + out.insert(ProtocolType::UDP); + } + if reader.reborrow().get_tcp() { + out.insert(ProtocolType::TCP); + } + if reader.reborrow().get_ws() { + out.insert(ProtocolType::WS); + } + if reader.reborrow().get_wss() { + out.insert(ProtocolType::WSS); + } + Ok(out) } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 6302564c..cec18efc 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -23,6 +23,7 @@ pub use rpc_processor::InfoAnswer; use core::fmt; use core_context::{api_shutdown, VeilidCoreContext}; +use enumset::*; use rpc_processor::{RPCError, RPCProcessor}; use serde::*; use xx::*; @@ -375,7 +376,6 @@ impl NodeInfo { #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct LocalNodeInfo { - pub outbound_protocols: ProtocolSet, pub dial_info_list: Vec, } @@ -411,7 +411,7 @@ impl LocalNodeInfo { } } -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] // The derived ordering here is the order of preference, lower is preferred for connections // Must match DialInfo order pub enum ProtocolType { @@ -430,27 +430,7 @@ impl ProtocolType { } } -#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] -pub struct ProtocolSet { - pub udp: bool, - pub tcp: bool, - pub ws: bool, - pub wss: bool, -} - -impl ProtocolSet { - pub fn contains(&self, protocol_type: ProtocolType) -> bool { - match protocol_type { - ProtocolType::UDP => self.udp, - ProtocolType::TCP => self.tcp, - ProtocolType::WS => self.ws, - ProtocolType::WSS => self.wss, - } - } - pub fn filter_dial_info(&self, di: &DialInfo) -> bool { - self.contains(di.protocol_type()) - } -} +pub type ProtocolSet = EnumSet; #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] pub enum AddressType { @@ -598,63 +578,68 @@ impl FromStr for SocketAddress { ////////////////////////////////////////////////////////////////// -#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct DialInfoFilter { pub peer_scope: PeerScope, - pub protocol_type: Option, + pub protocol_set: ProtocolSet, pub address_type: Option, } +impl Default for DialInfoFilter { + fn default() -> Self { + Self { + peer_scope: PeerScope::All, + protocol_set: ProtocolSet::all(), + address_type: None, + } + } +} + impl DialInfoFilter { pub fn all() -> Self { Self { peer_scope: PeerScope::All, - protocol_type: None, + protocol_set: ProtocolSet::all(), address_type: None, } } pub fn global() -> Self { Self { peer_scope: PeerScope::Global, - protocol_type: None, + protocol_set: ProtocolSet::all(), address_type: None, } } pub fn local() -> Self { Self { peer_scope: PeerScope::Local, - protocol_type: None, + protocol_set: ProtocolSet::all(), address_type: None, } } pub fn scoped(peer_scope: PeerScope) -> Self { Self { peer_scope, - protocol_type: None, + protocol_set: ProtocolSet::all(), address_type: None, } } pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self { - self.protocol_type = Some(protocol_type); + self.protocol_set = ProtocolSet::only(protocol_type); self } pub fn with_address_type(mut self, address_type: AddressType) -> Self { self.address_type = Some(address_type); self } - pub fn is_empty(&self) -> bool { - self.peer_scope == PeerScope::All - && self.protocol_type.is_none() - && self.address_type.is_none() - } } impl fmt::Debug for DialInfoFilter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { let mut out = String::new(); out += &format!("{:?}", self.peer_scope); - if let Some(pt) = self.protocol_type { - out += &format!("+{:?}", pt); + if self.protocol_set != ProtocolSet::all() { + out += &format!("+{:?}", self.protocol_set); } if let Some(at) = self.address_type { out += &format!("+{:?}", at); @@ -919,7 +904,7 @@ impl DialInfo { } else { PeerScope::All }, - protocol_type: Some(self.protocol_type()), + protocol_set: ProtocolSet::only(self.protocol_type()), address_type: Some(self.address_type()), } } @@ -930,10 +915,8 @@ impl MatchesDialInfoFilter for DialInfo { if !self.matches_peer_scope(filter.peer_scope) { return false; } - if let Some(pt) = filter.protocol_type { - if self.protocol_type() != pt { - return false; - } + if !filter.protocol_set.contains(self.protocol_type()) { + return false; } if let Some(at) = filter.address_type { if self.address_type() != at { @@ -1026,10 +1009,8 @@ impl MatchesDialInfoFilter for ConnectionDescriptor { if !self.matches_peer_scope(filter.peer_scope) { return false; } - if let Some(pt) = filter.protocol_type { - if self.protocol_type() != pt { - return false; - } + if filter.protocol_set.contains(self.protocol_type()) { + return false; } if let Some(at) = filter.address_type { if self.address_type() != at {