From 5527740f6ad8581848902c96c0decb5bb69298f6 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 17 Apr 2022 13:28:39 -0400 Subject: [PATCH] refactor checkpoint --- veilid-core/proto/veilid.capnp | 4 +- .../network/public_dialinfo_discovery.rs | 2 +- veilid-core/src/network_manager.rs | 376 ++++++++++++------ veilid-core/src/receipt_manager.rs | 10 +- veilid-core/src/relay_manager.rs | 0 veilid-core/src/routing_table/find_nodes.rs | 19 +- veilid-core/src/routing_table/mod.rs | 22 +- veilid-core/src/routing_table/node_ref.rs | 11 + .../src/rpc_processor/coders/node_info.rs | 8 +- .../src/rpc_processor/coders/signal_info.rs | 32 +- veilid-core/src/rpc_processor/mod.rs | 51 ++- veilid-core/src/veilid_api/mod.rs | 53 ++- 12 files changed, 392 insertions(+), 196 deletions(-) delete mode 100644 veilid-core/src/relay_manager.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 9cec4d7f..d4309791 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -112,12 +112,12 @@ struct NodeDialInfo { struct SignalInfoHolePunch { receipt @0 :Data; # receipt to return with hole punch - nodeInfo @1 :NodeInfo; # node info of the signal sender for hole punch attempt + peerInfo @1 :PeerInfo; # peer info of the signal sender for hole punch attempt } struct SignalInfoReverseConnect { receipt @0 :Data; # receipt to return with reverse connect - nodeInfo @1 :NodeInfo; # node info of the signal sender for reverse connect attempt + peerInfo @1 :PeerInfo; # peer info of the signal sender for reverse connect attempt } # Private Routes diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index d473f4ef..94889a68 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -30,7 +30,7 @@ impl Network { let filter = DialInfoFilter::global() .with_protocol_type(protocol_type) .with_address_type(address_type); - let peers = routing_table.find_fast_nodes_filtered(&filter); + let peers = routing_table.find_fast_public_nodes_filtered(&filter); if peers.is_empty() { log_net!("no peers of type '{:?}'", filter); return None; diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 49f2f31d..a287827d 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -5,7 +5,7 @@ use hashlink::LruCache; use intf::*; use receipt_manager::*; use routing_table::*; -use rpc_processor::RPCProcessor; +use rpc_processor::*; use xx::*; //////////////////////////////////////////////////////////////////////////////////////// @@ -69,11 +69,13 @@ struct ClientWhitelistEntry { } // Mechanism required to contact another node -enum InboundMethod { - Direct, // Contact the node directly - SignalReverse, // Request via signal the node connect back directly - SignalHolePunch, // Request via signal the node negotiate a hole punch - Relay, // Must use a third party relay to reach the node +enum ContactMethod { + Unreachable, // Node is not reachable by any means + Direct(DialInfo), // Contact the node directly + SignalReverse(NodeRef), // Request via signal the node connect back directly + SignalHolePunch(NodeRef), // Request via signal the node negotiate a hole punch + InboundRelay(NodeRef), // Must use an inbound relay to reach the node + OutboundRelay(NodeRef), // Must use outbound relay to reach the node } // The mutable state of the network manager @@ -430,11 +432,40 @@ impl NetworkManager { } // Process a received out-of-band receipt - pub async fn process_receipt>(&self, receipt_data: R) -> Result<(), String> { + pub async fn process_out_of_band_receipt>( + &self, + receipt_data: R, + descriptor: ConnectionDescriptor, + ) -> Result<(), String> { + let routing_table = self.routing_table(); let receipt_manager = self.receipt_manager(); + let ts = intf::get_timestamp(); + let receipt = Receipt::from_signed_data(receipt_data.as_ref()) .map_err(|_| "failed to parse signed receipt".to_owned())?; - receipt_manager.handle_receipt(receipt).await + + // Cache the receipt information in the routing table + let source_noderef = routing_table + .register_node_with_existing_connection(receipt.get_sender_id(), descriptor, ts) + .map_err(|e| format!("node id registration from receipt failed: {}", e))?; + + receipt_manager + .handle_receipt(source_noderef, receipt) + .await + } + + // Process a received in-band receipt + pub async fn process_in_band_receipt>( + &self, + receipt_data: R, + inbound_nr: NodeRef, + ) -> Result<(), String> { + let receipt_manager = self.receipt_manager(); + + let receipt = Receipt::from_signed_data(receipt_data.as_ref()) + .map_err(|_| "failed to parse signed receipt".to_owned())?; + + receipt_manager.handle_receipt(inbound_nr, receipt).await } // Builds an envelope for sending over the network @@ -527,124 +558,252 @@ impl NetworkManager { } // Figure out how to reach a node - // Node info here must be the filtered kind, with only - fn get_inbound_method(&self, node_info: &NodeInfo) -> Result { - // Get our network class - let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); + fn get_contact_method(&self, node_ref: &NodeRef) -> Result { + // Get our network class and protocol config + let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); + let our_protocol_config = self.get_protocol_config().unwrap(); - // If we don't have a network class yet (no public dial info or haven't finished detection) - // then we just need to try to send to the best direct dial info because we won't - // know how to use relays effectively yet - if matches!(network_class, NetworkClass::Invalid) { - return Ok(InboundMethod::Direct); + // See if this is a local node reachable directly + let local_node_info = node_ref.local_node_info(); + if let Some(local_direct_dial_info) = local_node_info + .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)) + { + return Ok(ContactMethod::Direct(local_direct_dial_info)); } - // Get the protocol of the best matching direct dial info - let protocol_type = node_info.dial_info_list.first().map(|d| d.protocol_type()); + // Get the best matching direct dial info if we have it + let target_node_info = node_ref.node_info(); + let opt_direct_dial_info = target_node_info + .first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di)); // Can the target node do inbound? - if node_info.network_class.inbound_capable() { + if target_node_info.network_class.inbound_capable() { // Do we need to signal before going inbound? - if node_info.network_class.inbound_requires_signal() { - // Can we receive a direct reverse connection? - if network_class.inbound_capable() && !network_class.inbound_requires_signal() { - Ok(InboundMethod::SignalReverse) - } - // Is this a hole-punch capable protocol? - else if protocol_type == Some(ProtocolType::UDP) { - Ok(InboundMethod::SignalHolePunch) - } - // Otherwise we have to relay - else { - Ok(InboundMethod::Relay) + if target_node_info.network_class.inbound_requires_signal() { + // Get the target's inbound relay, it must have one or it is not reachable + if let Some(target_rpi) = target_node_info.relay_peer_info { + // Can we reach the inbound relay? + if target_rpi + .node_info + .first_filtered_dial_info(|di| { + our_protocol_config.outbound.filter_dial_info(di) + }) + .is_some() + { + let target_inbound_relay_nr = + self.routing_table().register_node_with_node_info( + target_rpi.node_id.key, + target_rpi.node_info, + )?; + + // Can we receive anything inbound ever? + if our_network_class.inbound_capable() { + // Can we receive a direct reverse connection? + if !our_network_class.inbound_requires_signal() { + return Ok(ContactMethod::SignalReverse(target_inbound_relay_nr)); + } + // Can we hole-punch? + else if our_protocol_config.inbound.udp + && target_node_info.outbound_protocols.udp + { + return Ok(ContactMethod::SignalHolePunch(target_inbound_relay_nr)); + } + // Otherwise we have to inbound relay + } + + return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); + } } } - // Can go direct + // Go direct without signaling else { - Ok(InboundMethod::Direct) + // If we have direct dial info we can use, do it + if let Some(ddi) = opt_direct_dial_info { + return Ok(ContactMethod::Direct(ddi)); + } } - // If the other node is not inbound capable at all, it requires a relay } else { - Ok(InboundMethod::Relay) + // If the other node is not inbound capable at all, it is using a full relay + if let Some(target_rpi) = target_node_info.relay_peer_info { + // Can we reach the full relay? + if target_rpi + .node_info + .first_filtered_dial_info(|di| { + our_protocol_config.outbound.filter_dial_info(di) + }) + .is_some() + { + let target_inbound_relay_nr = + self.routing_table().register_node_with_node_info( + target_rpi.node_id.key, + target_rpi.node_info, + )?; + return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr)); + } + } } + // If we can't reach the node by other means, try our outbound relay if we have one + if let Some(relay_node) = self.relay_node() { + return Ok(ContactMethod::OutboundRelay(relay_node)); + } + // Otherwise, we can't reach this node + Ok(ContactMethod::Unreachable) } // Send a reverse connection signal and wait for the return receipt over it // Then send the data across the new connection pub async fn do_reverse_connect( &self, - best_node_info: &NodeInfo, + relay_nr: NodeRef, + target_nr: NodeRef, data: Vec, ) -> Result<(), String> { - - // Get relay to signal from - let relay_nr = if let Some(rpi) = best_node_info.relay_peer_info { - // Get the noderef for this inbound relay - self.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?; - } else { - // If we don't have a relay dial info that matches our protocol configuration - // then we can't send to this node! - return Err("Can't send to this relay".to_owned()) - } - - - // Get the receipt timeout - let receipt_time = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); - // Build a return receipt for the signal - let (rcpt_data, eventual_value) = self - .generate_single_shot_receipt(receipt_time, []) - .map_err(map_error_string!())?; + let receipt_timeout = + ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); + let (receipt, eventual_value) = self + .generate_single_shot_receipt(receipt_timeout, []) + .map_err(map_to_string)?; + + // Get our peer info + let peer_info = self.routing_table().get_own_peer_info(); // Issue the signal let rpc = self.rpc_processor(); - rpc.rpc_call_signal(dest, ) + rpc.rpc_call_signal( + Destination::Relay(relay_nr.clone(), target_nr.node_id()), + None, + SignalInfo::ReverseConnect { receipt, peer_info }, + ) + .await + .map_err(logthru_net!("failed to send signal to {:?}", relay_nr)) + .map_err(map_to_string)?; // Wait for the return receipt - match eventual_value.await { - ReceiptEvent::Returned => (), + let inbound_nr = match eventual_value.await { + ReceiptEvent::Returned(inbound_nr) => inbound_nr, ReceiptEvent::Expired => { - return Err("receipt was dropped before expiration".to_owned()); + return Err(format!( + "reverse connect receipt expired from {:?}", + target_nr + )); } ReceiptEvent::Cancelled => { - return Err("receipt was dropped before expiration".to_owned()); + return Err(format!( + "reverse connect receipt cancelled from {:?}", + target_nr + )); } }; + // We expect the inbound noderef to be the same as the target noderef + // if they aren't the same, we should error on this and figure out what then hell is up + if target_nr != inbound_nr { + error!("unexpected noderef mismatch on reverse connect"); + } + // And now use the existing connection to send over - if let Some(descriptor) = node_ref.last_connection() { + if let Some(descriptor) = inbound_nr.last_connection() { match self .net() .send_data_to_existing_connection(descriptor, data) .await .map_err(logthru_net!())? { - None => { - return Ok(()); - } - Some(d) => d, + None => Ok(()), + Some(_) => Err("unable to send over reverse connection".to_owned()), } + } else { + Err("no reverse connection available".to_owned()) } - Ok(()) } // Send a hole punch signal and do a negotiating ping and wait for the return receipt // Then send the data across the new connection - pub async fn do_hole_punch(&self, best_node_info: &NodeInfo, data: Vec) -> Result<(), String> { - if let Some(relay_dial_info) = node_info.relay_dial_info_list.first() { - self.net() - .do_hole_punch(relay_dial_info.clone(), data) - .await - .map_err(logthru_net!()) + pub async fn do_hole_punch( + &self, + relay_nr: NodeRef, + target_nr: NodeRef, + data: Vec, + ) -> Result<(), String> { + // Build a return receipt for the signal + let receipt_timeout = + ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); + let (receipt, eventual_value) = self + .generate_single_shot_receipt(receipt_timeout, []) + .map_err(map_to_string)?; + + // Get our peer info + let peer_info = self.routing_table().get_own_peer_info(); + + // Get the udp direct dialinfo for the hole punch + let hole_punch_dial_info = if let Some(hpdi) = target_nr + .node_info() + .first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP)) + { + hpdi } else { - // If we don't have a relay dial info that matches our protocol configuration - // then we can't send to this node! - Err("Can't send to this node yet".to_owned()) + return Err("No hole punch capable dialinfo found for node".to_owned()); + }; + + // Do our half of the hole punch by sending an empty packet + // Both sides will do this and then the receipt will get sent over the punched hole + self.net() + .send_data_to_dial_info(hole_punch_dial_info, Vec::new()) + .await?; + + // Issue the signal + let rpc = self.rpc_processor(); + rpc.rpc_call_signal( + Destination::Relay(relay_nr.clone(), target_nr.node_id()), + None, + SignalInfo::HolePunch { receipt, peer_info }, + ) + .await + .map_err(logthru_net!("failed to send signal to {:?}", relay_nr)) + .map_err(map_to_string)?; + + // Wait for the return receipt + let inbound_nr = match eventual_value.await { + ReceiptEvent::Returned(inbound_nr) => inbound_nr, + ReceiptEvent::Expired => { + return Err(format!( + "reverse connect receipt expired from {:?}", + target_nr + )); + } + ReceiptEvent::Cancelled => { + return Err(format!( + "reverse connect receipt cancelled from {:?}", + target_nr + )); + } + }; + + // We expect the inbound noderef to be the same as the target noderef + // if they aren't the same, we should error on this and figure out what then hell is up + if target_nr != inbound_nr { + error!("unexpected noderef mismatch on reverse connect"); + } + + // And now use the existing connection to send over + if let Some(descriptor) = inbound_nr.last_connection() { + match self + .net() + .send_data_to_existing_connection(descriptor, data) + .await + .map_err(logthru_net!())? + { + None => Ok(()), + Some(_) => Err("unable to send over reverse connection".to_owned()), + } + } else { + Err("no reverse connection available".to_owned()) } } // Send raw data to a node - // + // // We may not have dial info for a node, but have an existing connection for it // because an inbound connection happened first, and no FindNodeQ has happened to that // node yet to discover its dial info. The existing connection should be tried first @@ -652,7 +811,11 @@ impl NetworkManager { // // Sending to a node requires determining a NetworkClass compatible mechanism // - pub fn send_data(&self, node_ref: NodeRef, data: Vec) -> SystemPinBoxFuture> { + pub fn send_data( + &self, + node_ref: NodeRef, + data: Vec, + ) -> SystemPinBoxFuture> { let this = self.clone(); Box::pin(async move { // First try to send data to the last socket we've seen this peer on @@ -673,53 +836,22 @@ impl NetworkManager { }; // If we don't have last_connection, try to reach out to the peer via its dial info - let best_node_info = match node_ref - .best_node_info() { - Some(ni) => ni, - None => { - // If neither this node nor its relays would never ever be - // reachable by any of our protocols - // then we need to go through the outbound relay - if let Some(relay_node) = this.relay_node() { - // We have an outbound relay, lets use it - return this.send_data(relay_node, data).await; - } - else { - // We have no way to reach the node nor an outbound relay to use - return Err("Can't reach this node".to_owned()); - } - } - }; - - // If we aren't using an outbound relay to reach this node, what inbound method do we use? - match this.get_inbound_method(&best_node_info)? { - InboundMethod::Direct => { - if let Some(dial_info) = best_node_info.dial_info_list.first() { - this.net() - .send_data_to_dial_info(dial_info.clone(), data) - .await - .map_err(logthru_net!()) - } else { - // If we don't have a direct dial info that matches our protocol configuration - // then we can't send to this node! - Err("Can't send to this node yet".to_owned()) - } + match this.get_contact_method(&node_ref).map_err(logthru_net!())? { + ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => { + this.send_data(relay_nr, data).await } - InboundMethod::SignalReverse => this.do_reverse_connect(&best_node_info, data).await, - InboundMethod::SignalHolePunch => this.do_hole_punch(&best_node_info, data).await, - InboundMethod::Relay => { - if let Some(rpi) = best_node_info.relay_peer_info { - // Get the noderef for this inbound relay - let inbound_relay_noderef = this.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?; - // Send to the inbound relay - this.send_data(inbound_relay_noderef, data).await - } else { - // If we don't have a relay dial info that matches our protocol configuration - // then we can't send to this node! - Err("Can't send to this relay".to_owned()) - } + ContactMethod::Direct(dial_info) => { + this.net().send_data_to_dial_info(dial_info, data).await } + ContactMethod::SignalReverse(relay_nr) => { + this.do_reverse_connect(relay_nr, node_ref, data).await + } + ContactMethod::SignalHolePunch(relay_nr) => { + this.do_hole_punch(relay_nr, node_ref, data).await + } + ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()), } + .map_err(logthru_net!()) }) } @@ -742,7 +874,7 @@ impl NetworkManager { // Is this an out-of-band receipt instead of an envelope? if data[0..4] == *RECEIPT_MAGIC { - self.process_receipt(data).await?; + self.process_out_of_band_receipt(data, descriptor).await?; return Ok(true); } diff --git a/veilid-core/src/receipt_manager.rs b/veilid-core/src/receipt_manager.rs index f106decb..1bb635a1 100644 --- a/veilid-core/src/receipt_manager.rs +++ b/veilid-core/src/receipt_manager.rs @@ -3,11 +3,12 @@ use core::fmt; use dht::receipt::*; use futures_util::stream::{FuturesUnordered, StreamExt}; use network_manager::*; +use routing_table::*; use xx::*; -#[derive(Clone, Debug, Copy, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum ReceiptEvent { - Returned, + Returned(NodeRef), Expired, Cancelled, } @@ -380,7 +381,7 @@ impl ReceiptManager { Ok(()) } - pub async fn handle_receipt(&self, receipt: Receipt) -> Result<(), String> { + pub async fn handle_receipt(&self, node_ref: NodeRef, receipt: Receipt) -> Result<(), String> { // Increment return count let callback_future = { // Look up the receipt record from the nonce @@ -394,7 +395,8 @@ impl ReceiptManager { // Generate the callback future let mut record_mut = record.lock(); record_mut.returns_so_far += 1; - let callback_future = Self::perform_callback(ReceiptEvent::Returned, &mut record_mut); + let callback_future = + Self::perform_callback(ReceiptEvent::Returned(node_ref), &mut record_mut); // Remove the record if we're done if record_mut.returns_so_far == record_mut.expected_returns { diff --git a/veilid-core/src/relay_manager.rs b/veilid-core/src/relay_manager.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 6ddbce1e..6340ae34 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -10,16 +10,24 @@ pub type FilterType = Box)) -> bool>; impl RoutingTable { // Retrieve the fastest nodes in the routing table with a particular kind of protocol and address type // Returns noderefs are are scoped to that address type only - pub fn find_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec { + pub fn find_fast_public_nodes_filtered( + &self, + dial_info_filter: &DialInfoFilter, + ) -> Vec { let dial_info_filter1 = dial_info_filter.clone(); self.find_fastest_nodes( // filter Some(Box::new( move |params: &(&DHTKey, Option<&mut BucketEntry>)| { - params - .1 - .as_ref() - .unwrap() + let entry = params.1.as_ref().unwrap(); + + // skip nodes on our local network here + if entry.local_node_info().has_dial_info() { + return false; + } + + // does it have matching public dial info? + entry .node_info() .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter1)) .is_some() @@ -30,6 +38,7 @@ impl RoutingTable { ) } + // Get our own node's peer info (public node info) so we can share it with other nodes pub fn get_own_peer_info(&self) -> PeerInfo { let netman = self.network_manager(); let enable_local_peer_scope = netman.config().get().network.enable_local_peer_scope; diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index f588602e..0b35c51f 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -243,10 +243,20 @@ impl RoutingTable { ) { let timestamp = get_timestamp(); let enable_local_peer_scope = { - let c = self.network_manager().config().get(); + let config = self.network_manager().config(); + let c = config.get(); c.network.enable_local_peer_scope }; + if !enable_local_peer_scope && dial_info.is_local() { + error!("shouldn't be registering local addresses as public"); + return; + } + if !dial_info.is_valid() { + error!("shouldn't be registering invalid addresses"); + return; + } + let mut inner = self.inner.lock(); inner.public_dial_info_details.push(DialInfoDetail { @@ -276,12 +286,12 @@ impl RoutingTable { } pub fn register_interface_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { - let timestamp = get_timestamp(); - let enable_local_peer_scope = { - let c = self.network_manager().config().get(); - c.network.enable_local_peer_scope - }; + if !dial_info.is_valid() { + error!("shouldn't be registering invalid interface addresses"); + return; + } + let timestamp = get_timestamp(); let mut inner = self.inner.lock(); inner.interface_dial_info_details.push(DialInfoDetail { diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 8f92aebd..ef86bd0b 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -45,6 +45,9 @@ impl NodeRef { pub fn last_connection(&self) -> Option { self.operate(|e| e.last_connection()) } + pub fn has_any_dial_info(&self) -> bool { + self.operate(|e| e.node_info().has_any_dial_info() || e.local_node_info().has_dial_info()) + } } impl Clone for NodeRef { @@ -59,6 +62,14 @@ impl Clone for NodeRef { } } +impl PartialEq for NodeRef { + fn eq(&self, other: &Self) -> bool { + self.node_id == other.node_id + } +} + +impl Eq for NodeRef {} + impl fmt::Debug for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.node_id.encode()) diff --git a/veilid-core/src/rpc_processor/coders/node_info.rs b/veilid-core/src/rpc_processor/coders/node_info.rs index 686cc1d2..eae6e3c0 100644 --- a/veilid-core/src/rpc_processor/coders/node_info.rs +++ b/veilid-core/src/rpc_processor/coders/node_info.rs @@ -23,9 +23,9 @@ pub fn encode_node_info( encode_dial_info(&node_info.dial_info_list[idx], &mut di_builder)?; } - if let Some(rpi) = node_info.relay_peer_info { + if let Some(rpi) = &node_info.relay_peer_info { let mut rpi_builder = builder.reborrow().init_relay_peer_info(); - encode_peer_info(&rpi, &mut rpi_builder)?; + encode_peer_info(rpi, &mut rpi_builder)?; } Ok(()) @@ -46,7 +46,7 @@ pub fn decode_node_info( &reader .reborrow() .get_outbound_protocols() - .map_err(map_error_capnp_notinschema!())?, + .map_err(map_error_capnp_error!())?, )?; let dil_reader = reader @@ -69,7 +69,7 @@ pub fn decode_node_info( &reader .reborrow() .get_relay_peer_info() - .map_err(map_error_capnp_notinschema!())?, + .map_err(map_error_capnp_error!())?, false, )?)) } else { diff --git a/veilid-core/src/rpc_processor/coders/signal_info.rs b/veilid-core/src/rpc_processor/coders/signal_info.rs index 497e52fb..c87454b9 100644 --- a/veilid-core/src/rpc_processor/coders/signal_info.rs +++ b/veilid-core/src/rpc_processor/coders/signal_info.rs @@ -6,8 +6,8 @@ pub fn encode_signal_info( builder: &mut veilid_capnp::operation_signal::Builder, ) -> Result<(), RPCError> { match signal_info { - SignalInfo::HolePunch { receipt, node_info } => { - let mut hp_builder = builder.init_hole_punch(); + SignalInfo::HolePunch { receipt, peer_info } => { + let mut hp_builder = builder.reborrow().init_hole_punch(); let rcpt_builder = hp_builder .reborrow() @@ -15,11 +15,11 @@ pub fn encode_signal_info( "invalid receipt length in hole punch signal info" ))?); rcpt_builder.copy_from_slice(receipt.as_slice()); - let mut ni_builder = hp_builder.init_node_info(); - encode_node_info(&node_info, &mut ni_builder)?; + let mut pi_builder = hp_builder.init_peer_info(); + encode_peer_info(peer_info, &mut pi_builder)?; } - SignalInfo::ReverseConnect { receipt, node_info } => { - let mut hp_builder = builder.init_reverse_connect(); + SignalInfo::ReverseConnect { receipt, peer_info } => { + let mut hp_builder = builder.reborrow().init_reverse_connect(); let rcpt_builder = hp_builder .reborrow() @@ -27,8 +27,8 @@ pub fn encode_signal_info( "invalid receipt length in reverse connect signal info" ))?); rcpt_builder.copy_from_slice(receipt.as_slice()); - let mut ni_builder = hp_builder.init_node_info(); - encode_node_info(&node_info, &mut ni_builder)?; + let mut pi_builder = hp_builder.init_peer_info(); + encode_peer_info(peer_info, &mut pi_builder)?; } } @@ -55,12 +55,12 @@ pub fn decode_signal_info( "invalid receipt in hole punch signal info" ))? .to_vec(); - let ni_reader = r.get_node_info().map_err(map_error_protocol!( - "invalid node info in hole punch signal info" + let pi_reader = r.get_peer_info().map_err(map_error_protocol!( + "invalid peer info in hole punch signal info" ))?; - let node_info = decode_node_info(&ni_reader, true)?; + let peer_info = decode_peer_info(&pi_reader, true)?; - SignalInfo::HolePunch { receipt, node_info } + SignalInfo::HolePunch { receipt, peer_info } } veilid_capnp::operation_signal::ReverseConnect(r) => { // Extract reverse connect reader @@ -74,12 +74,12 @@ pub fn decode_signal_info( "invalid receipt in reverse connect signal info" ))? .to_vec(); - let ni_reader = r.get_node_info().map_err(map_error_protocol!( - "invalid node info in reverse connect signal info" + let pi_reader = r.get_peer_info().map_err(map_error_protocol!( + "invalid peer info in reverse connect signal info" ))?; - let node_info = decode_node_info(&ni_reader, true)?; + let peer_info = decode_peer_info(&pi_reader, true)?; - SignalInfo::ReverseConnect { receipt, node_info } + SignalInfo::ReverseConnect { receipt, peer_info } } }, ) diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 9519beb0..6808c989 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -23,10 +23,9 @@ type OperationId = u64; #[derive(Debug, Clone)] pub enum Destination { - Direct(NodeRef), // Can only be sent directly - Normal(NodeRef), // Can be sent via relays as well as directly - Relay(NodeRef, DHTKey), // Can only be sent via a relay - PrivateRoute(PrivateRoute), // Must be encapsulated in a private route + Direct(NodeRef), // Send to node + Relay(NodeRef, DHTKey), // Send to node for relay purposes + PrivateRoute(PrivateRoute), // Send to private route } #[derive(Debug, Clone)] @@ -215,22 +214,25 @@ impl RPCProcessor { } fn filter_peer_scope(&self, peer_info: &PeerInfo) -> bool { + // if local peer scope is enabled, then don't reject any peer info + if self.enable_local_peer_scope { + return true; + } + // reject attempts to include non-public addresses in results - if self.default_peer_scope == PeerScope::Global { - for di in &peer_info.node_info.dial_info_list { + for di in &peer_info.node_info.dial_info_list { + if !di.is_global() { + // non-public address causes rejection + return false; + } + } + if let Some(rpi) = &peer_info.node_info.relay_peer_info { + for di in &rpi.node_info.dial_info_list { if !di.is_global() { // non-public address causes rejection return false; } } - if let Some(rpi) = peer_info.node_info.relay_peer_info { - for di in &rpi.node_info.dial_info_list { - if !di.is_global() { - // non-public address causes rejection - return false; - } - } - } } true } @@ -278,7 +280,7 @@ impl RPCProcessor { if let Some(nr) = routing_table.lookup_node_ref(node_id) { // ensure we have some dial info for the entry already, // if not, we should do the find_node anyway - if !nr.has_any_dial_info() { + if nr.has_any_dial_info() { return Ok(nr); } } @@ -434,13 +436,13 @@ impl RPCProcessor { // To where are we sending the request match &dest { - Destination::Direct(node_ref) | Destination::Normal(node_ref) => { + Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => { // Send to a node without a private route // -------------------------------------- - // Get the actual destination node id, accounting for outbound relaying - let (node_ref, node_id) = if matches!(dest, Destination::Normal(_)) { - self.get_direct_destination(node_ref.clone())? + // Get the actual destination node id accounting for relays + let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest { + (node_ref.clone(), dht_key) } else { let node_id = node_ref.node_id(); (node_ref.clone(), node_id) @@ -487,7 +489,7 @@ impl RPCProcessor { let mut pr_msg_builder = ::capnp::message::Builder::new_default(); let mut pr_builder = pr_msg_builder.init_root::(); - encode_private_route(&private_route, &mut pr_builder)?; + encode_private_route(private_route, &mut pr_builder)?; let pr_reader = pr_builder.into_reader(); // Reply with 'route' operation @@ -899,7 +901,7 @@ impl RPCProcessor { if redirect { let routing_table = self.routing_table(); let filter = dial_info.make_filter(true); - let peers = routing_table.find_fast_nodes_filtered(&filter); + let peers = routing_table.find_fast_public_nodes_filtered(&filter); if peers.is_empty() { return Err(rpc_error_internal(format!( "no peers matching filter '{:?}'", @@ -1110,7 +1112,7 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); network_manager - .process_receipt(rcpt_data) + .process_in_band_receipt(rcpt_data, rpcreader.header.peer_noderef) .await .map_err(map_error_string!()) } @@ -1497,7 +1499,7 @@ impl RPCProcessor { // Wait for receipt match eventual_value.await { - ReceiptEvent::Returned => Ok(true), + ReceiptEvent::Returned(_) => Ok(true), ReceiptEvent::Expired => Ok(false), ReceiptEvent::Cancelled => { Err(rpc_error_internal("receipt was dropped before expiration")) @@ -1588,12 +1590,9 @@ impl RPCProcessor { pub async fn rpc_call_signal( &self, dest: Destination, - relay_dial_info: DialInfo, safety_route: Option<&SafetyRouteSpec>, signal_info: SignalInfo, ) -> Result<(), RPCError> { - let network_manager = self.network_manager(); - // let sig_msg = { let mut sig_msg = ::capnp::message::Builder::new_default(); let mut question = sig_msg.init_root::(); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 0b49aa6d..fb06fa90 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -332,12 +332,6 @@ pub struct NodeInfo { pub relay_peer_info: Option>, } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct LocalNodeInfo { - pub outbound_protocols: ProtocolSet, - pub dial_info_list: Vec, -} - impl NodeInfo { pub fn first_filtered_dial_info(&self, filter: F) -> Option where @@ -369,6 +363,7 @@ impl NodeInfo { !self.dial_info_list.is_empty() || !self .relay_peer_info + .as_ref() .map(|rpi| rpi.node_info.has_direct_dial_info()) .unwrap_or_default() } @@ -378,6 +373,44 @@ impl NodeInfo { } } +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct LocalNodeInfo { + pub outbound_protocols: ProtocolSet, + pub dial_info_list: Vec, +} + +impl LocalNodeInfo { + pub fn first_filtered_dial_info(&self, filter: F) -> Option + where + F: Fn(&DialInfo) -> bool, + { + for di in &self.dial_info_list { + if filter(di) { + return Some(di.clone()); + } + } + None + } + + pub fn all_filtered_dial_info(&self, filter: F) -> Vec + where + F: Fn(&DialInfo) -> bool, + { + let mut dial_info_list = Vec::new(); + + for di in &self.dial_info_list { + if filter(di) { + dial_info_list.push(di.clone()); + } + } + dial_info_list + } + + pub fn has_dial_info(&self) -> bool { + !self.dial_info_list.is_empty() + } +} + #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] // The derived ordering here is the order of preference, lower is preferred for connections // Must match DialInfo order @@ -397,7 +430,7 @@ pub struct ProtocolSet { } impl ProtocolSet { - pub fn is_protocol_type_enabled(&self, protocol_type: ProtocolType) -> bool { + pub fn contains(&self, protocol_type: ProtocolType) -> bool { match protocol_type { ProtocolType::UDP => self.udp, ProtocolType::TCP => self.tcp, @@ -406,7 +439,7 @@ impl ProtocolSet { } } pub fn filter_dial_info(&self, di: &DialInfo) -> bool { - self.is_protocol_type_enabled(di.protocol_type()) + self.contains(di.protocol_type()) } } @@ -1093,12 +1126,12 @@ pub enum SignalInfo { HolePunch { // UDP Hole Punch Request receipt: Vec, // Receipt to be returned after the hole punch - node_info: NodeInfo, // Sender's node info + peer_info: PeerInfo, // Sender's peer info }, ReverseConnect { // Reverse Connection Request receipt: Vec, // Receipt to be returned by the reverse connection - node_info: NodeInfo, // Sender's node info + peer_info: PeerInfo, // Sender's peer info }, // XXX: WebRTC // XXX: App-level signalling