diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 00916440..40be60e6 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -175,8 +175,8 @@ struct ValueData { # Operations ############################## -struct OperationInfoQ { - nodeStatus @0 :NodeStatus; # node status update about the infoq sender +struct OperationStatusQ { + nodeStatus @0 :NodeStatus; # node status update about the statusq sender } enum NetworkClass { @@ -221,13 +221,18 @@ struct NodeInfo { relayPeerInfo @3 :PeerInfo; # (optional) relay peer info for this node } +struct SignedNodeInfo { + nodeInfo @0 :NodeInfo; # node info + signature @1 :Signature; # signature +} + struct SenderInfo { socketAddress @0 :SocketAddress; # socket address was available for peer } -struct OperationInfoA { +struct OperationStatusA { nodeStatus @0 :NodeStatus; # returned node status - senderInfo @1 :SenderInfo; # info about InfoQ sender from the perspective of the replier + senderInfo @1 :SenderInfo; # info about StatusQ sender from the perspective of the replier } struct OperationValidateDialInfo { @@ -247,7 +252,7 @@ struct OperationFindNodeQ { struct PeerInfo { nodeId @0 :NodeID; # node id for 'closer peer' - nodeInfo @1 :NodeInfo; # node info for 'closer peer' + signedNodeInfo @1 :SignedNodeInfo; # signed node info for 'closer peer' } struct OperationFindNodeA { @@ -265,6 +270,10 @@ struct OperationRoute { operation @1 :RoutedOperation; # The operation to be routed } +struct OperationNodeInfoUpdate { + signedNodeInfo @0 :SignedNodeInfo; # Our signed node info +} + struct OperationGetValueQ { key @0 :ValueKey; # key for value to get } @@ -405,42 +414,43 @@ struct Operation { respondTo :union { none @1 :Void; # no response is desired sender @2 :Void; # sender without node info - senderWithInfo @3 :NodeInfo; # some envelope-sender node info to be used for reply + senderWithInfo @3 :SignedNodeInfo; # some envelope-sender signed node info to be used for reply privateRoute @4 :PrivateRoute; # embedded private route to be used for reply } detail :union { # Direct operations - infoQ @5 :OperationInfoQ; - infoA @6 :OperationInfoA; + statusQ @5 :OperationStatusQ; + statusA @6 :OperationStatusA; validateDialInfo @7 :OperationValidateDialInfo; findNodeQ @8 :OperationFindNodeQ; findNodeA @9 :OperationFindNodeA; - route @10 :OperationRoute; + route @10 :OperationRoute; + nodeInfoUpdate @11 :OperationNodeInfoUpdate; # Routable operations - getValueQ @11 :OperationGetValueQ; - getValueA @12 :OperationGetValueA; - setValueQ @13 :OperationSetValueQ; - setValueA @14 :OperationSetValueA; - watchValueQ @15 :OperationWatchValueQ; - watchValueA @16 :OperationWatchValueA; - valueChanged @17 :OperationValueChanged; + getValueQ @12 :OperationGetValueQ; + getValueA @13 :OperationGetValueA; + setValueQ @14 :OperationSetValueQ; + setValueA @15 :OperationSetValueA; + watchValueQ @16 :OperationWatchValueQ; + watchValueA @17 :OperationWatchValueA; + valueChanged @18 :OperationValueChanged; - supplyBlockQ @18 :OperationSupplyBlockQ; - supplyBlockA @19 :OperationSupplyBlockA; - findBlockQ @20 :OperationFindBlockQ; - findBlockA @21 :OperationFindBlockA; + supplyBlockQ @19 :OperationSupplyBlockQ; + supplyBlockA @20 :OperationSupplyBlockA; + findBlockQ @21 :OperationFindBlockQ; + findBlockA @22 :OperationFindBlockA; - signal @22 :OperationSignal; - returnReceipt @23 :OperationReturnReceipt; + signal @23 :OperationSignal; + returnReceipt @24 :OperationReturnReceipt; # Tunnel operations - startTunnelQ @24 :OperationStartTunnelQ; - startTunnelA @25 :OperationStartTunnelA; - completeTunnelQ @26 :OperationCompleteTunnelQ; - completeTunnelA @27 :OperationCompleteTunnelA; - cancelTunnelQ @28 :OperationCancelTunnelQ; - cancelTunnelA @29 :OperationCancelTunnelA; + startTunnelQ @25 :OperationStartTunnelQ; + startTunnelA @26 :OperationStartTunnelA; + completeTunnelQ @27 :OperationCompleteTunnelQ; + completeTunnelA @28 :OperationCompleteTunnelA; + cancelTunnelQ @29 :OperationCancelTunnelQ; + cancelTunnelA @30 :OperationCancelTunnelA; } } diff --git a/veilid-core/src/intf/native/network/network_class_discovery.rs b/veilid-core/src/intf/native/network/network_class_discovery.rs index 963d220e..4a68eb6e 100644 --- a/veilid-core/src/intf/native/network/network_class_discovery.rs +++ b/veilid-core/src/intf/native/network/network_class_discovery.rs @@ -60,13 +60,13 @@ impl DiscoveryContext { // Ask for a public address check from a particular noderef async fn request_public_address(&self, node_ref: NodeRef) -> Option { let rpc = self.routing_table.rpc_processor(); - rpc.rpc_call_info(node_ref.clone()) + rpc.rpc_call_status(node_ref.clone()) .await .map_err(logthru_net!( - "failed to get info answer from {:?}", + "failed to get status answer from {:?}", node_ref )) - .map(|info_answer| info_answer.sender_info.socket_address) + .map(|sa| sa.sender_info.socket_address) .unwrap_or(None) } diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index 4c7b9e16..b278272b 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -365,11 +365,14 @@ impl NetworkManager { pub fn generate_node_status(&self) -> NodeStatus { let peer_info = self.routing_table().get_own_peer_info(); - let will_route = peer_info.node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added - let will_tunnel = peer_info.node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point - let will_signal = peer_info.node_info.can_signal(); - let will_relay = peer_info.node_info.can_inbound_relay(); - let will_validate_dial_info = peer_info.node_info.can_validate_dial_info(); + let will_route = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: eventually this may have more criteria added + let will_tunnel = peer_info.signed_node_info.node_info.can_inbound_relay(); // xxx: we may want to restrict by battery life and network bandwidth at some point + let will_signal = peer_info.signed_node_info.node_info.can_signal(); + let will_relay = peer_info.signed_node_info.node_info.can_inbound_relay(); + let will_validate_dial_info = peer_info + .signed_node_info + .node_info + .can_validate_dial_info(); NodeStatus { will_route, @@ -483,8 +486,10 @@ impl NetworkManager { let rpc = self.rpc_processor(); // Add the peer info to our routing table - let peer_nr = routing_table - .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; + let peer_nr = routing_table.register_node_with_signed_node_info( + peer_info.node_id.key, + peer_info.signed_node_info, + )?; // Make a reverse connection to the peer and send the receipt to it rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt) @@ -495,8 +500,10 @@ impl NetworkManager { let routing_table = self.routing_table(); // Add the peer info to our routing table - let mut peer_nr = routing_table - .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?; + let mut peer_nr = routing_table.register_node_with_signed_node_info( + peer_info.node_id.key, + peer_info.signed_node_info, + )?; // Get the udp direct dialinfo for the hole punch peer_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP)); @@ -665,8 +672,9 @@ impl NetworkManager { // Can we receive anything inbound ever? if matches!(our_network_class, NetworkClass::InboundCapable) { // Get the best match dial info for an reverse inbound connection - let reverse_dif = DialInfoFilter::global() - .with_protocol_set(target_node_ref.outbound_protocols()); + let reverse_dif = DialInfoFilter::global().with_protocol_set( + target_node_ref.outbound_protocols().unwrap_or_default(), + ); if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail( Some(RoutingDomain::PublicInternet), &reverse_dif, @@ -684,6 +692,7 @@ impl NetworkManager { if our_protocol_config.outbound.contains(ProtocolType::UDP) && target_node_ref .outbound_protocols() + .unwrap_or_default() .contains(ProtocolType::UDP) { // Do the target and self nodes have a direct udp dialinfo @@ -1097,7 +1106,7 @@ impl NetworkManager { // Get our node's current node info and network class and do the right thing let routing_table = self.routing_table(); - let node_info = routing_table.get_own_peer_info().node_info; + let node_info = routing_table.get_own_node_info(); let network_class = self.get_network_class(); // Do we know our network class yet? @@ -1123,9 +1132,9 @@ impl NetworkManager { let mut inner = self.inner.lock(); // Register new outbound relay - let nr = routing_table.register_node_with_node_info( + let nr = routing_table.register_node_with_signed_node_info( outbound_relay_peerinfo.node_id.key, - outbound_relay_peerinfo.node_info, + outbound_relay_peerinfo.signed_node_info, )?; inner.relay_node = Some(nr); } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 8bce0ca4..570e6399 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -37,8 +37,8 @@ pub struct BucketEntry { min_max_version: Option<(u8, u8)>, seen_our_node_info: bool, last_connection: Option<(ConnectionDescriptor, u64)>, - node_info: NodeInfo, - local_node_info: LocalNodeInfo, + opt_signed_node_info: Option, + opt_local_node_info: Option, peer_stats: PeerStats, latency_stats_accounting: LatencyStatsAccounting, transfer_stats_accounting: TransferStatsAccounting, @@ -52,8 +52,8 @@ impl BucketEntry { min_max_version: None, seen_our_node_info: false, last_connection: None, - node_info: NodeInfo::default(), - local_node_info: LocalNodeInfo::default(), + opt_signed_node_info: None, + opt_local_node_info: None, latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), peer_stats: PeerStats { @@ -107,26 +107,44 @@ impl BucketEntry { move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) } - pub fn update_node_info(&mut self, node_info: NodeInfo) { - self.node_info = node_info + pub fn update_node_info(&mut self, signed_node_info: SignedNodeInfo) { + self.opt_signed_node_info = Some(signed_node_info); } pub fn update_local_node_info(&mut self, local_node_info: LocalNodeInfo) { - self.local_node_info = local_node_info + self.opt_local_node_info = Some(local_node_info) } - pub fn node_info(&self) -> &NodeInfo { - &self.node_info + pub fn has_node_info(&self) -> bool { + self.opt_signed_node_info.is_some() } - pub fn local_node_info(&self) -> &LocalNodeInfo { - &self.local_node_info - } - pub fn peer_info(&self, key: DHTKey) -> PeerInfo { - PeerInfo { - node_id: NodeId::new(key), - node_info: self.node_info.clone(), + + pub fn has_valid_signed_node_info(&self) -> bool { + if let Some(sni) = &self.opt_signed_node_info { + sni.signature.valid + } else { + false } } + pub fn has_local_node_info(&self) -> bool { + self.opt_local_node_info.is_some() + } + + pub fn node_info(&self) -> Option { + self.opt_signed_node_info + .as_ref() + .map(|s| s.node_info.clone()) + } + pub fn local_node_info(&self) -> Option { + self.opt_local_node_info.clone() + } + pub fn peer_info(&self, key: DHTKey) -> Option { + self.opt_signed_node_info.as_ref().map(|s| PeerInfo { + node_id: NodeId::new(key), + signed_node_info: s.clone(), + }) + } + pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: u64) { self.last_connection = Some((last_connection, timestamp)); } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 4f3e1e86..afd520f1 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -22,17 +22,20 @@ impl RoutingTable { let entry = params.1.as_ref().unwrap(); // skip nodes on our local network here - if entry.local_node_info().has_dial_info() { + if entry.local_node_info().is_some() { return false; } // does it have matching public dial info? entry .node_info() - .first_filtered_dial_info_detail(|did| { - did.matches_filter(&dial_info_filter1) + .map(|n| { + n.first_filtered_dial_info_detail(|did| { + did.matches_filter(&dial_info_filter1) + }) + .is_some() }) - .is_some() + .unwrap_or(false) }, )), // transform @@ -49,16 +52,30 @@ impl RoutingTable { // Get our own node's peer info (public node info) so we can share it with other nodes pub fn get_own_peer_info(&self) -> PeerInfo { + PeerInfo::new(NodeId::new(self.node_id()), self.get_own_signed_node_info()) + } + + pub fn get_own_signed_node_info(&self) -> SignedNodeInfo { + let node_id = NodeId::new(self.node_id()); + let secret = self.node_id_secret(); + SignedNodeInfo::with_secret(self.get_own_node_info(), node_id, &secret).unwrap() + } + + pub fn get_own_node_info(&self) -> NodeInfo { let netman = self.network_manager(); let relay_node = netman.relay_node(); - PeerInfo { - node_id: NodeId::new(self.node_id()), - node_info: NodeInfo { - network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), - outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound, - dial_info_detail_list: self.dial_info_details(RoutingDomain::PublicInternet), - relay_peer_info: relay_node.map(|rn| Box::new(rn.peer_info())), - }, + NodeInfo { + network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), + outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound, + dial_info_detail_list: self.dial_info_details(RoutingDomain::PublicInternet), + relay_peer_info: relay_node.and_then(|rn| rn.peer_info().map(Box::new)), + } + } + + pub fn filter_has_valid_signed_node_info(kv: &(&DHTKey, Option<&mut BucketEntry>)) -> bool { + match &kv.1 { + None => true, + Some(b) => b.has_node_info(), } } @@ -68,7 +85,7 @@ impl RoutingTable { ) -> PeerInfo { match &kv.1 { None => own_peer_info.clone(), - Some(entry) => entry.peer_info(*kv.0), + Some(entry) => entry.peer_info(*kv.0).unwrap(), } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index b6d00387..993d2783 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -482,13 +482,13 @@ impl RoutingTable { // Shortcut function to add a node to our routing table if it doesn't exist // and add the dial info we have for it, since that's pretty common - pub fn register_node_with_node_info( + pub fn register_node_with_signed_node_info( &self, node_id: DHTKey, - node_info: NodeInfo, + signed_node_info: SignedNodeInfo, ) -> Result { let nr = self.create_node_ref(node_id, |e| { - e.update_node_info(node_info); + e.update_node_info(signed_node_info); })?; Ok(nr) @@ -542,7 +542,11 @@ impl RoutingTable { // Ensure it's not dead if !matches!(entry.state(cur_ts), BucketEntryState::Dead) { // Ensure this node is not on our local network - if !entry.local_node_info().has_dial_info() { + if !entry + .local_node_info() + .map(|l| l.has_dial_info()) + .unwrap_or(false) + { // Ensure we have the node's status if let Some(node_status) = &entry.peer_stats().status { // Ensure the node will relay @@ -569,8 +573,36 @@ impl RoutingTable { best_inbound_relay } - pub async fn find_self(&self, node_ref: NodeRef) -> Result, String> { + pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result, String> { let node_id = self.node_id(); + + // register nodes we'd found + let mut out = Vec::::with_capacity(fna.peers.len()); + for p in fna.peers { + // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table + if p.node_id.key == node_id { + continue; + } + + // register the node if it's new + let nr = self + .register_node_with_signed_node_info(p.node_id.key, p.signed_node_info.clone()) + .map_err(map_to_string) + .map_err(logthru_rtab!( + "couldn't register node {} at {:?}", + p.node_id.key, + &p.signed_node_info + ))?; + out.push(nr); + } + Ok(out) + } + + pub async fn find_node( + &self, + node_ref: NodeRef, + node_id: DHTKey, + ) -> Result, String> { let rpc_processor = self.rpc_processor(); let res = rpc_processor @@ -594,29 +626,14 @@ impl RoutingTable { self.register_find_node_answer(res) } - pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result, String> { + pub async fn find_self(&self, node_ref: NodeRef) -> Result, String> { let node_id = self.node_id(); + self.find_node(node_ref, node_id).await + } - // register nodes we'd found - let mut out = Vec::::with_capacity(fna.peers.len()); - for p in fna.peers { - // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table - if p.node_id.key == node_id { - continue; - } - - // register the node if it's new - let nr = self - .register_node_with_node_info(p.node_id.key, p.node_info.clone()) - .map_err(map_to_string) - .map_err(logthru_rtab!( - "couldn't register node {} at {:?}", - p.node_id.key, - &p.node_info - ))?; - out.push(nr); - } - Ok(out) + pub async fn find_target(&self, node_ref: NodeRef) -> Result, String> { + let node_id = node_ref.node_id(); + self.find_node(node_ref, node_id).await } pub async fn reverse_find_node(&self, node_ref: NodeRef, wide: bool) { @@ -681,18 +698,39 @@ impl RoutingTable { let mut unord = FuturesUnordered::new(); for (k, v) in bsmap { log_rtab!(" bootstrapping {} with {:?}", k.encode(), &v); + + // Make invalid signed node info (no signature) let nr = self - .register_node_with_node_info( + .register_node_with_signed_node_info( k, - NodeInfo { + SignedNodeInfo::with_no_signature(NodeInfo { network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable outbound_protocols: ProtocolSet::empty(), // Bootstraps do not participate in relaying and will not make outbound requests dial_info_detail_list: v, // Dial info is as specified in the bootstrap list relay_peer_info: None, // Bootstraps never require a relay themselves - }, + }), ) .map_err(logthru_rtab!("Couldn't add bootstrap node: {}", k))?; - unord.push(self.reverse_find_node(nr, true)); + + // Add this our futures to process in parallel + let this = self.clone(); + unord.push(async move { + // Need VALID signed peer info, so ask bootstrap to find_node of itself + // which will ensure it has the bootstrap's signed peer info as part of the response + let _ = this.find_target(nr.clone()).await; + + // Ensure we got the signed peer info + if !nr.operate(|e| e.has_valid_signed_node_info()) { + warn!( + "bootstrap at {:?} did not return valid signed node info", + nr + ); + // xxx: delete the node? + } else { + // otherwise this bootstrap is valid, lets ask it to find ourselves now + this.reverse_find_node(nr, true).await + } + }); } while unord.next().await.is_some() {} Ok(()) @@ -749,7 +787,7 @@ impl RoutingTable { nr, entry.state_debug_info(cur_ts) ); - intf::spawn_local(rpc.clone().rpc_call_info(nr)).detach(); + intf::spawn_local(rpc.clone().rpc_call_status(nr)).detach(); } } } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 2d9e8bf4..13b88869 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -63,7 +63,7 @@ impl NodeRef { self.routing_table.operate_on_bucket_entry(self.node_id, f) } - pub fn peer_info(&self) -> PeerInfo { + pub fn peer_info(&self) -> Option { self.operate(|e| e.peer_info(self.node_id())) } pub fn has_seen_our_node_info(&self) -> bool { @@ -72,23 +72,24 @@ impl NodeRef { pub fn set_seen_our_node_info(&self) { self.operate(|e| e.set_seen_our_node_info(true)); } - - pub fn network_class(&self) -> NetworkClass { - self.operate(|e| e.node_info().network_class) + pub fn network_class(&self) -> Option { + self.operate(|e| e.node_info().map(|n| n.network_class)) } - pub fn outbound_protocols(&self) -> ProtocolSet { - self.operate(|e| e.node_info().outbound_protocols) + pub fn outbound_protocols(&self) -> Option { + self.operate(|e| e.node_info().map(|n| n.outbound_protocols)) } pub fn relay(&self) -> Option { - let target_rpi = self.operate(|e| e.node_info().relay_peer_info.clone())?; - self.routing_table - .register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info) - .map_err(logthru_rtab!(error)) - .ok() - .map(|mut nr| { - nr.set_filter(self.filter_ref().cloned()); - nr - }) + let target_rpi = self.operate(|e| e.node_info().map(|n| n.relay_peer_info))?; + target_rpi.and_then(|t| { + self.routing_table + .register_node_with_signed_node_info(t.node_id.key, t.signed_node_info) + .map_err(logthru_rtab!(error)) + .ok() + .map(|mut nr| { + nr.set_filter(self.filter_ref().cloned()); + nr + }) + }) } pub fn first_filtered_dial_info_detail( &self, @@ -105,8 +106,8 @@ impl NodeRef { PeerScope::All | PeerScope::Local ) { - e.local_node_info() - .first_filtered_dial_info(|di| { + e.local_node_info().and_then(|l| { + l.first_filtered_dial_info(|di| { if let Some(filter) = self.filter.as_ref() { di.matches_filter(filter) } else { @@ -117,6 +118,7 @@ impl NodeRef { class: DialInfoClass::Direct, dial_info: di, }) + }) } else { None } @@ -130,12 +132,14 @@ impl NodeRef { PeerScope::All | PeerScope::Global ) { - e.node_info().first_filtered_dial_info_detail(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } + e.node_info().and_then(|n| { + n.first_filtered_dial_info_detail(|did| { + if let Some(filter) = self.filter.as_ref() { + did.matches_filter(filter) + } else { + true + } + }) }) } else { None @@ -160,17 +164,19 @@ impl NodeRef { PeerScope::All | PeerScope::Local ) { - for di in e.local_node_info().all_filtered_dial_info(|di| { - if let Some(filter) = self.filter.as_ref() { - di.matches_filter(filter) - } else { - true + if let Some(lni) = e.local_node_info() { + for di in lni.all_filtered_dial_info(|di| { + if let Some(filter) = self.filter.as_ref() { + di.matches_filter(filter) + } else { + true + } + }) { + out.push(DialInfoDetail { + class: DialInfoClass::Direct, + dial_info: di, + }); } - }) { - out.push(DialInfoDetail { - class: DialInfoClass::Direct, - dial_info: di, - }); } } if (routing_domain == None || routing_domain == Some(RoutingDomain::PublicInternet)) @@ -182,13 +188,15 @@ impl NodeRef { PeerScope::All | PeerScope::Global ) { - out.append(&mut e.node_info().all_filtered_dial_info_details(|did| { - if let Some(filter) = self.filter.as_ref() { - did.matches_filter(filter) - } else { - true - } - })) + if let Some(ni) = e.node_info() { + out.append(&mut ni.all_filtered_dial_info_details(|did| { + if let Some(filter) = self.filter.as_ref() { + did.matches_filter(filter) + } else { + true + } + })) + } } }); out.remove_duplicates(); @@ -225,7 +233,14 @@ impl NodeRef { } pub fn has_any_dial_info(&self) -> bool { - self.operate(|e| e.node_info().has_any_dial_info() || e.local_node_info().has_dial_info()) + self.operate(|e| { + e.node_info() + .map(|n| n.has_any_dial_info()) + .unwrap_or(false) + || e.local_node_info() + .map(|l| l.has_dial_info()) + .unwrap_or(false) + }) } } diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 86fdb14a..ddd76469 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -13,6 +13,8 @@ mod protocol_set; mod public_key; mod sender_info; mod signal_info; +mod signature; +mod signed_node_info; mod socket_address; pub use address::*; @@ -30,4 +32,6 @@ pub use protocol_set::*; pub use public_key::*; pub use sender_info::*; pub use signal_info::*; +pub use signature::*; +pub use signed_node_info::*; pub use socket_address::*; diff --git a/veilid-core/src/rpc_processor/coders/peer_info.rs b/veilid-core/src/rpc_processor/coders/peer_info.rs index 38d45387..d775527b 100644 --- a/veilid-core/src/rpc_processor/coders/peer_info.rs +++ b/veilid-core/src/rpc_processor/coders/peer_info.rs @@ -8,8 +8,8 @@ pub fn encode_peer_info( // let mut nid_builder = builder.reborrow().init_node_id(); encode_public_key(&peer_info.node_id.key, &mut nid_builder)?; - let mut ni_builder = builder.reborrow().init_node_info(); - encode_node_info(&peer_info.node_info, &mut ni_builder)?; + let mut sni_builder = builder.reborrow().init_signed_node_info(); + encode_signed_node_info(&peer_info.signed_node_info, &mut sni_builder)?; Ok(()) } @@ -22,14 +22,16 @@ pub fn decode_peer_info( .reborrow() .get_node_id() .map_err(map_error_capnp_error!())?; - let ni_reader = reader + let sni_reader = reader .reborrow() - .get_node_info() + .get_signed_node_info() .map_err(map_error_capnp_error!())?; - let node_info = decode_node_info(&ni_reader, allow_relay_peer_info)?; + let node_id = NodeId::new(decode_public_key(&nid_reader)); + let signed_node_info = + decode_signed_node_info(&sni_reader, &node_id.key, allow_relay_peer_info)?; Ok(PeerInfo { - node_id: NodeId::new(decode_public_key(&nid_reader)), - node_info, + node_id, + signed_node_info, }) } diff --git a/veilid-core/src/rpc_processor/coders/signature.rs b/veilid-core/src/rpc_processor/coders/signature.rs new file mode 100644 index 00000000..01f8b124 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/signature.rs @@ -0,0 +1,60 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_signature( + sig: &DHTSignature, + builder: &mut veilid_capnp::ed25519_signature::Builder, +) { + if sig.valid { + panic!("don't encode invalid signatures"); + } + + let sig = &sig.bytes; + + builder.set_u0(u64::from_be_bytes( + sig[0..8].try_into().expect("slice with incorrect length"), + )); + builder.set_u1(u64::from_be_bytes( + sig[8..16].try_into().expect("slice with incorrect length"), + )); + builder.set_u2(u64::from_be_bytes( + sig[16..24].try_into().expect("slice with incorrect length"), + )); + builder.set_u3(u64::from_be_bytes( + sig[24..32].try_into().expect("slice with incorrect length"), + )); + builder.set_u4(u64::from_be_bytes( + sig[32..40].try_into().expect("slice with incorrect length"), + )); + builder.set_u5(u64::from_be_bytes( + sig[40..48].try_into().expect("slice with incorrect length"), + )); + builder.set_u6(u64::from_be_bytes( + sig[48..56].try_into().expect("slice with incorrect length"), + )); + builder.set_u7(u64::from_be_bytes( + sig[56..64].try_into().expect("slice with incorrect length"), + )); +} + +pub fn decode_signature(reader: &veilid_capnp::ed25519_signature::Reader) -> DHTSignature { + let u0 = reader.get_u0().to_be_bytes(); + let u1 = reader.get_u1().to_be_bytes(); + let u2 = reader.get_u2().to_be_bytes(); + let u3 = reader.get_u3().to_be_bytes(); + let u4 = reader.get_u4().to_be_bytes(); + let u5 = reader.get_u5().to_be_bytes(); + let u6 = reader.get_u6().to_be_bytes(); + let u7 = reader.get_u7().to_be_bytes(); + + DHTSignature::new([ + u0[0], u0[1], u0[2], u0[3], u0[4], u0[5], u0[6], u0[7], // u0 + u1[0], u1[1], u1[2], u1[3], u1[4], u1[5], u1[6], u1[7], // u1 + u2[0], u2[1], u2[2], u2[3], u2[4], u2[5], u2[6], u2[7], // u2 + u3[0], u3[1], u3[2], u3[3], u3[4], u3[5], u3[6], u3[7], // u3 + u4[0], u4[1], u4[2], u4[3], u4[4], u4[5], u4[6], u4[7], // u4 + u5[0], u5[1], u5[2], u5[3], u5[4], u5[5], u5[6], u5[7], // u5 + u6[0], u6[1], u6[2], u6[3], u6[4], u6[5], u6[6], u6[7], // u6 + u7[0], u7[1], u7[2], u7[3], u7[4], u7[5], u7[6], u7[7], // u7 + ]) +} diff --git a/veilid-core/src/rpc_processor/coders/signed_node_info.rs b/veilid-core/src/rpc_processor/coders/signed_node_info.rs new file mode 100644 index 00000000..f722ec5e --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/signed_node_info.rs @@ -0,0 +1,36 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_signed_node_info( + signed_node_info: &SignedNodeInfo, + builder: &mut veilid_capnp::signed_node_info::Builder, +) -> Result<(), RPCError> { + // + let mut ni_builder = builder.reborrow().init_node_info(); + encode_node_info(&signed_node_info.node_info, &mut ni_builder)?; + + let mut sig_builder = builder.reborrow().init_signature(); + encode_signature(&signed_node_info.signature, &mut sig_builder); + + Ok(()) +} + +pub fn decode_signed_node_info( + reader: &veilid_capnp::signed_node_info::Reader, + node_id: &DHTKey, + allow_relay_peer_info: bool, +) -> Result { + let ni_reader = reader + .reborrow() + .get_node_info() + .map_err(map_error_capnp_error!())?; + let node_info = decode_node_info(&ni_reader, allow_relay_peer_info)?; + + let sig_reader = reader + .reborrow() + .get_signature() + .map_err(map_error_capnp_error!())?; + let signature = decode_signature(&sig_reader); + + SignedNodeInfo::new(node_info, NodeId::new(*node_id), signature).map_err(map_error_string!()) +} diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index 82b4a163..203cc27a 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -125,20 +125,24 @@ impl RPCProcessor { let respond_to_str = match respond_to { veilid_capnp::operation::respond_to::None(_) => "(None)".to_owned(), veilid_capnp::operation::respond_to::Sender(_) => "Sender".to_owned(), - veilid_capnp::operation::respond_to::SenderWithInfo(ni) => { - let ni_reader = match ni { - Ok(nir) => nir, + veilid_capnp::operation::respond_to::SenderWithInfo(sni) => { + let sni_reader = match sni { + Ok(snir) => snir, Err(e) => { return e.to_string(); } }; - let node_info = match decode_node_info(&ni_reader, true) { + let signed_node_info = match decode_signed_node_info( + &sni_reader, + &request_rpcreader.header.envelope.get_sender_id(), + true, + ) { Ok(ni) => ni, Err(e) => { return e.to_string(); } }; - format!("Sender({:?})", node_info) + format!("Sender({:?})", signed_node_info) } veilid_capnp::operation::respond_to::PrivateRoute(pr) => { let pr_reader = match pr { @@ -197,11 +201,11 @@ impl RPCProcessor { detail: &veilid_capnp::operation::detail::WhichReader, ) -> String { match detail { - veilid_capnp::operation::detail::InfoQ(_) => { - format!("InfoQ") + veilid_capnp::operation::detail::StatusQ(_) => { + format!("StatusQ") } - veilid_capnp::operation::detail::InfoA(_) => { - format!("InfoA") + veilid_capnp::operation::detail::StatusA(_) => { + format!("StatusA") } veilid_capnp::operation::detail::ValidateDialInfo(_) => { format!("ValidateDialInfo") @@ -255,6 +259,9 @@ impl RPCProcessor { veilid_capnp::operation::detail::Route(_) => { format!("Route") } + veilid_capnp::operation::detail::NodeInfoUpdate(_) => { + format!("NodeInfoUpdate") + } veilid_capnp::operation::detail::GetValueQ(_) => { format!("GetValueQ") } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6a6da9df..70426857 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -31,7 +31,7 @@ pub enum Destination { #[derive(Debug, Clone)] pub enum RespondTo { None, - Sender(Option), + Sender(Option), PrivateRoute(PrivateRoute), } @@ -44,9 +44,9 @@ impl RespondTo { Self::None => { builder.set_none(()); } - Self::Sender(Some(ni)) => { - let mut ni_builder = builder.reborrow().init_sender_with_info(); - encode_node_info(ni, &mut ni_builder)?; + Self::Sender(Some(sni)) => { + let mut sni_builder = builder.reborrow().init_sender_with_info(); + encode_signed_node_info(sni, &mut sni_builder)?; } Self::Sender(None) => { builder.reborrow().set_sender(()); @@ -130,7 +130,7 @@ struct WaitableReply { ///////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Default)] -pub struct InfoAnswer { +pub struct StatusAnswer { pub latency: u64, pub node_status: NodeStatus, pub sender_info: SenderInfo, @@ -227,7 +227,7 @@ impl RPCProcessor { } } if let Some(rpi) = &node_info.relay_peer_info { - for did in &rpi.node_info.dial_info_detail_list { + for did in &rpi.signed_node_info.node_info.dial_info_detail_list { if !did.dial_info.is_global() { // non-public address causes rejection return false; @@ -748,20 +748,21 @@ impl RPCProcessor { } } - fn get_respond_to_sender_node_info( + fn get_respond_to_sender_signed_node_info( &self, operation: &veilid_capnp::operation::Reader, - ) -> Result, RPCError> { + sender_node_id: &DHTKey, + ) -> Result, RPCError> { match operation .get_respond_to() .which() .map_err(map_error_capnp_notinschema!())? { - veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => { - Ok(Some(decode_node_info(&sender_ni_reader, true)?)) - } + veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => Ok(Some( + decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?, + )), veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => Err(rpc_error_protocol( - format!("invalid sender_with_info node info: {}", e), + format!("invalid sender_with_info signed node info: {}", e), )), veilid_capnp::operation::respond_to::None(_) | veilid_capnp::operation::respond_to::Sender(_) @@ -779,7 +780,7 @@ impl RPCProcessor { SenderInfo { socket_address } } - async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { + async fn process_status_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { let peer_noderef = rpcreader.header.peer_noderef.clone(); let sender_info = self.generate_sender_info(peer_noderef).await; @@ -795,10 +796,10 @@ impl RPCProcessor { return Ok(()); } - // get InfoQ reader + // get StatusQ reader let iq_reader = match operation.get_detail().which() { - Ok(veilid_capnp::operation::detail::Which::InfoQ(Ok(x))) => x, - _ => panic!("invalid operation type in process_info_q"), + Ok(veilid_capnp::operation::detail::Which::StatusQ(Ok(x))) => x, + _ => panic!("invalid operation type in process_status_q"), }; // Parse out fields @@ -810,7 +811,7 @@ impl RPCProcessor { // update node status for the requesting node to our routing table if let Some(sender_nr) = rpcreader.opt_sender_nr.clone() { - // Update latest node status in routing table for the infoq sender + // Update latest node status in routing table for the statusq sender sender_nr.operate(|e| { e.update_node_status(node_status); }); @@ -823,15 +824,15 @@ impl RPCProcessor { let mut respond_to = answer.reborrow().init_respond_to(); respond_to.set_none(()); let detail = answer.reborrow().init_detail(); - let mut info_a = detail.init_info_a(); + let mut status_a = detail.init_status_a(); // Add node status let node_status = self.network_manager().generate_node_status(); - let mut nsb = info_a.reborrow().init_node_status(); + let mut nsb = status_a.reborrow().init_node_status(); encode_node_status(&node_status, &mut nsb)?; // Add sender info - let mut sib = info_a.reborrow().init_sender_info(); + let mut sib = status_a.reborrow().init_sender_info(); encode_sender_info(&sender_info, &mut sib)?; reply_msg.into_reader() @@ -982,7 +983,7 @@ impl RPCProcessor { let closest_nodes = routing_table.find_closest_nodes( target_node_id, // filter - None, + Some(Box::new(RoutingTable::filter_has_valid_signed_node_info)), // transform |e| RoutingTable::transform_to_peer_info(e, &own_peer_info), ); @@ -995,8 +996,8 @@ impl RPCProcessor { let mut respond_to = answer.reborrow().init_respond_to(); respond_to.set_none(()); let detail = answer.reborrow().init_detail(); - let info_a = detail.init_find_node_a(); - let mut peers_builder = info_a.init_peers( + let fna = detail.init_find_node_a(); + let mut peers_builder = fna.init_peers( closest_nodes .len() .try_into() @@ -1017,6 +1018,46 @@ impl RPCProcessor { Err(rpc_error_unimplemented("process_route")) } + async fn process_node_info_update(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { + // + let sender_node_id = rpcreader.header.envelope.get_sender_id(); + let signed_node_info = { + let operation = rpcreader + .reader + .get_root::() + .map_err(map_error_capnp_error!()) + .map_err(logthru_rpc!())?; + + // This should never want an answer + if self.wants_answer(&operation)? { + return Err(RPCError::InvalidFormat); + } + + // get nodeInfoUpdate reader + let niumsg_reader = match operation.get_detail().which() { + Ok(veilid_capnp::operation::detail::Which::NodeInfoUpdate(Ok(x))) => x, + _ => panic!("invalid operation type in process_node_info_update"), + }; + + // Parse out fields + let sni_reader = niumsg_reader + .get_signed_node_info() + .map_err(map_error_internal!("no valid signed node info"))?; + decode_signed_node_info(&sni_reader, &sender_node_id, true)? + }; + + // Update our routing table with signed node info + if !self.filter_peer_scope(&signed_node_info.node_info) { + return Err(RPCError::InvalidFormat); + } + let _ = self + .routing_table() + .register_node_with_signed_node_info(sender_node_id, signed_node_info) + .map_err(RPCError::Internal)?; + + Ok(()) + } + async fn process_get_value_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { Err(rpc_error_unimplemented("process_get_value_q")) } @@ -1139,6 +1180,7 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// async fn process_rpc_message_version_0(&self, msg: RPCMessage) -> Result<(), RPCError> { let reader = capnp::message::Reader::new(msg.data, Default::default()); + let sender_node_id = msg.header.envelope.get_sender_id(); let mut opt_sender_nr: Option = None; let which = { let operation = reader @@ -1151,55 +1193,53 @@ impl RPCProcessor { .which() .map_err(map_error_capnp_notinschema!())? { - veilid_capnp::operation::detail::InfoQ(_) => (0u32, true), - veilid_capnp::operation::detail::InfoA(_) => (1u32, false), + veilid_capnp::operation::detail::StatusQ(_) => (0u32, true), + veilid_capnp::operation::detail::StatusA(_) => (1u32, false), veilid_capnp::operation::detail::ValidateDialInfo(_) => (2u32, true), veilid_capnp::operation::detail::FindNodeQ(_) => (3u32, true), veilid_capnp::operation::detail::FindNodeA(_) => (4u32, false), veilid_capnp::operation::detail::Route(_) => (5u32, true), - veilid_capnp::operation::detail::GetValueQ(_) => (6u32, true), - veilid_capnp::operation::detail::GetValueA(_) => (7u32, false), - veilid_capnp::operation::detail::SetValueQ(_) => (8u32, true), - veilid_capnp::operation::detail::SetValueA(_) => (9u32, false), - veilid_capnp::operation::detail::WatchValueQ(_) => (10u32, true), - veilid_capnp::operation::detail::WatchValueA(_) => (11u32, false), - veilid_capnp::operation::detail::ValueChanged(_) => (12u32, true), - veilid_capnp::operation::detail::SupplyBlockQ(_) => (13u32, true), - veilid_capnp::operation::detail::SupplyBlockA(_) => (14u32, false), - veilid_capnp::operation::detail::FindBlockQ(_) => (15u32, true), - veilid_capnp::operation::detail::FindBlockA(_) => (16u32, false), - veilid_capnp::operation::detail::Signal(_) => (17u32, true), - veilid_capnp::operation::detail::ReturnReceipt(_) => (18u32, true), - veilid_capnp::operation::detail::StartTunnelQ(_) => (19u32, true), - veilid_capnp::operation::detail::StartTunnelA(_) => (20u32, false), - veilid_capnp::operation::detail::CompleteTunnelQ(_) => (21u32, true), - veilid_capnp::operation::detail::CompleteTunnelA(_) => (22u32, false), - veilid_capnp::operation::detail::CancelTunnelQ(_) => (23u32, true), - veilid_capnp::operation::detail::CancelTunnelA(_) => (24u32, false), + veilid_capnp::operation::detail::NodeInfoUpdate(_) => (6u32, true), + veilid_capnp::operation::detail::GetValueQ(_) => (7u32, true), + veilid_capnp::operation::detail::GetValueA(_) => (8u32, false), + veilid_capnp::operation::detail::SetValueQ(_) => (9u32, true), + veilid_capnp::operation::detail::SetValueA(_) => (10u32, false), + veilid_capnp::operation::detail::WatchValueQ(_) => (11u32, true), + veilid_capnp::operation::detail::WatchValueA(_) => (12u32, false), + veilid_capnp::operation::detail::ValueChanged(_) => (13u32, true), + veilid_capnp::operation::detail::SupplyBlockQ(_) => (14u32, true), + veilid_capnp::operation::detail::SupplyBlockA(_) => (15u32, false), + veilid_capnp::operation::detail::FindBlockQ(_) => (16u32, true), + veilid_capnp::operation::detail::FindBlockA(_) => (17u32, false), + veilid_capnp::operation::detail::Signal(_) => (18u32, true), + veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true), + veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true), + veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false), + veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true), + veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), + veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), + veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), }; // Accounting for questions we receive if is_q { // See if we have some Sender NodeInfo to incorporate - opt_sender_nr = - if let Some(sender_ni) = self.get_respond_to_sender_node_info(&operation)? { - // Sender NodeInfo was specified, update our routing table with it - if !self.filter_peer_scope(&sender_ni) { - return Err(RPCError::InvalidFormat); - } - let nr = self - .routing_table() - .register_node_with_node_info( - msg.header.envelope.get_sender_id(), - sender_ni, - ) - .map_err(RPCError::Internal)?; - Some(nr) - } else { - // look up sender node, in case it's different than our peer due to relaying - self.routing_table() - .lookup_node_ref(msg.header.envelope.get_sender_id()) - }; + opt_sender_nr = if let Some(sender_ni) = + self.get_respond_to_sender_signed_node_info(&operation, &sender_node_id)? + { + // Sender NodeInfo was specified, update our routing table with it + if !self.filter_peer_scope(&sender_ni.node_info) { + return Err(RPCError::InvalidFormat); + } + let nr = self + .routing_table() + .register_node_with_signed_node_info(sender_node_id, sender_ni) + .map_err(RPCError::Internal)?; + Some(nr) + } else { + // look up sender node, in case it's different than our peer due to relaying + self.routing_table().lookup_node_ref(sender_node_id) + }; if let Some(sender_nr) = opt_sender_nr.clone() { self.routing_table().stats_question_rcvd( @@ -1220,31 +1260,32 @@ impl RPCProcessor { }; match which { - 0 => self.process_info_q(rpcreader).await, // InfoQ - 1 => self.process_answer(rpcreader).await, // InfoA + 0 => self.process_status_q(rpcreader).await, // StatusQ + 1 => self.process_answer(rpcreader).await, // StatusA 2 => self.process_validate_dial_info(rpcreader).await, // ValidateDialInfo 3 => self.process_find_node_q(rpcreader).await, // FindNodeQ - 4 => self.process_answer(rpcreader).await, // FindNodeA - 5 => self.process_route(rpcreader).await, // Route - 6 => self.process_get_value_q(rpcreader).await, // GetValueQ - 7 => self.process_answer(rpcreader).await, // GetValueA - 8 => self.process_set_value_q(rpcreader).await, // SetValueQ - 9 => self.process_answer(rpcreader).await, // SetValueA - 10 => self.process_watch_value_q(rpcreader).await, // WatchValueQ - 11 => self.process_answer(rpcreader).await, // WatchValueA - 12 => self.process_value_changed(rpcreader).await, // ValueChanged - 13 => self.process_supply_block_q(rpcreader).await, // SupplyBlockQ - 14 => self.process_answer(rpcreader).await, // SupplyBlockA - 15 => self.process_find_block_q(rpcreader).await, // FindBlockQ - 16 => self.process_answer(rpcreader).await, // FindBlockA - 17 => self.process_signal(rpcreader).await, // SignalQ - 18 => self.process_return_receipt(rpcreader).await, // ReturnReceipt - 19 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ - 20 => self.process_answer(rpcreader).await, // StartTunnelA - 21 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ - 22 => self.process_answer(rpcreader).await, // CompleteTunnelA - 23 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ - 24 => self.process_answer(rpcreader).await, // CancelTunnelA + 4 => self.process_answer(rpcreader).await, // FindNodeA + 5 => self.process_route(rpcreader).await, // Route + 6 => self.process_node_info_update(rpcreader).await, // NodeInfoUpdate + 7 => self.process_get_value_q(rpcreader).await, // GetValueQ + 8 => self.process_answer(rpcreader).await, // GetValueA + 9 => self.process_set_value_q(rpcreader).await, // SetValueQ + 10 => self.process_answer(rpcreader).await, // SetValueA + 11 => self.process_watch_value_q(rpcreader).await, // WatchValueQ + 12 => self.process_answer(rpcreader).await, // WatchValueA + 13 => self.process_value_changed(rpcreader).await, // ValueChanged + 14 => self.process_supply_block_q(rpcreader).await, // SupplyBlockQ + 15 => self.process_answer(rpcreader).await, // SupplyBlockA + 16 => self.process_find_block_q(rpcreader).await, // FindBlockQ + 17 => self.process_answer(rpcreader).await, // FindBlockA + 18 => self.process_signal(rpcreader).await, // SignalQ + 19 => self.process_return_receipt(rpcreader).await, // ReturnReceipt + 20 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ + 21 => self.process_answer(rpcreader).await, // StartTunnelA + 22 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ + 23 => self.process_answer(rpcreader).await, // CompleteTunnelA + 24 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ + 25 => self.process_answer(rpcreader).await, // CancelTunnelA _ => panic!("must update rpc table"), } } @@ -1347,38 +1388,43 @@ impl RPCProcessor { // or None if the peer has seen our dial info before or our node info is not yet valid // because of an unknown network class pub fn make_respond_to_sender(&self, peer: NodeRef) -> RespondTo { - let our_node_info = self.routing_table().get_own_peer_info().node_info; if peer.has_seen_our_node_info() - || matches!(our_node_info.network_class, NetworkClass::Invalid) + || matches!( + self.network_manager() + .get_network_class() + .unwrap_or(NetworkClass::Invalid), + NetworkClass::Invalid + ) { RespondTo::Sender(None) } else { - RespondTo::Sender(Some(our_node_info)) + let our_sni = self.routing_table().get_own_signed_node_info(); + RespondTo::Sender(Some(our_sni)) } } - // Send InfoQ RPC request, receive InfoA answer + // Send StatusQ RPC request, receive StatusA answer // Can be sent via relays, but not via routes - pub async fn rpc_call_info(self, peer: NodeRef) -> Result { - let info_q_msg = { - let mut info_q_msg = ::capnp::message::Builder::new_default(); - let mut question = info_q_msg.init_root::(); + pub async fn rpc_call_status(self, peer: NodeRef) -> Result { + let status_q_msg = { + let mut status_q_msg = ::capnp::message::Builder::new_default(); + let mut question = status_q_msg.init_root::(); question.set_op_id(self.get_next_op_id()); let mut respond_to = question.reborrow().init_respond_to(); self.make_respond_to_sender(peer.clone()) .encode(&mut respond_to)?; let detail = question.reborrow().init_detail(); - let mut iqb = detail.init_info_q(); - let mut node_status_builder = iqb.reborrow().init_node_status(); + let mut sqb = detail.init_status_q(); + let mut node_status_builder = sqb.reborrow().init_node_status(); let node_status = self.network_manager().generate_node_status(); encode_node_status(&node_status, &mut node_status_builder)?; - info_q_msg.into_reader() + status_q_msg.into_reader() }; // Send the info request let waitable_reply = self - .request(Destination::Direct(peer.clone()), info_q_msg, None) + .request(Destination::Direct(peer.clone()), status_q_msg, None) .await? .unwrap(); @@ -1394,30 +1440,30 @@ impl RPCProcessor { .get_root::() .map_err(map_error_capnp_error!()) .map_err(logthru_rpc!())?; - let info_a = match response_operation + let status_a = match response_operation .get_detail() .which() .map_err(map_error_capnp_notinschema!()) .map_err(logthru_rpc!())? { - veilid_capnp::operation::detail::InfoA(a) => { - a.map_err(map_error_internal!("Invalid InfoA"))? + veilid_capnp::operation::detail::StatusA(a) => { + a.map_err(map_error_internal!("Invalid StatusA"))? } _ => return Err(rpc_error_internal("Incorrect RPC answer for question")), }; // Decode node info - if !info_a.has_node_status() { + if !status_a.has_node_status() { return Err(rpc_error_internal("Missing node status")); } - let nsr = info_a + let nsr = status_a .get_node_status() .map_err(map_error_internal!("Broken node status"))?; let node_status = decode_node_status(&nsr)?; // Decode sender info - let sender_info = if info_a.has_sender_info() { - let sir = info_a + let sender_info = if status_a.has_sender_info() { + let sir = status_a .get_sender_info() .map_err(map_error_internal!("Broken sender info"))?; decode_sender_info(&sir)? @@ -1453,7 +1499,7 @@ impl RPCProcessor { } // Return the answer for anyone who may care - let out = InfoAnswer { + let out = StatusAnswer { latency, node_status, sender_info, @@ -1583,7 +1629,7 @@ impl RPCProcessor { for p in peers_reader.iter() { let peer_info = decode_peer_info(&p, true)?; - if !self.filter_peer_scope(&peer_info.node_info) { + if !self.filter_peer_scope(&peer_info.signed_node_info.node_info) { return Err(RPCError::InvalidFormat); } @@ -1595,6 +1641,34 @@ impl RPCProcessor { Ok(out) } + // Sends a our node info to another node + // Can be sent via all methods including relays and routes + pub async fn rpc_call_node_info_update( + &self, + dest: Destination, + safety_route: Option<&SafetyRouteSpec>, + ) -> Result<(), RPCError> { + let sni_msg = { + let mut sni_msg = ::capnp::message::Builder::new_default(); + let mut question = sni_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let niu_builder = detail.init_node_info_update(); + let mut sni_builder = niu_builder.init_signed_node_info(); + let sni = self.routing_table().get_own_signed_node_info(); + encode_signed_node_info(&sni, &mut sni_builder)?; + + sni_msg.into_reader() + }; + + // Send the node_info_update request + self.request(dest, sni_msg, safety_route).await?; + + Ok(()) + } + // Sends a unidirectional signal to a node // Can be sent via all methods including relays and routes pub async fn rpc_call_signal( diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 62841b6b..7d3e87de 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -13,13 +13,13 @@ pub use alloc::string::ToString; pub use attachment_manager::AttachmentManager; pub use core::str::FromStr; pub use dht::crypto::Crypto; -pub use dht::key::{generate_secret, DHTKey, DHTKeySecret}; +pub use dht::key::{generate_secret, sign, verify, DHTKey, DHTKeySecret, DHTSignature}; pub use intf::BlockStore; pub use intf::ProtectedStore; pub use intf::TableStore; pub use network_manager::NetworkManager; pub use routing_table::RoutingTable; -pub use rpc_processor::InfoAnswer; +pub use rpc_processor::StatusAnswer; use core::fmt; use core_context::{api_shutdown, VeilidCoreContext}; @@ -43,6 +43,9 @@ pub enum VeilidAPIError { NoDialInfo { node_id: NodeId, }, + NoPeerInfo { + node_id: NodeId, + }, Internal { message: String, }, @@ -77,6 +80,9 @@ impl fmt::Display for VeilidAPIError { VeilidAPIError::NoDialInfo { node_id } => { write!(f, "VeilidAPIError::NoDialInfo({})", node_id) } + VeilidAPIError::NoPeerInfo { node_id } => { + write!(f, "VeilidAPIError::NoPeerInfo({})", node_id) + } VeilidAPIError::Internal { message } => { write!(f, "VeilidAPIError::Internal({})", message) } @@ -312,7 +318,7 @@ pub struct NodeStatus { pub will_validate_dial_info: bool, } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct NodeInfo { pub network_class: NetworkClass, pub outbound_protocols: ProtocolSet, @@ -352,7 +358,7 @@ impl NodeInfo { || !self .relay_peer_info .as_ref() - .map(|rpi| rpi.node_info.has_direct_dial_info()) + .map(|rpi| rpi.signed_node_info.node_info.has_direct_dial_info()) .unwrap_or_default() } @@ -409,7 +415,7 @@ impl NodeInfo { } } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct LocalNodeInfo { pub dial_info_list: Vec, } @@ -978,10 +984,61 @@ impl Default for PeerScope { } } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +// Signed NodeInfo that can be passed around amongst peers and verifiable +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SignedNodeInfo { + pub node_info: NodeInfo, + pub signature: DHTSignature, +} + +impl SignedNodeInfo { + pub fn new( + node_info: NodeInfo, + node_id: NodeId, + signature: DHTSignature, + ) -> Result { + let node_info_bytes = serde_cbor::to_vec(&node_info).map_err(map_to_string)?; + verify(&node_id.key, &node_info_bytes, &signature)?; + Ok(Self { + node_info, + signature, + }) + } + + pub fn with_secret( + node_info: NodeInfo, + node_id: NodeId, + secret: &DHTKeySecret, + ) -> Result { + let node_info_bytes = serde_cbor::to_vec(&node_info).map_err(map_to_string)?; + let signature = sign(&node_id.key, secret, &node_info_bytes)?; + Ok(Self { + node_info, + signature, + }) + } + + pub fn with_no_signature(node_info: NodeInfo) -> Self { + Self { + node_info, + signature: DHTSignature::default(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct PeerInfo { pub node_id: NodeId, - pub node_info: NodeInfo, + pub signed_node_info: SignedNodeInfo, +} + +impl PeerInfo { + pub fn new(node_id: NodeId, signed_node_info: SignedNodeInfo) -> Self { + Self { + node_id, + signed_node_info, + } + } } #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] @@ -1463,18 +1520,18 @@ impl VeilidAPI { //////////////////////////////////////////////////////////////// // Direct Node Access (pretty much for testing only) - pub async fn info(&self, node_id: NodeId) -> Result { + pub async fn status(&self, node_id: NodeId) -> Result { let rpc = self.rpc_processor()?; let routing_table = rpc.routing_table(); let node_ref = match routing_table.lookup_node_ref(node_id.key) { None => return Err(VeilidAPIError::NodeNotFound { node_id }), Some(nr) => nr, }; - let info_answer = rpc - .rpc_call_info(node_ref) + let status_answer = rpc + .rpc_call_status(node_ref) .await .map_err(map_rpc_error!())?; - Ok(info_answer) + Ok(status_answer) } pub async fn validate_dial_info( @@ -1513,8 +1570,13 @@ impl VeilidAPI { .map_err(map_rpc_error!())?; let answer = node_ref.peer_info(); - - Ok(answer) + if let Some(answer) = answer { + Ok(answer) + } else { + Err(VeilidAPIError::NoPeerInfo { + node_id: NodeId::new(node_ref.node_id()), + }) + } } pub async fn search_dht_multi(&self, node_id: NodeId) -> Result, VeilidAPIError> { @@ -1534,7 +1596,7 @@ impl VeilidAPI { .await .map_err(map_rpc_error!())?; - let answer = node_refs.iter().map(|x| x.peer_info()).collect(); + let answer = node_refs.iter().filter_map(|x| x.peer_info()).collect(); Ok(answer) }