From 9966d25672381471610a6e9231b33854338a7149 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 31 Aug 2022 21:41:48 -0400 Subject: [PATCH] more routingdomain refactor --- veilid-core/proto/veilid.capnp | 5 +- veilid-core/src/network_manager/mod.rs | 2 +- .../native/network_class_discovery.rs | 10 +- .../network_manager/native/start_protocols.rs | 6 +- veilid-core/src/network_manager/tasks.rs | 4 +- veilid-core/src/routing_table/bucket_entry.rs | 166 +++++++++---- veilid-core/src/routing_table/find_nodes.rs | 104 +++++---- veilid-core/src/routing_table/mod.rs | 22 +- veilid-core/src/routing_table/node_ref.rs | 16 +- veilid-core/src/rpc_processor/mod.rs | 219 ++++++++++++++++-- .../src/rpc_processor/rpc_node_info_update.rs | 17 +- veilid-core/src/veilid_api/mod.rs | 194 ++++++---------- 12 files changed, 486 insertions(+), 279 deletions(-) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index c8bd457f..dc2d67fc 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -200,14 +200,11 @@ struct DialInfoDetail { } struct NodeStatus { - # PublicInternet RoutingDomain Status willRoute @0 :Bool; willTunnel @1 :Bool; willSignal @2 :Bool; willRelay @3 :Bool; willValidateDialInfo @4 :Bool; - # LocalNetwork RoutingDomain Status - # TODO } struct ProtocolTypeSet { @@ -224,7 +221,7 @@ struct AddressTypeSet { struct NodeInfo { networkClass @0 :NetworkClass; # network class of this node - outboundProtocols @1 :ProtocolTypeSet; # protocols that can go outbound + outboundProtocols @1 :ProtocolTypeSet; # protocols that can go outbound addressTypes @2 :AddressTypeSet; # address types supported minVersion @3 :UInt8; # minimum protocol version for rpc maxVersion @4 :UInt8; # maximum protocol version for rpc diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 2ea2bc54..fb7986fe 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1884,7 +1884,7 @@ impl NetworkManager { unord.push(async move { // Update the node if let Err(e) = rpc - .rpc_call_node_info_update(Destination::Direct(nr.clone()), None) + .rpc_call_node_info_update(nr.clone, routing_domain) .await { // Not fatal, but we should be able to see if this is happening diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 6504fa17..b1ff1d6e 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -118,11 +118,13 @@ impl DiscoveryContext { // Build an filter that matches our protocol and address type // and excludes relays so we can get an accurate external address - let dial_info_filter = DialInfoFilter::global() + let dial_info_filter = DialInfoFilter::all() .with_protocol_type(protocol_type) .with_address_type(address_type); - let inbound_dial_info_entry_filter = - RoutingTable::make_inbound_dial_info_entry_filter(dial_info_filter.clone()); + let inbound_dial_info_entry_filter = RoutingTable::make_inbound_dial_info_entry_filter( + RoutingDomain::PublicInternet, + dial_info_filter.clone(), + ); let disallow_relays_filter = move |e: &BucketEntryInner| { if let Some(n) = e.node_info(RoutingDomain::PublicInternet) { n.relay_peer_info.is_none() @@ -169,7 +171,7 @@ impl DiscoveryContext { protocol_type: ProtocolType, address_type: AddressType, ) -> Vec { - let filter = DialInfoFilter::local() + let filter = DialInfoFilter::all() .with_protocol_type(protocol_type) .with_address_type(address_type); self.routing_table diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index 0965ab5d..e61ca869 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -253,12 +253,11 @@ impl Network { pub(super) async fn start_udp_listeners(&self) -> EyreResult<()> { trace!("starting udp listeners"); let routing_table = self.routing_table(); - let (listen_address, public_address, enable_local_peer_scope, detect_address_changes) = { + let (listen_address, public_address, detect_address_changes) = { let c = self.config.get(); ( c.network.protocol.udp.listen_address.clone(), c.network.protocol.udp.public_address.clone(), - c.network.enable_local_peer_scope, c.network.detect_address_changes, ) }; @@ -288,6 +287,9 @@ impl Network { // Register local dial info for di in &local_dial_info_list { + +xxx write routing table sieve for routing domain from dialinfo and local network detection and registration + // If the local interface address is global, or we are enabling local peer scope // register global dial info if no public address is specified if !detect_address_changes diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 91e18588..01c705cf 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -476,7 +476,9 @@ impl NetworkManager { // Otherwise we must need an inbound relay } else { // Find a node in our routing table that is an acceptable inbound relay - if let Some(nr) = routing_table.find_inbound_relay(cur_ts) { + if let Some(nr) = + routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) + { info!("Inbound relay node selected: {}", nr); routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr)); node_info_changed = true; diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 9f8f6bcd..6c63ee0a 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -39,15 +39,37 @@ pub enum BucketEntryState { } #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] -struct LastConnectionKey(PeerScope, ProtocolType, AddressType); +struct LastConnectionKey(ProtocolType, AddressType); + +/// Bucket entry information specific to the LocalNetwork RoutingDomain +#[derive(Debug)] +pub struct BucketEntryPublicInternet { + /// The PublicInternet node info + signed_node_info: Option>, + /// If this node has seen our publicinternet node info + seen_our_node_info: bool, + /// Last known node status + node_status: Option, +} + +/// Bucket entry information specific to the LocalNetwork RoutingDomain +#[derive(Debug)] +pub struct BucketEntryLocalNetwork { + /// The LocalNetwork node info + signed_node_info: Option>, + /// If this node has seen our localnetwork node info + seen_our_node_info: bool, + /// Last known node status + node_status: Option, +} #[derive(Debug)] pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, updated_since_last_network_change: bool, last_connections: BTreeMap, - signed_node_info: [Option>; RoutingDomain::count()], - seen_our_node_info: [bool; RoutingDomain::count()], + public_internet: BucketEntryPublicInternet, + local_network: BucketEntryLocalNetwork, peer_stats: PeerStats, latency_stats_accounting: LatencyStatsAccounting, transfer_stats_accounting: TransferStatsAccounting, @@ -127,8 +149,14 @@ impl BucketEntryInner { return; } + // Get the correct signed_node_info for the chosen routing domain + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + // See if we have an existing signed_node_info to update or not - if let Some(current_sni) = &self.signed_node_info[routing_domain as usize] { + if let Some(current_sni) = opt_current_sni { // If the timestamp hasn't changed or is less, ignore this update if signed_node_info.timestamp <= current_sni.timestamp { // If we received a node update with the same timestamp @@ -152,38 +180,51 @@ impl BucketEntryInner { )); // Update the signed node info - self.signed_node_info[routing_domain as usize] = Some(Box::new(signed_node_info)); + *opt_current_sni = Some(Box::new(signed_node_info)); self.updated_since_last_network_change = true; self.touch_last_seen(intf::get_timestamp()); } pub fn has_node_info(&self, opt_routing_domain: Option) -> bool { - if let Some(rd) = opt_routing_domain { - self.signed_node_info[rd as usize].is_some() + if let Some(routing_domain) = opt_routing_domain { + // Get the correct signed_node_info for the chosen routing domain + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + opt_current_sni.is_some() } else { - for rd in RoutingDomain::all() { - if self.signed_node_info[rd as usize].is_some() { - return true; - } + if self.local_network.signed_node_info.is_some() { + true + } else if self.public_internet.signed_node_info.is_some() { + true + } else { + false } - false } } pub fn has_valid_signed_node_info(&self, opt_routing_domain: Option) -> bool { - if let Some(rd) = opt_routing_domain { - if let Some(sni) = &self.signed_node_info[rd as usize] { + if let Some(routing_domain) = opt_routing_domain { + // Get the correct signed_node_info for the chosen routing domain + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + if let Some(sni) = opt_current_sni { + sni.is_valid() + } else { + false + } + } else { + if let Some(sni) = self.local_network.signed_node_info { if sni.is_valid() { return true; } } - false - } else { - for rd in RoutingDomain::all() { - if let Some(sni) = &self.signed_node_info[rd as usize] { - if sni.is_valid() { - return true; - } + if let Some(sni) = self.public_internet.signed_node_info { + if sni.is_valid() { + return true; } } false @@ -191,23 +232,26 @@ impl BucketEntryInner { } pub fn node_info(&self, routing_domain: RoutingDomain) -> Option { - self.signed_node_info[routing_domain as usize] - .as_ref() - .map(|s| s.node_info.clone()) + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + opt_current_sni.as_ref().map(|s| s.node_info.clone()) } pub fn peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option { - self.signed_node_info[routing_domain as usize] - .as_ref() - .map(|s| PeerInfo { - node_id: NodeId::new(key), - signed_node_info: *s.clone(), - }) + let opt_current_sni = match routing_domain { + RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + }; + opt_current_sni.as_ref().map(|s| PeerInfo { + node_id: NodeId::new(key), + signed_node_info: *s.clone(), + }) } fn descriptor_to_key(last_connection: ConnectionDescriptor) -> LastConnectionKey { LastConnectionKey( - last_connection.peer_scope(), last_connection.protocol_type(), last_connection.address_type(), ) @@ -232,13 +276,11 @@ impl BucketEntryInner { ) -> Option<(ConnectionDescriptor, u64)> { // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same let dif = dial_info_filter.unwrap_or_default(); - for ps in dif.peer_scope_set { - for pt in dif.protocol_type_set { - for at in dif.address_type_set { - let key = LastConnectionKey(ps, pt, at); - if let Some(v) = self.last_connections.get(&key) { - return Some(*v); - } + for pt in dif.protocol_type_set { + for at in dif.address_type_set { + let key = LastConnectionKey(pt, at); + if let Some(v) = self.last_connections.get(&key) { + return Some(*v); } } } @@ -267,15 +309,44 @@ impl BucketEntryInner { } pub fn update_node_status(&mut self, status: NodeStatus) { - self.peer_stats.status = Some(status); + match status { + NodeStatus::LocalNetwork(ln) => { + self.local_network.node_status = Some(ln); + } + NodeStatus::PublicInternet(pi) => { + self.public_internet.node_status = Some(pi); + } + } + } + pub fn node_status(&self, routing_domain: RoutingDomain) -> Option { + match routing_domain { + RoutingDomain::LocalNetwork => self + .local_network + .node_status + .map(|ln| NodeStatus::LocalNetwork(ln)), + RoutingDomain::PublicInternet => self + .local_network + .node_status + .map(|pi| NodeStatus::PublicInternet(pi)), + } } pub fn set_seen_our_node_info(&mut self, routing_domain: RoutingDomain, seen: bool) { - self.seen_our_node_info[routing_domain as usize] = seen; + match routing_domain { + RoutingDomain::LocalNetwork => { + self.local_network.seen_our_node_info = seen; + } + RoutingDomain::PublicInternet => { + self.public_internet.seen_our_node_info = seen; + } + } } pub fn has_seen_our_node_info(&self, routing_domain: RoutingDomain) -> bool { - self.seen_our_node_info[routing_domain as usize] + match routing_domain { + RoutingDomain::LocalNetwork => self.local_network.seen_our_node_info, + RoutingDomain::PublicInternet => self.public_internet.seen_our_node_info, + } } pub fn set_updated_since_last_network_change(&mut self, updated: bool) { @@ -502,16 +573,23 @@ impl BucketEntry { ref_count: AtomicU32::new(0), inner: RwLock::new(BucketEntryInner { min_max_version: None, - seen_our_node_info: [false, false], updated_since_last_network_change: false, last_connections: BTreeMap::new(), - signed_node_info: [None, None], + local_network: BucketEntryLocalNetwork { + seen_our_node_info: false, + signed_node_info: None, + node_status: None, + }, + public_internet: BucketEntryPublicInternet { + seen_our_node_info: false, + signed_node_info: None, + node_status: None, + }, peer_stats: PeerStats { time_added: now, rpc_stats: RPCStats::default(), latency: None, transfer: TransferStatsDownUp::default(), - status: None, }, latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index c6bcccb1..a17a98f6 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -15,20 +15,17 @@ pub struct MappedPortInfo { impl RoutingTable { // Makes a filter that finds nodes with a matching inbound dialinfo pub fn make_inbound_dial_info_entry_filter( + routing_domain: RoutingDomain, dial_info_filter: DialInfoFilter, ) -> impl FnMut(&BucketEntryInner) -> bool { // does it have matching public dial info? move |e| { - for rd in RoutingDomain::all() { - if let Some(ni) = e.node_info(rd) { - if ni - .first_filtered_dial_info_detail(|did| { - did.matches_filter(&dial_info_filter) - }) - .is_some() - { - return true; - } + if let Some(ni) = e.node_info(routing_domain) { + if ni + .first_filtered_dial_info_detail(|did| did.matches_filter(&dial_info_filter)) + .is_some() + { + return true; } } false @@ -37,18 +34,17 @@ impl RoutingTable { // Makes a filter that finds nodes capable of dialing a particular outbound dialinfo pub fn make_outbound_dial_info_entry_filter( + routing_domain: RoutingDomain, dial_info: DialInfo, ) -> impl FnMut(&BucketEntryInner) -> bool { // does the node's outbound capabilities match the dialinfo? move |e| { - for rd in RoutingDomain::all() { - if let Some(ni) = e.node_info(rd) { - let mut dif = DialInfoFilter::all(); - dif = dif.with_protocol_type_set(ni.outbound_protocols); - dif = dif.with_address_type_set(ni.address_types); - if dial_info.matches_filter(&dif) { - return true; - } + if let Some(ni) = e.node_info(routing_domain) { + let dif = DialInfoFilter::all() + .with_protocol_type_set(ni.outbound_protocols) + .with_address_type_set(ni.address_types); + if dial_info.matches_filter(&dif) { + return true; } } false @@ -126,7 +122,7 @@ impl RoutingTable { let entry = v.unwrap(); entry.with(|e| { // skip nodes on our local network here - if e.node_info(RoutingDomain::LocalNetwork).is_some() { + if e.has_node_info(Some(RoutingDomain::LocalNetwork)) { return false; } @@ -452,7 +448,7 @@ impl RoutingTable { } } - fn make_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { + fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { // Get all our outbound protocol/address types let outbound_dif = self .network_manager() @@ -460,12 +456,8 @@ impl RoutingTable { let mapped_port_info = self.get_mapped_port_info(); move |e: &BucketEntryInner| { - // Ensure this node is not on our local network - let has_local_dial_info = e - .local_node_info() - .map(|l| l.has_dial_info()) - .unwrap_or(false); - if has_local_dial_info { + // Ensure this node is not on the local network + if e.has_node_info(Some(RoutingDomain::LocalNetwork)) { return false; } @@ -474,7 +466,7 @@ impl RoutingTable { let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); let can_serve_as_relay = e - .node_info() + .node_info(RoutingDomain::PublicInternet) .map(|n| { let dids = n.all_filtered_dial_info_details(|did| did.matches_filter(&outbound_dif)); @@ -498,9 +490,18 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), ret)] - pub fn find_inbound_relay(&self, cur_ts: u64) -> Option { + pub fn find_inbound_relay( + &self, + routing_domain: RoutingDomain, + cur_ts: u64, + ) -> Option { // Get relay filter function - let relay_node_filter = self.make_relay_node_filter(); + let relay_node_filter = match routing_domain { + RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(), + RoutingDomain::LocalNetwork => { + unimplemented!(); + } + }; // Go through all entries and find fastest entry that matches filter function let inner = self.inner.read(); @@ -512,9 +513,9 @@ impl RoutingTable { let v2 = v.clone(); v.with(|e| { // Ensure we have the node's status - if let Some(node_status) = e.peer_stats().status.clone() { + if let Some(node_status) = e.node_status(routing_domain) { // Ensure the node will relay - if node_status.will_relay { + if node_status.will_relay() { // Compare against previous candidate if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { // Less is faster @@ -541,7 +542,11 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), ret)] - pub fn register_find_node_answer(&self, peers: Vec) -> Vec { + pub fn register_find_node_answer( + &self, + routing_domain: RoutingDomain, + peers: Vec, + ) -> Vec { let node_id = self.node_id(); // register nodes we'd found @@ -561,6 +566,7 @@ impl RoutingTable { // register the node if it's new if let Some(nr) = self.register_node_with_signed_node_info( + routing_domain, p.node_id.key, p.signed_node_info.clone(), false, @@ -574,6 +580,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), ret, err)] pub async fn find_node( &self, + routing_domain: RoutingDomain, node_ref: NodeRef, node_id: DHTKey, ) -> EyreResult>> { @@ -583,39 +590,52 @@ impl RoutingTable { rpc_processor .clone() .rpc_call_find_node( - Destination::Direct(node_ref.clone()), + Destination::direct(node_ref.clone()).with_routing_domain(routing_domain), node_id, None, - rpc_processor.make_respond_to_sender(node_ref.clone()), + rpc_processor.make_respond_to_sender(routing_domain, node_ref.clone()), ) .await? ); // register nodes we'd found Ok(NetworkResult::value( - self.register_find_node_answer(res.answer), + self.register_find_node_answer(routing_domain, res.answer), )) } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult>> { + pub async fn find_self( + &self, + routing_domain: RoutingDomain, + node_ref: NodeRef, + ) -> EyreResult>> { let node_id = self.node_id(); - self.find_node(node_ref, node_id).await + self.find_node(routing_domain, node_ref, node_id).await } #[instrument(level = "trace", skip(self), ret, err)] - pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult>> { + pub async fn find_target( + &self, + routing_domain: RoutingDomain, + node_ref: NodeRef, + ) -> EyreResult>> { let node_id = node_ref.node_id(); - self.find_node(node_ref, node_id).await + self.find_node(routing_domain, node_ref, node_id).await } #[instrument(level = "trace", skip(self))] - pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { + pub async fn reverse_find_node( + &self, + routing_domain: RoutingDomain, + node_ref: NodeRef, + wide: bool, + ) { // Ask bootstrap node to 'find' our own node so we can get some more nodes near ourselves // and then contact those nodes to inform -them- that we exist // Ask bootstrap server for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(debug match self.find_self(node_ref.clone()).await { + let closest_nodes = network_result_value_or_log!(debug match self.find_self(routing_domain, node_ref.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", @@ -631,7 +651,7 @@ impl RoutingTable { // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - network_result_value_or_log!(debug match self.find_self(closest_nr.clone()).await { + network_result_value_or_log!(debug match self.find_self(routing_domain, closest_nr.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 1aa15f05..6ed25b7c 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -23,20 +23,6 @@ const RECENT_PEERS_TABLE_SIZE: usize = 64; ////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq)] -pub enum RoutingDomain { - PublicInternet = 0, - LocalNetwork = 1, -} -impl RoutingDomain { - pub const fn count() -> usize { - 2 - } - pub const fn all() -> [RoutingDomain; RoutingDomain::count()] { - [RoutingDomain::PublicInternet, RoutingDomain::LocalNetwork] - } -} - #[derive(Debug, Default)] pub struct RoutingDomainDetail { relay_node: Option, @@ -519,7 +505,13 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Only update nodes that haven't seen our node info yet if all || !v.with(|e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new(self.clone(), k, v, None)); + node_refs.push(NodeRef::new( + self.clone(), + k, + v, + RoutingDomainSet::only(routing_domain), + None, + )); } Option::<()>::None }); diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 69859b7c..f86094ca 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -52,7 +52,7 @@ impl NodeRef { pub fn merge_filter(&mut self, filter: DialInfoFilter) { if let Some(self_filter) = self.filter.take() { - self.filter = Some(self_filter.filtered(filter)); + self.filter = Some(self_filter.filtered(&filter)); } else { self.filter = Some(filter); } @@ -72,19 +72,6 @@ impl NodeRef { } } - // Returns true if some protocols can still pass the filter and false if no protocols remain - // pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool { - // if protocol_set != ProtocolSet::all() { - // let mut dif = self.filter.clone().unwrap_or_default(); - // dif.protocol_set &= protocol_set; - // self.filter = Some(dif); - // } - // self.filter - // .as_ref() - // .map(|f| !f.protocol_set.is_empty()) - // .unwrap_or(true) - // } - pub(super) fn operate(&self, f: F) -> T where F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, @@ -129,7 +116,6 @@ impl NodeRef { pub fn min_max_version(&self) -> Option<(u8, u8)> { self.operate(|_rti, e| e.min_max_version()) } - pub fn set_min_max_version(&self, min_max_version: (u8, u8)) { self.operate_mut(|_rti, e| e.set_min_max_version(min_max_version)) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index ead3ea1d..d0edd24a 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -39,25 +39,200 @@ type OperationId = u64; /// Where to send an RPC message #[derive(Debug, Clone)] pub enum Destination { - /// Send to node (target noderef) - Direct(NodeRef), - /// Send to node for relay purposes (relay noderef, target nodeid) - Relay(NodeRef, DHTKey), + /// Send to node directly + Direct { + /// The node to send to + target: NodeRef, + /// An optional routing domain to require + routing_domain: Option, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, + /// Send to node for relay purposes + Relay { + /// The relay to send to + relay: NodeRef, + /// The final destination the relay should send to + target: DHTKey, + /// An optional routing domain to require + routing_domain: Option, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, /// Send to private route (privateroute) - PrivateRoute(PrivateRoute), + PrivateRoute { + /// A private route to send to + private_route: PrivateRoute, + /// An optional safety route specification to send from for sender privacy + safety_route_spec: Option>, + }, +} + +impl Destination { + pub fn direct(target: NodeRef) -> Self { + Self::Direct { + target, + routing_domain: None, + safety_route_spec: None, + } + } + pub fn relay(relay: NodeRef, target: DHTKey) -> Self { + Self::Relay { + relay, + target, + routing_domain: None, + safety_route_spec: None, + } + } + pub fn private_route(private_route: PrivateRoute) -> Self { + Self::PrivateRoute { + private_route, + safety_route_spec: None, + } + } + + pub fn + + pub fn routing_domain(&self) -> Option { + match self { + Destination::Direct { + target, + routing_domain, + safety_route_spec, + } => *routing_domain, + Destination::Relay { + relay, + target, + routing_domain, + safety_route_spec, + } => *routing_domain, + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => Some(RoutingDomain::PublicInternet), + } + } + pub fn safety_route_spec(&self) -> Option> { + match self { + Destination::Direct { + target, + routing_domain, + safety_route_spec, + } => safety_route_spec.clone(), + Destination::Relay { + relay, + target, + routing_domain, + safety_route_spec, + } => safety_route_spec.clone(), + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => safety_route_spec.clone(), + } + } + pub fn with_routing_domain(self, routing_domain: RoutingDomain) -> Self { + match self { + Destination::Direct { + target, + routing_domain: _, + safety_route_spec, + } => Self::Direct { + target, + routing_domain: Some(routing_domain), + safety_route_spec, + }, + Destination::Relay { + relay, + target, + routing_domain: _, + safety_route_spec, + } => Self::Relay { + relay, + target, + routing_domain: Some(routing_domain), + safety_route_spec, + }, + Destination::PrivateRoute { + private_route: _, + safety_route_spec: _, + } => panic!("Private route is only valid in PublicInternet routing domain"), + } + } + pub fn with_safety_route_spec(self, safety_route_spec: Arc) -> Self { + match self { + Destination::Direct { + target, + routing_domain, + safety_route_spec: _, + } => Self::Direct { + target, + routing_domain, + safety_route_spec: Some(safety_route_spec), + }, + Destination::Relay { + relay, + target, + routing_domain, + safety_route_spec: _, + } => Self::Relay { + relay, + target, + routing_domain, + safety_route_spec: Some(safety_route_spec), + }, + Destination::PrivateRoute { + private_route, + safety_route_spec: _, + } => Self::PrivateRoute { + private_route, + safety_route_spec: Some(safety_route_spec), + }, + } + } } impl fmt::Display for Destination { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Destination::Direct(nr) => { - write!(f, "{:?}", nr) + Destination::Direct { + target, + routing_domain, + safety_route_spec, + } => { + let rd = routing_domain + .map(|rd| format!("#{:?}", rd)) + .unwrap_or_default(); + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{:?}{}{}", target, rd, sr) } - Destination::Relay(nr, key) => { - write!(f, "{:?}@{:?}", key.encode(), nr) + Destination::Relay { + relay, + target, + routing_domain, + safety_route_spec, + } => { + let rd = routing_domain + .map(|rd| format!("#{:?}", rd)) + .unwrap_or_default(); + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{:?}@{:?}{}{}", target.encode(), relay, rd, sr) } - Destination::PrivateRoute(pr) => { - write!(f, "{}", pr) + Destination::PrivateRoute { + private_route, + safety_route_spec, + } => { + let sr = safety_route_spec + .map(|_sr| "+SR".to_owned()) + .unwrap_or_default(); + + write!(f, "{}{}", private_route, sr) } } } @@ -412,18 +587,24 @@ impl RPCProcessor { /// Gets a 'RespondTo::Sender' that contains either our dial info, /// or None if the peer has seen our dial info before or our node info is not yet valid /// because of an unknown network class - pub fn make_respond_to_sender(&self, peer: NodeRef) -> RespondTo { - if peer.has_seen_our_node_info() + pub fn make_respond_to_sender( + &self, + routing_domain: RoutingDomain, + peer: NodeRef, + ) -> RespondTo { + if peer.has_seen_our_node_info(routing_domain) || matches!( self.network_manager() - .get_network_class() + .get_network_class(routing_domain) .unwrap_or(NetworkClass::Invalid), NetworkClass::Invalid ) { RespondTo::Sender(None) } else { - let our_sni = self.routing_table().get_own_signed_node_info(); + let our_sni = self + .routing_table() + .get_own_signed_node_info(routing_domain); RespondTo::Sender(Some(our_sni)) } } @@ -431,13 +612,12 @@ impl RPCProcessor { /// Produce a byte buffer that represents the wire encoding of the entire /// unencrypted envelope body for a RPC message. This incorporates /// wrapping a private and/or safety route if they are specified. - #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] + #[instrument(level = "debug", skip(self, operation), err)] fn render_operation( &self, dest: Destination, operation: &RPCOperation, - safety_route_spec: Option<&SafetyRouteSpec>, - ) -> Result { + ) -> Result { xxx continue propagating safetyroutespec let out_node_id; // Envelope Node Id let mut out_node_ref: Option = None; // Node to send envelope to let out_hop_count: usize; // Total safety + private route hop count @@ -548,7 +728,6 @@ impl RPCProcessor { &self, dest: Destination, question: RPCQuestion, - safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { // Wrap question in operation let operation = RPCOperation::new_question(question); @@ -624,7 +803,6 @@ impl RPCProcessor { &self, dest: Destination, statement: RPCStatement, - safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { // Wrap statement in operation let operation = RPCOperation::new_statement(statement); @@ -714,7 +892,6 @@ impl RPCProcessor { &self, request: RPCMessage, answer: RPCAnswer, - safety_route_spec: Option<&SafetyRouteSpec>, ) -> Result, RPCError> { // Wrap answer in operation let operation = RPCOperation::new_answer(&request.operation, answer); diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs index 15b23d0c..924283b8 100644 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ b/veilid-core/src/rpc_processor/rpc_node_info_update.rs @@ -6,16 +6,23 @@ impl RPCProcessor { #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_node_info_update( self, - dest: Destination, - safety_route: Option<&SafetyRouteSpec>, + target: NodeRef, + routing_domain: RoutingDomain, ) -> Result, RPCError> { - let signed_node_info = self.routing_table().get_own_signed_node_info(); - xxx add routing domain to capnp.... + let signed_node_info = self + .routing_table() + .get_own_signed_node_info(routing_domain); let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); // Send the node_info_update request - network_result_try!(self.statement(dest, statement, safety_route).await?); + network_result_try!( + self.statement( + Destination::direct(target).with_routing_domain(routing_domain), + statement, + ) + .await? + ); Ok(NetworkResult::value(())) } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index d3059146..1e0b2df6 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -390,16 +390,62 @@ impl NetworkClass { } } +/// RoutingDomain-specific status for each node +/// is returned by the StatusA call + +/// PublicInternet RoutingDomain Status #[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct NodeStatus { - // PublicInternet RoutingDomain Status +pub struct PublicInternetNodeStatus { pub will_route: bool, pub will_tunnel: bool, pub will_signal: bool, pub will_relay: bool, pub will_validate_dial_info: bool, - // LocalNetwork RoutingDomain Status - // TODO +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct LocalNetworkNodeStatus { + pub will_relay: bool, + pub will_validate_dial_info: bool, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NodeStatus { + PublicInternet(PublicInternetNodeStatus), + LocalNetwork(LocalNetworkNodeStatus), +} + +impl NodeStatus { + pub fn will_route(&self) -> bool { + match self { + NodeStatus::PublicInternet(pi) => pi.will_route, + NodeStatus::LocalNetwork(_) => false, + } + } + pub fn will_tunnel(&self) -> bool { + match self { + NodeStatus::PublicInternet(pi) => pi.will_tunnel, + NodeStatus::LocalNetwork(_) => false, + } + } + pub fn will_signal(&self) -> bool { + match self { + NodeStatus::PublicInternet(pi) => pi.will_signal, + NodeStatus::LocalNetwork(_) => false, + } + } + pub fn will_relay(&self) -> bool { + match self { + NodeStatus::PublicInternet(pi) => pi.will_relay, + NodeStatus::LocalNetwork(ln) => ln.will_relay, + } + } + pub fn will_validate_dial_info(&self) -> bool { + match self { + NodeStatus::PublicInternet(pi) => pi.will_validate_dial_info, + NodeStatus::LocalNetwork(ln) => ln.will_validate_dial_info, + } + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -556,13 +602,23 @@ pub enum AddressType { } pub type AddressTypeSet = EnumSet; +// Routing domain here is listed in order of preference, keep in order #[allow(clippy::derive_hash_xor_eq)] #[derive(Debug, Ord, PartialOrd, Hash, Serialize, Deserialize, EnumSetType)] -pub enum PeerScope { - Global, - Local, +pub enum RoutingDomain { + LocalNetwork = 0, + PublicInternet = 1, } -pub type PeerScopeSet = EnumSet; +impl RoutingDomain { + pub const fn count() -> usize { + 2 + } + pub const fn all() -> [RoutingDomain; RoutingDomain::count()] { + // Routing domain here is listed in order of preference, keep in order + [RoutingDomain::LocalNetwork, RoutingDomain::PublicInternet] + } +} +pub type RoutingDomainSet = EnumSet; #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] pub enum Address { @@ -729,7 +785,6 @@ impl FromStr for SocketAddress { #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct DialInfoFilter { - pub peer_scope_set: PeerScopeSet, pub protocol_type_set: ProtocolTypeSet, pub address_type_set: AddressTypeSet, } @@ -737,7 +792,6 @@ pub struct DialInfoFilter { impl Default for DialInfoFilter { fn default() -> Self { Self { - peer_scope_set: PeerScopeSet::all(), protocol_type_set: ProtocolTypeSet::all(), address_type_set: AddressTypeSet::all(), } @@ -747,28 +801,6 @@ impl Default for DialInfoFilter { impl DialInfoFilter { pub fn all() -> Self { Self { - peer_scope_set: PeerScopeSet::all(), - protocol_type_set: ProtocolTypeSet::all(), - address_type_set: AddressTypeSet::all(), - } - } - pub fn global() -> Self { - Self { - peer_scope_set: PeerScopeSet::only(PeerScope::Global), - protocol_type_set: ProtocolTypeSet::all(), - address_type_set: AddressTypeSet::all(), - } - } - pub fn local() -> Self { - Self { - peer_scope_set: PeerScopeSet::only(PeerScope::Local), - protocol_type_set: ProtocolTypeSet::all(), - address_type_set: AddressTypeSet::all(), - } - } - pub fn scoped(peer_scope: PeerScope) -> Self { - Self { - peer_scope_set: PeerScopeSet::only(peer_scope), protocol_type_set: ProtocolTypeSet::all(), address_type_set: AddressTypeSet::all(), } @@ -789,25 +821,19 @@ impl DialInfoFilter { self.address_type_set = address_set; self } - pub fn filtered(mut self, other_dif: DialInfoFilter) -> Self { - self.peer_scope_set &= other_dif.peer_scope_set; + pub fn filtered(mut self, other_dif: &DialInfoFilter) -> Self { self.protocol_type_set &= other_dif.protocol_type_set; self.address_type_set &= other_dif.address_type_set; self } pub fn is_dead(&self) -> bool { - self.peer_scope_set.is_empty() - || self.protocol_type_set.is_empty() - || self.address_type_set.is_empty() + self.protocol_type_set.is_empty() || self.address_type_set.is_empty() } } impl fmt::Debug for DialInfoFilter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { let mut out = String::new(); - if self.peer_scope_set != PeerScopeSet::all() { - out += &format!("+{:?}", self.peer_scope_set); - } if self.protocol_type_set != ProtocolTypeSet::all() { out += &format!("+{:?}", self.protocol_type_set); } @@ -1126,49 +1152,15 @@ impl DialInfo { Self::WSS(di) => Some(format!("wss://{}", di.request)), } } - pub fn is_global(&self) -> bool { - self.socket_address().address().is_global() - } - pub fn is_local(&self) -> bool { - self.socket_address().address().is_local() - } pub fn is_valid(&self) -> bool { let socket_address = self.socket_address(); let address = socket_address.address(); let port = socket_address.port(); (address.is_global() || address.is_local()) && port > 0 } - pub fn peer_scope(&self) -> Option { - let addr = self.socket_address().address(); - if addr.is_global() { - return Some(PeerScope::Global); - } - if addr.is_local() { - return Some(PeerScope::Local); - } - None - } - pub fn matches_peer_scope(&self, scope: PeerScopeSet) -> bool { - if let Some(ps) = self.peer_scope() { - scope.contains(ps) - } else { - false - } - } - pub fn make_filter(&self, scoped: bool) -> DialInfoFilter { + pub fn make_filter(&self) -> DialInfoFilter { DialInfoFilter { - peer_scope_set: if scoped { - if self.is_global() { - PeerScopeSet::only(PeerScope::Global) - } else if self.is_local() { - PeerScopeSet::only(PeerScope::Local) - } else { - PeerScopeSet::empty() - } - } else { - PeerScopeSet::all() - }, protocol_type_set: ProtocolTypeSet::only(self.protocol_type()), address_type_set: AddressTypeSet::only(self.address_type()), } @@ -1355,9 +1347,6 @@ impl DialInfo { impl MatchesDialInfoFilter for DialInfo { fn matches_filter(&self, filter: &DialInfoFilter) -> bool { - if !self.matches_peer_scope(filter.peer_scope_set) { - return false; - } if !filter.protocol_type_set.contains(self.protocol_type()) { return false; } @@ -1452,8 +1441,8 @@ impl PeerInfo { #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] pub struct PeerAddress { - socket_address: SocketAddress, protocol_type: ProtocolType, + socket_address: SocketAddress, } impl PeerAddress { @@ -1488,36 +1477,13 @@ pub struct ConnectionDescriptor { } impl ConnectionDescriptor { - fn validate_peer_scope(remote: PeerAddress) -> Result<(), VeilidAPIError> { - // Verify address is in one of our peer scopes we care about - let addr = remote.socket_address.address(); - - // Allow WASM to have unresolved addresses, for bootstraps - cfg_if::cfg_if! { - if #[cfg(target_arch = "wasm32")] { - if addr.is_unspecified() { - return Ok(()); - } - } - } - if !addr.is_global() && !addr.is_local() { - return Err(VeilidAPIError::generic(format!( - "not a valid peer scope: {:?}", - addr - ))); - } - Ok(()) - } - pub fn new(remote: PeerAddress, local: SocketAddress) -> Result { - Self::validate_peer_scope(remote)?; Ok(Self { remote, local: Some(local), }) } pub fn new_no_local(remote: PeerAddress) -> Result { - Self::validate_peer_scope(remote)?; Ok(Self { remote, local: None, @@ -1538,36 +1504,15 @@ impl ConnectionDescriptor { pub fn address_type(&self) -> AddressType { self.remote.address_type() } - pub fn peer_scope(&self) -> PeerScope { - let addr = self.remote.socket_address.address(); - // Allow WASM to have unresolved addresses, for bootstraps - cfg_if::cfg_if! { - if #[cfg(target_arch = "wasm32")] { - if addr.is_unspecified() { - return PeerScope::Global; - } - } - } - if addr.is_global() { - return PeerScope::Global; - } - PeerScope::Local - } pub fn make_dial_info_filter(&self) -> DialInfoFilter { - DialInfoFilter::scoped(self.peer_scope()) + DialInfoFilter::all() .with_protocol_type(self.protocol_type()) .with_address_type(self.address_type()) } - pub fn matches_peer_scope(&self, scope: PeerScopeSet) -> bool { - scope.contains(self.peer_scope()) - } } impl MatchesDialInfoFilter for ConnectionDescriptor { fn matches_filter(&self, filter: &DialInfoFilter) -> bool { - if !self.matches_peer_scope(filter.peer_scope_set) { - return false; - } if !filter.protocol_type_set.contains(self.protocol_type()) { return false; } @@ -1654,7 +1599,6 @@ pub struct PeerStats { pub rpc_stats: RPCStats, // information about RPCs pub latency: Option, // latencies for communications with the peer pub transfer: TransferStatsDownUp, // Stats for communications with the peer - pub status: Option, // Last known node status } pub type ValueChangeCallback =