diff --git a/scripts/run_local_test.py b/scripts/run_local_test.py index 00b93e5b..da59fa4c 100755 --- a/scripts/run_local_test.py +++ b/scripts/run_local_test.py @@ -50,16 +50,16 @@ def tee(prefix, infile, *files): return t -def read_until_local_dial_info(proc, proto): +def read_until_interface_dial_info(proc, proto): - local_dial_info_str = b"Local Dial Info: " + interface_dial_info_str = b"Interface Dial Info: " for ln in iter(proc.stdout.readline, ""): sys.stdout.buffer.write(ln) sys.stdout.flush() - idx = ln.find(local_dial_info_str) + idx = ln.find(interface_dial_info_str) if idx != -1: - idx += len(local_dial_info_str) + idx += len(interface_dial_info_str) di = ln[idx:] if b"@"+bytes(proto)+b"|" in di: return di.decode("utf-8").strip() @@ -130,7 +130,7 @@ def main(): main_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) print(">>> MAIN NODE PID={}".format(main_proc.pid)) - main_di = read_until_local_dial_info( + main_di = read_until_interface_dial_info( main_proc, bytes(args.protocol, 'utf-8')) print(">>> MAIN DIAL INFO={}".format(main_di)) diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index e0ba8bc9..9cec4d7f 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -107,6 +107,19 @@ struct NodeDialInfo { dialInfo @1 :DialInfo; # how to get to the node } +# Signals +############################## + +struct SignalInfoHolePunch { + receipt @0 :Data; # receipt to return with hole punch + nodeInfo @1 :NodeInfo; # node info of the signal sender for hole punch attempt +} + +struct SignalInfoReverseConnect { + receipt @0 :Data; # receipt to return with reverse connect + nodeInfo @1 :NodeInfo; # node info of the signal sender for reverse connect attempt +} + # Private Routes ############################## @@ -186,10 +199,18 @@ struct NodeStatus { willValidateDialInfo @4 :Bool; } +struct ProtocolSet { + udp @0 :Bool; + tcp @1 :Bool; + ws @2 :Bool; + wss @3 :Bool; +} + struct NodeInfo { networkClass @0 :NetworkClass; # network class of this node - dialInfoList @1 :List(DialInfo); # dial info for this node - relayDialInfoList @2 :List(DialInfo); # relay dial info for this node + outboundProtocols @1 :ProtocolSet; # protocols that can go outbound + dialInfoList @2 :List(DialInfo); # inbound dial info for this node + relayPeerInfo @3 :PeerInfo; # (optional) relay peer info for this node } struct SenderInfo { @@ -214,8 +235,7 @@ struct OperationReturnReceipt { struct OperationFindNodeQ { nodeId @0 :NodeID; # node id to locate - dialInfoList @1 :List(DialInfo); # dial info for the node asking the question - relayDialInfoList @2 :List(DialInfo); # relay dial info for the node asking the question + senderNodeInfo @1 :NodeInfo; # dial info for the node asking the question } struct PeerInfo { @@ -297,12 +317,11 @@ struct OperationFindBlockA { peers @2 :List(PeerInfo); # returned 'closer peer' information } -struct OperationSignalQ { - data @0 :Data; # the signalling system request -} - -struct OperationSignalA { - data @0 :Data; # the signalling system response +struct OperationSignal { + union { + holePunch @0 :SignalInfoHolePunch; + reverseConnect @1 :SignalInfoReverseConnect; + } } enum TunnelEndpointMode { @@ -318,9 +337,8 @@ enum TunnelError { } struct TunnelEndpoint { - nodeId @0 :NodeID; # node id - dialInfoList @1 :List(DialInfo); # how to reach the node - mode @2 :TunnelEndpointMode; # what kind of endpoint this is + mode @0 :TunnelEndpointMode; # what kind of endpoint this is + peerInfo @1 :PeerInfo; # node id and dialinfo } struct FullTunnel { @@ -406,17 +424,15 @@ struct Operation { findBlockQ @19 :OperationFindBlockQ; findBlockA @20 :OperationFindBlockA; - signalQ @21 :OperationSignalQ; - signalA @22 :OperationSignalA; - - returnReceipt @23 :OperationReturnReceipt; + signal @21 :OperationSignal; + returnReceipt @22 :OperationReturnReceipt; # Tunnel operations - startTunnelQ @24 :OperationStartTunnelQ; - startTunnelA @25 :OperationStartTunnelA; - completeTunnelQ @26 :OperationCompleteTunnelQ; - completeTunnelA @27 :OperationCompleteTunnelA; - cancelTunnelQ @28 :OperationCancelTunnelQ; - cancelTunnelA @29 :OperationCancelTunnelA; + startTunnelQ @23 :OperationStartTunnelQ; + startTunnelA @24 :OperationStartTunnelA; + completeTunnelQ @25 :OperationCompleteTunnelQ; + completeTunnelA @26 :OperationCompleteTunnelA; + cancelTunnelQ @27 :OperationCancelTunnelQ; + cancelTunnelA @28 :OperationCancelTunnelA; } } diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 81d3e2f2..03f30575 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -66,6 +66,7 @@ struct NetworkUnlockedInner { // Background processes update_udpv4_dialinfo_task: TickTask, update_tcpv4_dialinfo_task: TickTask, + update_wsv4_dialinfo_task: TickTask, } #[derive(Clone)] @@ -107,6 +108,7 @@ impl Network { NetworkUnlockedInner { update_udpv4_dialinfo_task: TickTask::new(1), update_tcpv4_dialinfo_task: TickTask::new(1), + update_wsv4_dialinfo_task: TickTask::new(1), } } @@ -135,6 +137,15 @@ impl Network { Box::pin(this2.clone().update_tcpv4_dialinfo_task_routine(l, t)) }); } + // Set ws dialinfo tick task + { + let this2 = this.clone(); + this.unlocked_inner + .update_wsv4_dialinfo_task + .set_routine(move |l, t| { + Box::pin(this2.clone().update_wsv4_dialinfo_task_routine(l, t)) + }); + } this } @@ -289,7 +300,7 @@ impl Network { res } - async fn send_data_to_existing_connection( + pub async fn send_data_to_existing_connection( &self, descriptor: ConnectionDescriptor, data: Vec, @@ -380,41 +391,6 @@ impl Network { res } - // Send data to node - // We may not have dial info for a node, but have an existing connection for it - // because an inbound connection happened first, and no FindNodeQ has happened to that - // node yet to discover its dial info. The existing connection should be tried first - // in this case. - pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { - // First try to send data to the last socket we've seen this peer on - let data = if let Some(descriptor) = node_ref.last_connection() { - match self - .clone() - .send_data_to_existing_connection(descriptor, data) - .await - .map_err(logthru_net!())? - { - None => { - return Ok(()); - } - Some(d) => d, - } - } else { - data - }; - - // If that fails, try to make a connection or reach out to the peer via its dial info - let node_info = node_ref - .best_node_info() - .ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?; - - xxx write logic to determine if a relay needs to be used first xxx - - self.send_data_to_dial_info(dial_info, data) - .await - .map_err(logthru_net!()) - } - ///////////////////////////////////////////////////////////////// pub fn get_protocol_config(&self) -> Option { @@ -433,28 +409,33 @@ impl Network { let protocol_config = { let c = self.config.get(); ProtocolConfig { - udp_enabled: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, - tcp_connect: c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp, - tcp_listen: c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp, - ws_connect: c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws, - ws_listen: c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws, - wss_connect: c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss, - wss_listen: c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss, + inbound: ProtocolSet { + udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, + tcp: c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp, + ws: c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws, + wss: c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss, + }, + outbound: ProtocolSet { + udp: c.network.protocol.udp.enabled && c.capabilities.protocol_udp, + tcp: c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp, + ws: c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws, + wss: c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss, + }, } }; self.inner.lock().protocol_config = Some(protocol_config); // start listeners - if protocol_config.udp_enabled { + if protocol_config.inbound.udp { self.start_udp_listeners().await?; } - if protocol_config.ws_listen { + if protocol_config.inbound.ws { self.start_ws_listeners().await?; } - if protocol_config.wss_listen { + if protocol_config.inbound.wss { self.start_wss_listeners().await?; } - if protocol_config.tcp_listen { + if protocol_config.inbound.tcp { self.start_tcp_listeners().await?; } @@ -503,7 +484,7 @@ impl Network { // Go through our global dialinfo and see what our best network class is let mut network_class = NetworkClass::Invalid; - for did in inner.routing_table.global_dial_info_details() { + for did in inner.routing_table.public_dial_info_details() { if let Some(nc) = did.network_class { if nc < network_class { network_class = nc; @@ -521,6 +502,7 @@ impl Network { protocol_config, udp_static_public_dialinfo, tcp_static_public_dialinfo, + ws_static_public_dialinfo, network_class, ) = { let inner = self.inner.lock(); @@ -529,6 +511,7 @@ impl Network { inner.protocol_config.unwrap_or_default(), inner.udp_static_public_dialinfo, inner.tcp_static_public_dialinfo, + inner.ws_static_public_dialinfo, inner.network_class.unwrap_or(NetworkClass::Invalid), ) }; @@ -538,15 +521,15 @@ impl Network { // If we can have public dialinfo, or we haven't figured out our network class yet, // and we're active for UDP, we should attempt to get our public dialinfo sorted out // and assess our network class if we haven't already - if protocol_config.udp_enabled + if protocol_config.inbound.udp && !udp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { - let filter = DialInfoFilter::global() + let filter = DialInfoFilter::all() .with_protocol_type(ProtocolType::UDP) .with_address_type(AddressType::IPV4); let need_udpv4_dialinfo = routing_table - .first_filtered_dial_info_detail(&filter) + .first_public_filtered_dial_info_detail(&filter) .is_none(); if need_udpv4_dialinfo { // If we have no public UDPv4 dialinfo, then we need to run a NAT check @@ -559,15 +542,15 @@ impl Network { } // Same but for TCPv4 - if protocol_config.tcp_listen + if protocol_config.inbound.tcp && !tcp_static_public_dialinfo && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { - let filter = DialInfoFilter::global() + let filter = DialInfoFilter::all() .with_protocol_type(ProtocolType::TCP) .with_address_type(AddressType::IPV4); let need_tcpv4_dialinfo = routing_table - .first_filtered_dial_info_detail(&filter) + .first_public_filtered_dial_info_detail(&filter) .is_none(); if need_tcpv4_dialinfo { // If we have no public TCPv4 dialinfo, then we need to run a NAT check @@ -579,6 +562,24 @@ impl Network { } } + // Same but for WSv4 + if protocol_config.inbound.ws + && !ws_static_public_dialinfo + && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) + { + let filter = DialInfoFilter::all() + .with_protocol_type(ProtocolType::WS) + .with_address_type(AddressType::IPV4); + let need_wsv4_dialinfo = routing_table + .first_public_filtered_dial_info_detail(&filter) + .is_none(); + if need_wsv4_dialinfo { + // If we have no public TCPv4 dialinfo, then we need to run a NAT check + // ensure the singlefuture is running for this + self.unlocked_inner.update_wsv4_dialinfo_task.tick().await?; + } + } + Ok(()) } } diff --git a/veilid-core/src/intf/native/network/network_tcp.rs b/veilid-core/src/intf/native/network/network_tcp.rs index 82cbadd8..50f47538 100644 --- a/veilid-core/src/intf/native/network/network_tcp.rs +++ b/veilid-core/src/intf/native/network/network_tcp.rs @@ -227,7 +227,7 @@ impl Network { for ip_addr in ip_addrs { let addr = SocketAddr::new(ip_addr, port); - let ldi_addrs = Self::translate_unspecified_address(&*(self.inner.lock()), &addr); + let idi_addrs = Self::translate_unspecified_address(&*(self.inner.lock()), &addr); // see if we've already bound to this already // if not, spawn a listener @@ -262,9 +262,9 @@ impl Network { )); } - // Return local dial infos we listen on - for ldi_addr in ldi_addrs { - out.push(SocketAddress::from_socket_addr(ldi_addr)); + // Return interface dial infos we listen on + for idi_addr in idi_addrs { + out.push(SocketAddress::from_socket_addr(idi_addr)); } } diff --git a/veilid-core/src/intf/native/network/network_udp.rs b/veilid-core/src/intf/native/network/network_udp.rs index a7213b49..33c405a0 100644 --- a/veilid-core/src/intf/native/network/network_udp.rs +++ b/veilid-core/src/intf/native/network/network_udp.rs @@ -173,13 +173,13 @@ impl Network { .inbound_udp_protocol_handlers .contains_key(&addr) { - let ldi_addrs = Self::translate_unspecified_address(&*self.inner.lock(), &addr); + let idi_addrs = Self::translate_unspecified_address(&*self.inner.lock(), &addr); self.clone().create_udp_inbound_socket(addr).await?; - // Return local dial infos we listen on - for ldi_addr in ldi_addrs { - out.push(DialInfo::udp_from_socketaddr(ldi_addr)); + // Return interface dial infos we listen on + for idi_addr in idi_addrs { + out.push(DialInfo::udp_from_socketaddr(idi_addr)); } } } diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index 58d9ae01..d473f4ef 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -60,9 +60,15 @@ impl Network { .with_protocol_type(protocol_type) .with_address_type(address_type); routing_table - .all_filtered_dial_info_details(&filter) + .interface_dial_info_details() .iter() - .map(|did| did.dial_info.socket_address()) + .filter_map(|did| { + if did.dial_info.matches_filter(&filter) { + Some(did.dial_info.socket_address()) + } else { + None + } + }) .collect() } @@ -130,7 +136,7 @@ impl Network { .await { // Add public dial info with Server network class - routing_table.register_dial_info( + routing_table.register_public_dial_info( external1_dial_info, DialInfoOrigin::Discovered, Some(NetworkClass::Server), @@ -151,7 +157,7 @@ impl Network { { // Got a port mapping, let's use it let external_mapped_dial_info = DialInfo::udp(external_mapped); - routing_table.register_dial_info( + routing_table.register_public_dial_info( external_mapped_dial_info, DialInfoOrigin::Mapped, Some(NetworkClass::Mapped), @@ -174,7 +180,7 @@ impl Network { { // Yes, another machine can use the dial info directly, so Full Cone // Add public dial info with full cone NAT network class - routing_table.register_dial_info( + routing_table.register_public_dial_info( external1_dial_info, DialInfoOrigin::Discovered, Some(NetworkClass::FullConeNAT), @@ -224,14 +230,14 @@ impl Network { .await { // Got a reply from a non-default port, which means we're only address restricted - routing_table.register_dial_info( + routing_table.register_public_dial_info( external1_dial_info, DialInfoOrigin::Discovered, Some(NetworkClass::AddressRestrictedNAT), ); } else { // Didn't get a reply from a non-default port, which means we are also port restricted - routing_table.register_dial_info( + routing_table.register_public_dial_info( external1_dial_info, DialInfoOrigin::Discovered, Some(NetworkClass::PortRestrictedNAT), @@ -258,4 +264,11 @@ impl Network { //Err("unimplemented".to_owned()) Ok(()) } + + pub async fn update_wsv4_dialinfo_task_routine(self, _l: u64, _t: u64) -> Result<(), String> { + log_net!("looking for wsv4 public dial info"); + // xxx + //Err("unimplemented".to_owned()) + Ok(()) + } } diff --git a/veilid-core/src/intf/native/network/start_protocols.rs b/veilid-core/src/intf/native/network/start_protocols.rs index 9ab710c8..0a13a8c2 100644 --- a/veilid-core/src/intf/native/network/start_protocols.rs +++ b/veilid-core/src/intf/native/network/start_protocols.rs @@ -287,22 +287,23 @@ impl Network { "UDP: starting listeners on port {} at {:?}", udp_port, ip_addrs ); - let dial_infos = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; + let dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?; let mut static_public = false; - for di in &dial_infos { - // Register local dial info only here if we specify a public address + for di in &dial_info_list { + // If the local interface address is global, + // register global dial info if no public address is specified if public_address.is_none() && di.is_global() { - // Register global dial info if no public address is specified - routing_table.register_dial_info( + routing_table.register_public_dial_info( di.clone(), DialInfoOrigin::Static, Some(NetworkClass::Server), ); + static_public = true; - } else if di.is_local() { - // Register local dial info - routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None); } + + // Register interface dial info as well since the address is on the local interface + routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); } // Add static public dialinfo if it's configured @@ -315,7 +316,7 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_dial_info( + routing_table.register_public_dial_info( DialInfo::udp_from_socketaddr(pdi_addr), DialInfoOrigin::Static, Some(NetworkClass::Server), @@ -375,22 +376,21 @@ impl Network { let di = DialInfo::try_ws(socket_address, global_url) .map_err(map_to_string) .map_err(logthru_net!(error))?; - routing_table.register_dial_info( + routing_table.register_public_dial_info( di, DialInfoOrigin::Static, Some(NetworkClass::Server), ); static_public = true; - } else if socket_address.address().is_local() { - // Build local dial info request url - let local_url = format!("ws://{}/{}", socket_address, path); - - // Create local dial info - let di = DialInfo::try_ws(socket_address, local_url) - .map_err(map_to_string) - .map_err(logthru_net!(error))?; - routing_table.register_dial_info(di, DialInfoOrigin::Static, None); } + // Build interface dial info request url + let interface_url = format!("ws://{}/{}", socket_address, path); + + // Create interface dial info + let di = DialInfo::try_ws(socket_address, interface_url) + .map_err(map_to_string) + .map_err(logthru_net!(error))?; + routing_table.register_interface_dial_info(di, DialInfoOrigin::Static); } // Add static public dialinfo if it's configured @@ -410,7 +410,7 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_dial_info( + routing_table.register_public_dial_info( DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone()) .map_err(map_to_string) .map_err(logthru_net!(error))?, @@ -460,7 +460,7 @@ impl Network { .await?; trace!("WSS: listener started"); - // NOTE: No local dial info for WSS, as there is no way to connect to a local dialinfo via TLS + // NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS // If the hostname is specified, it is the public dialinfo via the URL. If no hostname // is specified, then TLS won't validate, so no local dialinfo is possible. // This is not the case with unencrypted websockets, which can be specified solely by an IP address @@ -483,7 +483,7 @@ impl Network { .map_err(logthru_net!(error))?; for gsa in global_socket_addrs { - routing_table.register_dial_info( + routing_table.register_public_dial_info( DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone()) .map_err(map_to_string) .map_err(logthru_net!(error))?, @@ -537,19 +537,17 @@ impl Network { for socket_address in socket_addresses { let di = DialInfo::tcp(socket_address); - // Register local dial info only here if we specify a public address + // Register global dial info if no public address is specified if public_address.is_none() && di.is_global() { - // Register global dial info if no public address is specified - routing_table.register_dial_info( + routing_table.register_public_dial_info( di.clone(), DialInfoOrigin::Static, Some(NetworkClass::Server), ); static_public = true; - } else if di.is_local() { - // Register local dial info - routing_table.register_dial_info(di.clone(), DialInfoOrigin::Static, None); } + // Register interface dial info + routing_table.register_interface_dial_info(di.clone(), DialInfoOrigin::Static); } // Add static public dialinfo if it's configured @@ -562,7 +560,7 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_dial_info( + routing_table.register_public_dial_info( DialInfo::tcp_from_socketaddr(pdi_addr), DialInfoOrigin::Static, None, diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index ee79e9a1..7f7a20ea 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -75,7 +75,7 @@ impl Network { res } - async fn send_data_to_existing_connection( + pub async fn send_data_to_existing_connection( &self, descriptor: ConnectionDescriptor, data: Vec, @@ -137,31 +137,6 @@ impl Network { res } - pub async fn send_data(&self, node_ref: NodeRef, data: Vec) -> Result<(), String> { - // First try to send data to the last socket we've seen this peer on - let data = if let Some(descriptor) = node_ref.last_connection() { - match self - .clone() - .send_data_to_existing_connection(descriptor, data) - .await? - { - None => { - return Ok(()); - } - Some(d) => d, - } - } else { - data - }; - - // If that fails, try to make a connection or reach out to the peer via its dial info - let dial_info = node_ref - .best_dial_info() - .ok_or_else(|| "couldn't send data, no dial info or peer address".to_owned())?; - - self.send_data_to_dial_info(dial_info, data).await - } - ///////////////////////////////////////////////////////////////// pub async fn startup(&self) -> Result<(), String> { diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs index f30154eb..49f2f31d 100644 --- a/veilid-core/src/network_manager.rs +++ b/veilid-core/src/network_manager.rs @@ -17,32 +17,8 @@ pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes #[derive(Copy, Clone, Debug, Default)] pub struct ProtocolConfig { - pub udp_enabled: bool, - pub tcp_connect: bool, - pub tcp_listen: bool, - pub ws_connect: bool, - pub ws_listen: bool, - pub wss_connect: bool, - pub wss_listen: bool, -} - -impl ProtocolConfig { - pub fn is_protocol_type_connect_enabled(&self, protocol_type: ProtocolType) -> bool { - match protocol_type { - ProtocolType::UDP => self.udp_enabled, - ProtocolType::TCP => self.tcp_connect, - ProtocolType::WS => self.ws_connect, - ProtocolType::WSS => self.wss_connect, - } - } - pub fn is_protocol_type_listen_enabled(&self, protocol_type: ProtocolType) -> bool { - match protocol_type { - ProtocolType::UDP => self.udp_enabled, - ProtocolType::TCP => self.tcp_listen, - ProtocolType::WS => self.ws_listen, - ProtocolType::WSS => self.wss_listen, - } - } + pub outbound: ProtocolSet, + pub inbound: ProtocolSet, } // Things we get when we start up and go away when we shut down @@ -91,6 +67,15 @@ impl Default for NetworkManagerStats { struct ClientWhitelistEntry { last_seen: u64, } + +// Mechanism required to contact another node +enum InboundMethod { + Direct, // Contact the node directly + SignalReverse, // Request via signal the node connect back directly + SignalHolePunch, // Request via signal the node negotiate a hole punch + Relay, // Must use a third party relay to reach the node +} + // The mutable state of the network manager struct NetworkManagerInner { routing_table: Option, @@ -476,12 +461,20 @@ impl NetworkManager { } // Called by the RPC handler when we want to issue an RPC request or response + // node_ref is the direct destination to which the envelope will be sent + // If 'node_id' is specified, it can be different than node_ref.node_id() + // which will cause the envelope to be relayed pub async fn send_envelope>( &self, node_ref: NodeRef, + node_id: Option, body: B, ) -> Result<(), String> { - log_net!("sending envelope to {:?}", node_ref); + if let Some(node_id) = node_id { + log_net!("sending envelope to {:?} via {:?}", node_id, node_ref); + } else { + log_net!("sending envelope to {:?}", node_ref); + } // Get node's min/max version and see if we can send to it // and if so, get the max version we can use let version = if let Some((node_min, node_max)) = node_ref.operate(|e| e.min_max_version()) @@ -503,11 +496,11 @@ impl NetworkManager { // Build the envelope to send let out = self - .build_envelope(node_ref.node_id(), version, body) + .build_envelope(node_id.unwrap_or_else(|| node_ref.node_id()), version, body) .map_err(logthru_rpc!(error))?; - // Send via relay if we have to - self.net().send_data(node_ref, out).await + // Send the envelope via whatever means necessary + self.send_data(node_ref, out).await } // Called by the RPC handler when we want to issue an direct receipt @@ -533,6 +526,203 @@ impl NetworkManager { } } + // Figure out how to reach a node + // Node info here must be the filtered kind, with only + fn get_inbound_method(&self, node_info: &NodeInfo) -> Result { + // Get our network class + let network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid); + + // If we don't have a network class yet (no public dial info or haven't finished detection) + // then we just need to try to send to the best direct dial info because we won't + // know how to use relays effectively yet + if matches!(network_class, NetworkClass::Invalid) { + return Ok(InboundMethod::Direct); + } + + // Get the protocol of the best matching direct dial info + let protocol_type = node_info.dial_info_list.first().map(|d| d.protocol_type()); + + // Can the target node do inbound? + if node_info.network_class.inbound_capable() { + // Do we need to signal before going inbound? + if node_info.network_class.inbound_requires_signal() { + // Can we receive a direct reverse connection? + if network_class.inbound_capable() && !network_class.inbound_requires_signal() { + Ok(InboundMethod::SignalReverse) + } + // Is this a hole-punch capable protocol? + else if protocol_type == Some(ProtocolType::UDP) { + Ok(InboundMethod::SignalHolePunch) + } + // Otherwise we have to relay + else { + Ok(InboundMethod::Relay) + } + } + // Can go direct + else { + Ok(InboundMethod::Direct) + } + // If the other node is not inbound capable at all, it requires a relay + } else { + Ok(InboundMethod::Relay) + } + } + + // Send a reverse connection signal and wait for the return receipt over it + // Then send the data across the new connection + pub async fn do_reverse_connect( + &self, + best_node_info: &NodeInfo, + data: Vec, + ) -> Result<(), String> { + + // Get relay to signal from + let relay_nr = if let Some(rpi) = best_node_info.relay_peer_info { + // Get the noderef for this inbound relay + self.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?; + } else { + // If we don't have a relay dial info that matches our protocol configuration + // then we can't send to this node! + return Err("Can't send to this relay".to_owned()) + } + + + // Get the receipt timeout + let receipt_time = ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms); + + // Build a return receipt for the signal + let (rcpt_data, eventual_value) = self + .generate_single_shot_receipt(receipt_time, []) + .map_err(map_error_string!())?; + + // Issue the signal + let rpc = self.rpc_processor(); + rpc.rpc_call_signal(dest, ) + + // Wait for the return receipt + match eventual_value.await { + ReceiptEvent::Returned => (), + ReceiptEvent::Expired => { + return Err("receipt was dropped before expiration".to_owned()); + } + ReceiptEvent::Cancelled => { + return Err("receipt was dropped before expiration".to_owned()); + } + }; + + // And now use the existing connection to send over + if let Some(descriptor) = node_ref.last_connection() { + match self + .net() + .send_data_to_existing_connection(descriptor, data) + .await + .map_err(logthru_net!())? + { + None => { + return Ok(()); + } + Some(d) => d, + } + } + Ok(()) + } + + // Send a hole punch signal and do a negotiating ping and wait for the return receipt + // Then send the data across the new connection + pub async fn do_hole_punch(&self, best_node_info: &NodeInfo, data: Vec) -> Result<(), String> { + if let Some(relay_dial_info) = node_info.relay_dial_info_list.first() { + self.net() + .do_hole_punch(relay_dial_info.clone(), data) + .await + .map_err(logthru_net!()) + } else { + // If we don't have a relay dial info that matches our protocol configuration + // then we can't send to this node! + Err("Can't send to this node yet".to_owned()) + } + } + + // Send raw data to a node + // + // We may not have dial info for a node, but have an existing connection for it + // because an inbound connection happened first, and no FindNodeQ has happened to that + // node yet to discover its dial info. The existing connection should be tried first + // in this case. + // + // Sending to a node requires determining a NetworkClass compatible mechanism + // + pub fn send_data(&self, node_ref: NodeRef, data: Vec) -> SystemPinBoxFuture> { + let this = self.clone(); + Box::pin(async move { + // First try to send data to the last socket we've seen this peer on + let data = if let Some(descriptor) = node_ref.last_connection() { + match this + .net() + .send_data_to_existing_connection(descriptor, data) + .await + .map_err(logthru_net!())? + { + None => { + return Ok(()); + } + Some(d) => d, + } + } else { + data + }; + + // If we don't have last_connection, try to reach out to the peer via its dial info + let best_node_info = match node_ref + .best_node_info() { + Some(ni) => ni, + None => { + // If neither this node nor its relays would never ever be + // reachable by any of our protocols + // then we need to go through the outbound relay + if let Some(relay_node) = this.relay_node() { + // We have an outbound relay, lets use it + return this.send_data(relay_node, data).await; + } + else { + // We have no way to reach the node nor an outbound relay to use + return Err("Can't reach this node".to_owned()); + } + } + }; + + // If we aren't using an outbound relay to reach this node, what inbound method do we use? + match this.get_inbound_method(&best_node_info)? { + InboundMethod::Direct => { + if let Some(dial_info) = best_node_info.dial_info_list.first() { + this.net() + .send_data_to_dial_info(dial_info.clone(), data) + .await + .map_err(logthru_net!()) + } else { + // If we don't have a direct dial info that matches our protocol configuration + // then we can't send to this node! + Err("Can't send to this node yet".to_owned()) + } + } + InboundMethod::SignalReverse => this.do_reverse_connect(&best_node_info, data).await, + InboundMethod::SignalHolePunch => this.do_hole_punch(&best_node_info, data).await, + InboundMethod::Relay => { + if let Some(rpi) = best_node_info.relay_peer_info { + // Get the noderef for this inbound relay + let inbound_relay_noderef = this.routing_table().register_node_with_node_info(rpi.node_id.key, rpi.node_info)?; + // Send to the inbound relay + this.send_data(inbound_relay_noderef, data).await + } else { + // If we don't have a relay dial info that matches our protocol configuration + // then we can't send to this node! + Err("Can't send to this relay".to_owned()) + } + } + } + }) + } + // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler @@ -608,14 +798,7 @@ impl NetworkManager { // nodes are allowed to do this, for example PWA users let relay_nr = if self.check_client_whitelist(sender_id) { - // Cache the envelope information in the routing table - // let source_noderef = routing_table - // .register_node_with_existing_connection(envelope.get_sender_id(), descriptor, ts) - // .map_err(|e| format!("node id registration failed: {}", e))?; - // source_noderef.operate(|e| e.set_min_max_version(envelope.get_min_max_version())); - - // If the sender is in the client whitelist, allow a full resolve_node, - // which effectively lets the client use our routing table + // Full relay allowed, do a full resolve_node rpc.resolve_node(recipient_id).await.map_err(|e| { format!( "failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}", @@ -630,27 +813,16 @@ impl NetworkManager { // We should, because relays are chosen by nodes that have established connectivity and // should be mutually in each others routing tables. The node needing the relay will be // pinging this node regularly to keep itself in the routing table - if let Some(nr) = routing_table.lookup_node_ref(recipient_id) { - // ensure we have dial_info for the entry already, - if !nr.operate(|e| e.dial_infos().is_empty()) { - nr - } else { - return Err(format!( - "Inbound relay asked for recipient with no dial info: {}", - recipient_id - )); - } - } else { - return Err(format!( + routing_table.lookup_node_ref(recipient_id).ok_or_else(|| { + format!( "Inbound relay asked for recipient not in routing table: {}", recipient_id - )); - } + ) + })? }; - // Re-send the packet to the leased node - self.net() - .send_data(relay_nr, data.to_vec()) + // Relay the packet to the desired destination + self.send_data(relay_nr, data.to_vec()) .await .map_err(|e| format!("failed to forward envelope: {}", e))?; // Inform caller that we dealt with the envelope, but did not process it locally @@ -683,7 +855,7 @@ impl NetworkManager { } // Keep relays assigned and accessible - async fn relay_management_task_routine(self, last_ts: u64, cur_ts: u64) -> Result<(), String> { + async fn relay_management_task_routine(self, _last_ts: u64, cur_ts: u64) -> Result<(), String> { log_net!("--- network manager relay_management task"); // Get our node's current network class and do the right thing @@ -712,9 +884,9 @@ impl NetworkManager { let mut inner = self.inner.lock(); // Register new outbound relay - let nr = routing_table.register_node_with_dial_info( + let nr = routing_table.register_node_with_node_info( outbound_relay_peerinfo.node_id.key, - &outbound_relay_peerinfo.dial_infos, + outbound_relay_peerinfo.node_info, )?; inner.relay_node = Some(nr); } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 65e6cc12..ede93f87 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -35,9 +35,10 @@ pub enum BucketEntryState { pub struct BucketEntry { pub(super) ref_count: u32, min_max_version: Option<(u8, u8)>, - seen_our_dial_info: bool, + seen_our_node_info: bool, last_connection: Option<(ConnectionDescriptor, u64)>, node_info: NodeInfo, + local_node_info: LocalNodeInfo, peer_stats: PeerStats, latency_stats_accounting: LatencyStatsAccounting, transfer_stats_accounting: TransferStatsAccounting, @@ -49,9 +50,10 @@ impl BucketEntry { Self { ref_count: 0, min_max_version: None, - seen_our_dial_info: false, + seen_our_node_info: false, last_connection: None, node_info: NodeInfo::default(), + local_node_info: LocalNodeInfo::default(), latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), peer_stats: PeerStats { @@ -108,34 +110,20 @@ impl BucketEntry { pub fn update_node_info(&mut self, node_info: NodeInfo) { self.node_info = node_info } + pub fn update_local_node_info(&mut self, local_node_info: LocalNodeInfo) { + self.local_node_info = local_node_info + } pub fn node_info(&self) -> &NodeInfo { &self.node_info } - - pub fn first_filtered_node_info(&self, filter: F) -> Option - where - F: Fn(&DialInfo) -> bool, - { - let out = self.node_info.first_filtered(filter); - if out.dial_infos.is_empty() && out.relay_dial_infos.is_empty() { - None - } else { - Some(out) - } + pub fn local_node_info(&self) -> &LocalNodeInfo { + &self.local_node_info } - - pub fn all_filtered_node_info(&self, filter: F) -> NodeInfo - where - F: Fn(&DialInfo) -> bool, - { - self.node_info.all_filtered(filter) - } - - pub fn get_peer_info(&self, key: DHTKey, scope: PeerScope) -> PeerInfo { + pub fn peer_info(&self, key: DHTKey) -> PeerInfo { PeerInfo { node_id: NodeId::new(key), - node_info: self.all_filtered_node_info(|di| di.matches_peer_scope(scope)), + node_info: self.node_info.clone(), } } @@ -173,12 +161,12 @@ impl BucketEntry { self.peer_stats.status = Some(status); } - pub fn set_seen_our_dial_info(&mut self, seen: bool) { - self.seen_our_dial_info = seen; + pub fn set_seen_our_node_info(&mut self, seen: bool) { + self.seen_our_node_info = seen; } - pub fn has_seen_our_dial_info(&self) -> bool { - self.seen_our_dial_info + pub fn has_seen_our_node_info(&self) -> bool { + self.seen_our_node_info } ///// stats methods diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 67c3f598..4f82e2ba 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -23,15 +23,15 @@ impl RoutingTable { out } pub fn debug_info_dialinfo(&self) -> String { - let ldis = self.local_dial_info_details(); - let gdis = self.global_dial_info_details(); + let ldis = self.interface_dial_info_details(); + let gdis = self.public_dial_info_details(); let mut out = String::new(); - out += "Local Dial Info Details:\n"; + out += "Interface Dial Info Details:\n"; for (n, ldi) in ldis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, ldi); } - out += "Global Dial Info Details:\n"; + out += "Public Dial Info Details:\n"; for (n, gdi) in gdis.iter().enumerate() { out += &format!(" {:>2}: {:?}\n", n, gdi); } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 71c5e483..6ddbce1e 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -12,7 +12,6 @@ impl RoutingTable { // Returns noderefs are are scoped to that address type only pub fn find_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec { let dial_info_filter1 = dial_info_filter.clone(); - let dial_info_filter2 = dial_info_filter.clone(); self.find_fastest_nodes( // filter Some(Box::new( @@ -21,50 +20,53 @@ impl RoutingTable { .1 .as_ref() .unwrap() - .first_filtered_node_info(|di| di.matches_filter(&dial_info_filter1)) + .node_info() + .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter1)) .is_some() }, )), // transform - |e| { - NodeRef::new_filtered( - self.clone(), - *e.0, - e.1.as_mut().unwrap(), - dial_info_filter2.clone(), - ) - }, + |e| NodeRef::new(self.clone(), *e.0, e.1.as_mut().unwrap()), ) } - pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo { - let filter = DialInfoFilter::scoped(scope); + pub fn get_own_peer_info(&self) -> PeerInfo { let netman = self.network_manager(); + let enable_local_peer_scope = netman.config().get().network.enable_local_peer_scope; let relay_node = netman.relay_node(); PeerInfo { node_id: NodeId::new(self.node_id()), node_info: NodeInfo { network_class: netman.get_network_class().unwrap_or(NetworkClass::Invalid), - dial_infos: self - .all_filtered_dial_info_details(&filter) - .iter() - .map(|did| did.dial_info.clone()) - .collect(), - relay_dial_infos: relay_node - .map(|rn| rn.node_info().dial_infos) - .unwrap_or_default(), + outbound_protocols: netman.get_protocol_config().unwrap_or_default().outbound, + dial_info_list: if !enable_local_peer_scope { + self.public_dial_info_details() + .iter() + .map(|did| did.dial_info.clone()) + .collect() + } else { + self.public_dial_info_details() + .iter() + .map(|did| did.dial_info.clone()) + .chain( + self.interface_dial_info_details() + .iter() + .map(|did| did.dial_info.clone()), + ) + .collect() + }, + relay_peer_info: relay_node.map(|rn| Box::new(rn.peer_info())), }, } } pub fn transform_to_peer_info( kv: &mut (&DHTKey, Option<&mut BucketEntry>), - scope: PeerScope, own_peer_info: &PeerInfo, ) -> PeerInfo { match &kv.1 { None => own_peer_info.clone(), - Some(entry) => entry.get_peer_info(*kv.0, scope), + Some(entry) => entry.peer_info(*kv.0), } } diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 44cd29e6..f588602e 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -48,7 +48,8 @@ struct RoutingTableInner { node_id: DHTKey, node_id_secret: DHTKeySecret, buckets: Vec, - dial_info_details: Vec, + public_dial_info_details: Vec, + interface_dial_info_details: Vec, bucket_entry_count: usize, // Waiters @@ -89,7 +90,8 @@ impl RoutingTable { node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), - dial_info_details: Vec::new(), + public_dial_info_details: Vec::new(), + interface_dial_info_details: Vec::new(), bucket_entry_count: 0, eventual_changed_dial_info: Eventual::new(), self_latency_stats_accounting: LatencyStatsAccounting::new(), @@ -163,40 +165,42 @@ impl RoutingTable { self.inner.lock().node_id_secret } - pub fn has_local_dial_info(&self) -> bool { - self.first_filtered_dial_info_detail(&DialInfoFilter::local()) - .is_some() + pub fn has_interface_dial_info(&self) -> bool { + !self.inner.lock().interface_dial_info_details.is_empty() } - pub fn has_global_dial_info(&self) -> bool { - self.first_filtered_dial_info_detail(&DialInfoFilter::global()) - .is_some() + pub fn has_public_dial_info(&self) -> bool { + !self.inner.lock().public_dial_info_details.is_empty() } - pub fn global_dial_info_details(&self) -> Vec { - self.all_filtered_dial_info_details(&DialInfoFilter::global()) + pub fn public_dial_info_details(&self) -> Vec { + self.inner.lock().public_dial_info_details.clone() } - pub fn local_dial_info_details(&self) -> Vec { - self.all_filtered_dial_info_details(&DialInfoFilter::local()) + pub fn interface_dial_info_details(&self) -> Vec { + self.inner.lock().interface_dial_info_details.clone() } - pub fn first_filtered_dial_info_detail( + pub fn first_public_filtered_dial_info_detail( &self, filter: &DialInfoFilter, ) -> Option { let inner = self.inner.lock(); - for did in &inner.dial_info_details { + for did in &inner.public_dial_info_details { if did.matches_filter(filter) { return Some(did.clone()); } } None } - pub fn all_filtered_dial_info_details(&self, filter: &DialInfoFilter) -> Vec { + + pub fn all_public_filtered_dial_info_details( + &self, + filter: &DialInfoFilter, + ) -> Vec { let inner = self.inner.lock(); let mut ret = Vec::new(); - for did in &inner.dial_info_details { + for did in &inner.public_dial_info_details { if did.matches_filter(filter) { ret.push(did.clone()); } @@ -204,16 +208,48 @@ impl RoutingTable { ret } - pub fn register_dial_info( + pub fn first_interface_filtered_dial_info_detail( + &self, + filter: &DialInfoFilter, + ) -> Option { + let inner = self.inner.lock(); + for did in &inner.interface_dial_info_details { + if did.matches_filter(filter) { + return Some(did.clone()); + } + } + None + } + + pub fn all_interface_filtered_dial_info_details( + &self, + filter: &DialInfoFilter, + ) -> Vec { + let inner = self.inner.lock(); + let mut ret = Vec::new(); + for did in &inner.interface_dial_info_details { + if did.matches_filter(filter) { + ret.push(did.clone()); + } + } + ret + } + + pub fn register_public_dial_info( &self, dial_info: DialInfo, origin: DialInfoOrigin, network_class: Option, ) { let timestamp = get_timestamp(); + let enable_local_peer_scope = { + let c = self.network_manager().config().get(); + c.network.enable_local_peer_scope + }; + let mut inner = self.inner.lock(); - inner.dial_info_details.push(DialInfoDetail { + inner.public_dial_info_details.push(DialInfoDetail { dial_info: dial_info.clone(), origin, network_class, @@ -222,18 +258,11 @@ impl RoutingTable { // Re-sort dial info to endure preference ordering inner - .dial_info_details + .public_dial_info_details .sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); info!( - "{}Dial Info: {}", - if dial_info.is_local() { - "Local " - } else if dial_info.is_global() { - "Global " - } else { - "Other " - }, + "Public Dial Info: {}", NodeDialInfo { node_id: NodeId::new(inner.node_id), dial_info @@ -246,9 +275,44 @@ impl RoutingTable { Self::trigger_changed_dial_info(&mut *inner); } + pub fn register_interface_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { + let timestamp = get_timestamp(); + let enable_local_peer_scope = { + let c = self.network_manager().config().get(); + c.network.enable_local_peer_scope + }; + + let mut inner = self.inner.lock(); + + inner.interface_dial_info_details.push(DialInfoDetail { + dial_info: dial_info.clone(), + origin, + network_class: None, + timestamp, + }); + + // Re-sort dial info to endure preference ordering + inner + .interface_dial_info_details + .sort_by(|a, b| a.dial_info.cmp(&b.dial_info)); + + info!( + "Interface Dial Info: {}", + NodeDialInfo { + node_id: NodeId::new(inner.node_id), + dial_info + } + .to_string(), + ); + debug!(" Origin: {:?}", origin); + + Self::trigger_changed_dial_info(&mut *inner); + } + pub fn clear_dial_info_details(&self) { let mut inner = self.inner.lock(); - inner.dial_info_details.clear(); + inner.public_dial_info_details.clear(); + inner.interface_dial_info_details.clear(); Self::trigger_changed_dial_info(&mut *inner); } @@ -262,10 +326,10 @@ impl RoutingTable { } fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { - // Clear 'seen dial info' bits on routing table entries so we know to ping them + // Clear 'seen node info' bits on routing table entries so we know to ping them for b in &mut inner.buckets { for e in b.entries_mut() { - e.1.set_seen_our_dial_info(false); + e.1.set_seen_our_node_info(false); } } // @@ -608,9 +672,10 @@ impl RoutingTable { .register_node_with_node_info( k, NodeInfo { - network_class: NetworkClass::Server, - dial_infos: v, - relay_dial_infos: Default::default(), + network_class: NetworkClass::Server, // Bootstraps are always full servers + outbound_protocols: ProtocolSet::default(), // Bootstraps do not participate in relaying and will not make outbound requests + dial_info_list: v, // Dial info is as specified in the bootstrap list + relay_peer_info: None, // Bootstraps never require a relay themselves }, ) .map_err(logthru_rtab!("Couldn't add bootstrap node: {}", k))?; diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 554972fb..8f92aebd 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -5,7 +5,6 @@ use alloc::fmt; pub struct NodeRef { routing_table: RoutingTable, node_id: DHTKey, - dial_info_filter: DialInfoFilter, } impl NodeRef { @@ -14,20 +13,6 @@ impl NodeRef { Self { routing_table, node_id: key, - dial_info_filter: DialInfoFilter::default(), - } - } - pub fn new_filtered( - routing_table: RoutingTable, - key: DHTKey, - entry: &mut BucketEntry, - dial_info_filter: DialInfoFilter, - ) -> Self { - entry.ref_count += 1; - Self { - routing_table, - node_id: key, - dial_info_filter, } } @@ -35,10 +20,6 @@ impl NodeRef { self.node_id } - pub fn dial_info_filter(&self) -> &DialInfoFilter { - &self.dial_info_filter - } - pub fn operate(&self, f: F) -> T where F: FnOnce(&mut BucketEntry) -> T, @@ -46,52 +27,23 @@ impl NodeRef { self.routing_table.operate_on_bucket_entry(self.node_id, f) } + pub fn peer_info(&self) -> PeerInfo { + self.operate(|e| e.peer_info(self.node_id())) + } pub fn node_info(&self) -> NodeInfo { self.operate(|e| e.node_info().clone()) } - - pub fn has_dial_info(&self) -> bool { - self.operate(|e| !e.node_info().dial_infos.is_empty()) + pub fn local_node_info(&self) -> LocalNodeInfo { + self.operate(|e| e.local_node_info().clone()) } - - // Returns if this node has seen and acknowledged our node's dial info yet - pub fn has_seen_our_dial_info(&self) -> bool { - self.operate(|e| e.has_seen_our_dial_info()) + pub fn has_seen_our_node_info(&self) -> bool { + self.operate(|e| e.has_seen_our_node_info()) } - pub fn set_seen_our_dial_info(&self) { - self.operate(|e| e.set_seen_our_dial_info(true)); - } - - // Returns the best node info to attempt a connection to this node - pub fn best_node_info(&self) -> Option { - let nm = self.routing_table.network_manager(); - let protocol_config = nm.get_protocol_config()?; - self.operate(|e| { - e.first_filtered_node_info(|di| { - // Does it match the dial info filter - if !di.matches_filter(&self.dial_info_filter) { - return false; - } - // Filter out dial infos that don't match our protocol config - // for outbound connections. This routine filters on 'connect' settings - // to ensure we connect using only the protocols we have enabled. - protocol_config.is_protocol_type_connect_enabled(di.protocol_type()) - }) - }) + pub fn set_seen_our_node_info(&self) { + self.operate(|e| e.set_seen_our_node_info(true)); } pub fn last_connection(&self) -> Option { - match self.operate(|e| e.last_connection()) { - None => None, - Some(c) => { - if !c.matches_filter(&self.dial_info_filter) { - return None; - } - // We don't filter this out by protocol config because if a connection - // succeeded, it's allowed to persist and be used for communication - // regardless of any other configuration - Some(c) - } - } + self.operate(|e| e.last_connection()) } } @@ -103,18 +55,13 @@ impl Clone for NodeRef { Self { routing_table: self.routing_table.clone(), node_id: self.node_id, - dial_info_filter: self.dial_info_filter.clone(), } } } impl fmt::Debug for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut out = self.node_id.encode(); - if !self.dial_info_filter.is_empty() { - out += &format!("{:?}", self.dial_info_filter); - } - write!(f, "{}", out) + write!(f, "{}", self.node_id.encode()) } } diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index f9594371..fc4968a6 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -7,8 +7,10 @@ mod node_status; mod nonce; mod peer_info; mod private_safety_route; +mod protocol_set; mod public_key; mod sender_info; +mod signal_info; mod socket_address; pub use address::*; @@ -20,6 +22,8 @@ pub use node_status::*; pub use nonce::*; pub use peer_info::*; pub use private_safety_route::*; +pub use protocol_set::*; pub use public_key::*; pub use sender_info::*; +pub use signal_info::*; pub use socket_address::*; diff --git a/veilid-core/src/rpc_processor/coders/node_info.rs b/veilid-core/src/rpc_processor/coders/node_info.rs index e88690b3..686cc1d2 100644 --- a/veilid-core/src/rpc_processor/coders/node_info.rs +++ b/veilid-core/src/rpc_processor/coders/node_info.rs @@ -7,38 +7,34 @@ pub fn encode_node_info( ) -> Result<(), RPCError> { builder.set_network_class(encode_network_class(node_info.network_class)); + let mut ps_builder = builder.reborrow().init_outbound_protocols(); + encode_protocol_set(&node_info.outbound_protocols, &mut ps_builder)?; + let mut dil_builder = builder.reborrow().init_dial_info_list( node_info - .dial_infos + .dial_info_list .len() .try_into() .map_err(map_error_protocol!("too many dial infos in node info"))?, ); - for idx in 0..node_info.dial_infos.len() { + for idx in 0..node_info.dial_info_list.len() { let mut di_builder = dil_builder.reborrow().get(idx as u32); - encode_dial_info(&node_info.dial_infos[idx], &mut di_builder)?; + encode_dial_info(&node_info.dial_info_list[idx], &mut di_builder)?; } - let mut rdil_builder = builder.reborrow().init_relay_dial_info_list( - node_info - .relay_dial_infos - .len() - .try_into() - .map_err(map_error_protocol!( - "too many relay dial infos in node info" - ))?, - ); - - for idx in 0..node_info.relay_dial_infos.len() { - let mut rdi_builder = rdil_builder.reborrow().get(idx as u32); - encode_dial_info(&node_info.relay_dial_infos[idx], &mut rdi_builder)?; + if let Some(rpi) = node_info.relay_peer_info { + let mut rpi_builder = builder.reborrow().init_relay_peer_info(); + encode_peer_info(&rpi, &mut rpi_builder)?; } Ok(()) } -pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result { +pub fn decode_node_info( + reader: &veilid_capnp::node_info::Reader, + allow_relay_peer_info: bool, +) -> Result { let network_class = decode_network_class( reader .reborrow() @@ -46,37 +42,47 @@ pub fn decode_node_info(reader: &veilid_capnp::node_info::Reader) -> Result::with_capacity( + let mut dial_info_list = Vec::::with_capacity( dil_reader .len() .try_into() .map_err(map_error_protocol!("too many dial infos"))?, ); for di in dil_reader.iter() { - dial_infos.push(decode_dial_info(&di)?) + dial_info_list.push(decode_dial_info(&di)?) } - let rdil_reader = reader - .reborrow() - .get_relay_dial_info_list() - .map_err(map_error_capnp_error!())?; - let mut relay_dial_infos = Vec::::with_capacity( - rdil_reader - .len() - .try_into() - .map_err(map_error_protocol!("too many relay dial infos"))?, - ); - for di in rdil_reader.iter() { - relay_dial_infos.push(decode_dial_info(&di)?) - } + let relay_peer_info = if allow_relay_peer_info { + if reader.has_relay_peer_info() { + Some(Box::new(decode_peer_info( + &reader + .reborrow() + .get_relay_peer_info() + .map_err(map_error_capnp_notinschema!())?, + false, + )?)) + } else { + None + } + } else { + None + }; Ok(NodeInfo { network_class, - dial_infos, - relay_dial_infos, + outbound_protocols, + dial_info_list, + relay_peer_info, }) } diff --git a/veilid-core/src/rpc_processor/coders/peer_info.rs b/veilid-core/src/rpc_processor/coders/peer_info.rs index 49d2c068..38d45387 100644 --- a/veilid-core/src/rpc_processor/coders/peer_info.rs +++ b/veilid-core/src/rpc_processor/coders/peer_info.rs @@ -14,7 +14,10 @@ pub fn encode_peer_info( Ok(()) } -pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result { +pub fn decode_peer_info( + reader: &veilid_capnp::peer_info::Reader, + allow_relay_peer_info: bool, +) -> Result { let nid_reader = reader .reborrow() .get_node_id() @@ -23,7 +26,7 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result Result<(), RPCError> { + builder.set_udp(protocol_set.udp); + builder.set_tcp(protocol_set.tcp); + builder.set_ws(protocol_set.ws); + builder.set_wss(protocol_set.wss); + + Ok(()) +} + +pub fn decode_protocol_set( + reader: &veilid_capnp::protocol_set::Reader, +) -> Result { + Ok(ProtocolSet { + udp: reader.reborrow().get_udp(), + tcp: reader.reborrow().get_tcp(), + ws: reader.reborrow().get_ws(), + wss: reader.reborrow().get_wss(), + }) +} diff --git a/veilid-core/src/rpc_processor/coders/signal_info.rs b/veilid-core/src/rpc_processor/coders/signal_info.rs new file mode 100644 index 00000000..497e52fb --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/signal_info.rs @@ -0,0 +1,86 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_signal_info( + signal_info: &SignalInfo, + builder: &mut veilid_capnp::operation_signal::Builder, +) -> Result<(), RPCError> { + match signal_info { + SignalInfo::HolePunch { receipt, node_info } => { + let mut hp_builder = builder.init_hole_punch(); + let rcpt_builder = + hp_builder + .reborrow() + .init_receipt(receipt.len().try_into().map_err(map_error_protocol!( + "invalid receipt length in hole punch signal info" + ))?); + rcpt_builder.copy_from_slice(receipt.as_slice()); + let mut ni_builder = hp_builder.init_node_info(); + encode_node_info(&node_info, &mut ni_builder)?; + } + SignalInfo::ReverseConnect { receipt, node_info } => { + let mut hp_builder = builder.init_reverse_connect(); + let rcpt_builder = + hp_builder + .reborrow() + .init_receipt(receipt.len().try_into().map_err(map_error_protocol!( + "invalid receipt length in reverse connect signal info" + ))?); + rcpt_builder.copy_from_slice(receipt.as_slice()); + let mut ni_builder = hp_builder.init_node_info(); + encode_node_info(&node_info, &mut ni_builder)?; + } + } + + Ok(()) +} + +pub fn decode_signal_info( + reader: &veilid_capnp::operation_signal::Reader, +) -> Result { + Ok( + match reader + .which() + .map_err(map_error_internal!("invalid signal operation"))? + { + veilid_capnp::operation_signal::HolePunch(r) => { + // Extract hole punch reader + let r = match r { + Ok(r) => r, + Err(_) => return Err(rpc_error_internal("invalid hole punch")), + }; + let receipt = r + .get_receipt() + .map_err(map_error_protocol!( + "invalid receipt in hole punch signal info" + ))? + .to_vec(); + let ni_reader = r.get_node_info().map_err(map_error_protocol!( + "invalid node info in hole punch signal info" + ))?; + let node_info = decode_node_info(&ni_reader, true)?; + + SignalInfo::HolePunch { receipt, node_info } + } + veilid_capnp::operation_signal::ReverseConnect(r) => { + // Extract reverse connect reader + let r = match r { + Ok(r) => r, + Err(_) => return Err(rpc_error_internal("invalid reverse connect")), + }; + let receipt = r + .get_receipt() + .map_err(map_error_protocol!( + "invalid receipt in reverse connect signal info" + ))? + .to_vec(); + let ni_reader = r.get_node_info().map_err(map_error_protocol!( + "invalid node info in reverse connect signal info" + ))?; + let node_info = decode_node_info(&ni_reader, true)?; + + SignalInfo::ReverseConnect { receipt, node_info } + } + }, + ) +} diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index 492375cf..dac121fb 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -4,10 +4,12 @@ use super::*; pub enum RPCError { Timeout, InvalidFormat, + Unreachable(DHTKey), Unimplemented(String), Protocol(String), Internal(String), } + pub fn rpc_error_internal>(x: T) -> RPCError { error!("RPCError Internal: {}", x.as_ref()); RPCError::Internal(x.as_ref().to_owned()) @@ -34,6 +36,7 @@ impl fmt::Display for RPCError { match self { RPCError::Timeout => write!(f, "[RPCError: Timeout]"), RPCError::InvalidFormat => write!(f, "[RPCError: InvalidFormat]"), + RPCError::Unreachable(k) => write!(f, "[RPCError: Unreachable({})]", k), RPCError::Unimplemented(s) => write!(f, "[RPCError: Unimplemented({})]", s), RPCError::Protocol(s) => write!(f, "[RPCError: Protocol({})]", s), RPCError::Internal(s) => write!(f, "[RPCError: Internal({})]", s), @@ -202,37 +205,55 @@ impl RPCProcessor { } }; - let dil_reader = match fnqr.reborrow().get_dial_info_list() { - Ok(dilr) => dilr, + let sni_reader = match fnqr.reborrow().get_sender_node_info() { + Ok(snir) => snir, Err(e) => { - return format!("(invalid dial info list: {})", e); + return format!("(invalid sender node info: {})", e); + } + }; + let sender_node_info = match decode_node_info(&sni_reader, true) { + Ok(v) => v, + Err(e) => { + return format!("(unable to decode node info: {})", e); } }; - let mut dial_infos = - Vec::::with_capacity(match dil_reader.len().try_into() { - Ok(v) => v, - Err(e) => { - return format!("(too many dial infos: {})", e); - } - }); - for di in dil_reader.iter() { - dial_infos.push(match decode_dial_info(&di) { - Ok(v) => v, - Err(e) => { - return format!("(unable to decode dial info: {})", e); - } - }); - } let node_id = decode_public_key(&nidr); format!( - "FindNodeQ: node_id={} dial_infos={:?}", + "FindNodeQ: node_id={} sender_node_info={:#?}", node_id.encode(), - dial_infos + sender_node_info ) } - veilid_capnp::operation::detail::FindNodeA(_) => { - format!("FindNodeA") + veilid_capnp::operation::detail::FindNodeA(d) => { + let fnar = match d { + Ok(fnar) => fnar, + Err(e) => { + return format!("(invalid detail: {})", e); + } + }; + + let p_reader = match fnar.reborrow().get_peers() { + Ok(pr) => pr, + Err(e) => { + return format!("(invalid sender node info: {})", e); + } + }; + let mut peers = Vec::::with_capacity(match p_reader.len().try_into() { + Ok(v) => v, + Err(e) => return format!("invalid peer count: {}", e), + }); + for p in p_reader.iter() { + let peer_info = match decode_peer_info(&p, true) { + Ok(v) => v, + Err(e) => { + return format!("(unable to decode peer info: {})", e); + } + }; + peers.push(peer_info); + } + + format!("FindNodeA: peers={:#?}", peers) } veilid_capnp::operation::detail::Route(_) => { format!("Route") @@ -270,11 +291,8 @@ impl RPCProcessor { veilid_capnp::operation::detail::FindBlockA(_) => { format!("FindBlockA") } - veilid_capnp::operation::detail::SignalQ(_) => { - format!("SignalQ") - } - veilid_capnp::operation::detail::SignalA(_) => { - format!("SignalA") + veilid_capnp::operation::detail::Signal(_) => { + format!("Signal") } veilid_capnp::operation::detail::ReturnReceipt(_) => { format!("ReturnReceipt") diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 6f59e536..9519beb0 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -23,8 +23,10 @@ type OperationId = u64; #[derive(Debug, Clone)] pub enum Destination { - Direct(NodeRef), - PrivateRoute(PrivateRoute), + Direct(NodeRef), // Can only be sent directly + Normal(NodeRef), // Can be sent via relays as well as directly + Relay(NodeRef, DHTKey), // Can only be sent via a relay + PrivateRoute(PrivateRoute), // Must be encapsulated in a private route } #[derive(Debug, Clone)] @@ -131,7 +133,7 @@ struct WaitableReply { #[derive(Clone, Debug, Default)] pub struct InfoAnswer { pub latency: u64, - pub node_info: NodeInfo, + pub node_status: NodeStatus, pub sender_info: SenderInfo, } @@ -159,7 +161,7 @@ pub struct RPCProcessorInner { pub struct RPCProcessor { crypto: Crypto, config: VeilidConfig, - default_peer_scope: PeerScope, + enable_local_peer_scope: bool, inner: Arc>, } @@ -181,16 +183,11 @@ impl RPCProcessor { Self { crypto: network_manager.crypto(), config: network_manager.config(), - default_peer_scope: if !network_manager + enable_local_peer_scope: network_manager .config() .get() .network - .enable_local_peer_scope - { - PeerScope::Global - } else { - PeerScope::All - }, + .enable_local_peer_scope, inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), } } @@ -220,16 +217,18 @@ impl RPCProcessor { fn filter_peer_scope(&self, peer_info: &PeerInfo) -> bool { // reject attempts to include non-public addresses in results if self.default_peer_scope == PeerScope::Global { - for di in &peer_info.node_info.dial_infos { + for di in &peer_info.node_info.dial_info_list { if !di.is_global() { // non-public address causes rejection return false; } } - for di in &peer_info.node_info.relay_dial_infos { - if !di.is_global() { - // non-public address causes rejection - return false; + if let Some(rpi) = peer_info.node_info.relay_peer_info { + for di in &rpi.node_info.dial_info_list { + if !di.is_global() { + // non-public address causes rejection + return false; + } } } } @@ -277,9 +276,9 @@ impl RPCProcessor { // First see if we have the node in our routing table already if let Some(nr) = routing_table.lookup_node_ref(node_id) { - // ensure we have dial_info for the entry already, + // ensure we have some dial info for the entry already, // if not, we should do the find_node anyway - if !nr.has_dial_info() { + if !nr.has_any_dial_info() { return Ok(nr); } } @@ -377,8 +376,8 @@ impl RPCProcessor { } } Ok((rpcreader, _)) => { - // Note that we definitely received this peer info since we got a reply - waitable_reply.node_ref.set_seen_our_dial_info(); + // Note that we definitely received this node info since we got a reply + waitable_reply.node_ref.set_seen_our_node_info(); // Reply received let recv_ts = get_timestamp(); @@ -434,10 +433,20 @@ impl RPCProcessor { let out; // To where are we sending the request - match dest { - Destination::Direct(node_ref) => { + match &dest { + Destination::Direct(node_ref) | Destination::Normal(node_ref) => { // Send to a node without a private route // -------------------------------------- + + // Get the actual destination node id, accounting for outbound relaying + let (node_ref, node_id) = if matches!(dest, Destination::Normal(_)) { + self.get_direct_destination(node_ref.clone())? + } else { + let node_id = node_ref.node_id(); + (node_ref.clone(), node_id) + }; + + // Handle the existence of safety route match safety_route_spec { None => { // If no safety route is being used, and we're not sending to a private @@ -445,7 +454,7 @@ impl RPCProcessor { out = reader_to_vec(message)?; // Message goes directly to the node - out_node_id = node_ref.node_id(); + out_node_id = node_id; out_noderef = Some(node_ref); hopcount = 1; } @@ -454,7 +463,7 @@ impl RPCProcessor { // but we are using a safety route, so we must create an empty private route let mut pr_builder = ::capnp::message::Builder::new_default(); let private_route = - self.new_stub_private_route(node_ref.node_id(), &mut pr_builder)?; + self.new_stub_private_route(node_id, &mut pr_builder)?; let message_vec = reader_to_vec(message)?; // first @@ -487,7 +496,7 @@ impl RPCProcessor { None => { // If no safety route, the first node is the first hop of the private route hopcount = private_route.hop_count as usize; - let out_node_id = match private_route.hops { + let out_node_id = match &private_route.hops { Some(rh) => rh.dial_info.node_id.key, _ => return Err(rpc_error_internal("private route has no hops")), }; @@ -547,7 +556,7 @@ impl RPCProcessor { let bytes = out.len() as u64; if let Err(e) = self .network_manager() - .send_envelope(node_ref.clone(), out) + .send_envelope(node_ref.clone(), Some(out_node_id), out) .await .map_err(logthru_rpc!(error)) .map_err(RPCError::Internal) @@ -735,7 +744,7 @@ impl RPCProcessor { // Send the reply let bytes = out.len() as u64; self.network_manager() - .send_envelope(node_ref.clone(), out) + .send_envelope(node_ref.clone(), Some(out_node_id), out) .await .map_err(RPCError::Internal)?; @@ -762,17 +771,17 @@ impl RPCProcessor { } } - fn get_respond_to_sender_dial_info( + fn get_respond_to_sender_node_info( &self, operation: &veilid_capnp::operation::Reader, - ) -> Result, RPCError> { - if let veilid_capnp::operation::respond_to::Sender(Ok(sender_di_reader)) = operation + ) -> Result, RPCError> { + if let veilid_capnp::operation::respond_to::Sender(Ok(sender_ni_reader)) = operation .get_respond_to() .which() .map_err(map_error_capnp_notinschema!())? { // Sender DialInfo was specified, update our routing table with it - Ok(Some(decode_dial_info(&sender_di_reader)?)) + Ok(Some(decode_node_info(&sender_ni_reader, true)?)) } else { Ok(None) } @@ -809,17 +818,17 @@ impl RPCProcessor { }; // Parse out fields - let node_info = decode_node_info( + let node_status = decode_node_status( &iq_reader - .get_node_info() - .map_err(map_error_internal!("no valid node info"))?, + .get_node_status() + .map_err(map_error_internal!("no valid node status"))?, )?; - // add node information for the requesting node to our routing table + // update node status for the requesting node to our routing table if let Some(sender_nr) = rpcreader.opt_sender_nr.clone() { - // Update latest node info in routing table for the infoq sender + // Update latest node status in routing table for the infoq sender sender_nr.operate(|e| { - e.update_node_info(node_info); + e.update_node_status(node_status); }); } @@ -831,10 +840,12 @@ impl RPCProcessor { respond_to.set_none(()); let detail = answer.reborrow().init_detail(); let mut info_a = detail.init_info_a(); - // Add node info - let node_info = self.network_manager().generate_node_info(); - let mut nib = info_a.reborrow().init_node_info(); - encode_node_info(&node_info, &mut nib)?; + + // Add node status + let node_status = self.network_manager().generate_node_status(); + let mut nsb = info_a.reborrow().init_node_status(); + encode_node_status(&node_status, &mut nsb)?; + // Add sender info let sender_info = self.generate_sender_info(&rpcreader); let mut sib = info_a.reborrow().init_sender_info(); @@ -898,7 +909,7 @@ impl RPCProcessor { for peer in peers { // See if this peer will validate dial info let will_validate_dial_info = peer.operate(|e: &mut BucketEntry| { - if let Some(ni) = &e.peer_stats().node_info { + if let Some(ni) = &e.peer_stats().status { ni.will_validate_dial_info } else { true @@ -980,23 +991,14 @@ impl RPCProcessor { .map_err(logthru_rpc!())?, ); - // get the peerinfo/dialinfos of the requesting node - let dil_reader = fnq_reader + // get the sender NodeInfo of the requesting node + let sni_reader = fnq_reader .reborrow() - .get_dial_info_list() + .get_sender_node_info() .map_err(map_error_capnp_error!())?; - let mut dial_infos = Vec::::with_capacity( - dil_reader - .len() - .try_into() - .map_err(map_error_protocol!("too many dial infos"))?, - ); - for di in dil_reader.iter() { - dial_infos.push(decode_dial_info(&di)?) - } let peer_info = PeerInfo { node_id: NodeId::new(rpcreader.header.envelope.get_sender_id()), - dial_infos, + node_info: decode_node_info(&sni_reader, true)?, }; // filter out attempts to pass non-public addresses in for peers @@ -1007,19 +1009,17 @@ impl RPCProcessor { // add node information for the requesting node to our routing table let routing_table = self.routing_table(); let _requesting_node_ref = routing_table - .register_node_with_dial_info(peer_info.node_id.key, &peer_info.dial_infos) + .register_node_with_node_info(peer_info.node_id.key, peer_info.node_info) .map_err(map_error_string!())?; // find N nodes closest to the target node in our routing table - let own_peer_info = routing_table.get_own_peer_info(self.default_peer_scope); + let own_peer_info = routing_table.get_own_peer_info(); let closest_nodes = routing_table.find_closest_nodes( target_node_id, // filter None, // transform - |e| { - RoutingTable::transform_to_peer_info(e, self.default_peer_scope, &own_peer_info) - }, + |e| RoutingTable::transform_to_peer_info(e, &own_peer_info), ); log_rpc!(">>>> Returning {} closest peers", closest_nodes.len()); @@ -1076,8 +1076,8 @@ impl RPCProcessor { Err(rpc_error_unimplemented("process_find_block_q")) } - async fn process_signal_q(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { - Err(rpc_error_unimplemented("process_signal_q")) + async fn process_signal(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { + Err(rpc_error_unimplemented("process_signal")) } async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { @@ -1176,28 +1176,27 @@ impl RPCProcessor { veilid_capnp::operation::detail::SupplyBlockA(_) => (14u32, false), veilid_capnp::operation::detail::FindBlockQ(_) => (15u32, true), veilid_capnp::operation::detail::FindBlockA(_) => (16u32, false), - veilid_capnp::operation::detail::SignalQ(_) => (17u32, true), - veilid_capnp::operation::detail::SignalA(_) => (18u32, false), - veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true), - veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true), - veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false), - veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true), - veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), - veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), - veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), + veilid_capnp::operation::detail::Signal(_) => (17u32, true), + veilid_capnp::operation::detail::ReturnReceipt(_) => (18u32, true), + veilid_capnp::operation::detail::StartTunnelQ(_) => (19u32, true), + veilid_capnp::operation::detail::StartTunnelA(_) => (20u32, false), + veilid_capnp::operation::detail::CompleteTunnelQ(_) => (21u32, true), + veilid_capnp::operation::detail::CompleteTunnelA(_) => (22u32, false), + veilid_capnp::operation::detail::CancelTunnelQ(_) => (23u32, true), + veilid_capnp::operation::detail::CancelTunnelA(_) => (24u32, false), }; // Accounting for questions we receive if is_q { - // See if we have some Sender DialInfo to incorporate + // See if we have some Sender NodeInfo to incorporate opt_sender_nr = - if let Some(sender_di) = self.get_respond_to_sender_dial_info(&operation)? { - // Sender DialInfo was specified, update our routing table with it + if let Some(sender_ni) = self.get_respond_to_sender_node_info(&operation)? { + // Sender NodeInfo was specified, update our routing table with it let nr = self .routing_table() - .update_node_with_single_dial_info( + .register_node_with_node_info( msg.header.envelope.get_sender_id(), - &sender_di, + sender_ni, ) .map_err(RPCError::Internal)?; Some(nr) @@ -1251,15 +1250,14 @@ impl RPCProcessor { 14 => self.process_answer(rpcreader).await, // SupplyBlockA 15 => self.process_find_block_q(rpcreader).await, // FindBlockQ 16 => self.process_answer(rpcreader).await, // FindBlockA - 17 => self.process_signal_q(rpcreader).await, // SignalQ - 18 => self.process_answer(rpcreader).await, // SignalA - 19 => self.process_return_receipt(rpcreader).await, // ReturnReceipt - 20 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ - 21 => self.process_answer(rpcreader).await, // StartTunnelA - 22 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ - 23 => self.process_answer(rpcreader).await, // CompleteTunnelA - 24 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ - 25 => self.process_answer(rpcreader).await, // CancelTunnelA + 17 => self.process_signal(rpcreader).await, // SignalQ + 18 => self.process_return_receipt(rpcreader).await, // ReturnReceipt + 19 => self.process_start_tunnel_q(rpcreader).await, // StartTunnelQ + 20 => self.process_answer(rpcreader).await, // StartTunnelA + 21 => self.process_complete_tunnel_q(rpcreader).await, // CompleteTunnelQ + 22 => self.process_answer(rpcreader).await, // CompleteTunnelA + 23 => self.process_cancel_tunnel_q(rpcreader).await, // CancelTunnelQ + 24 => self.process_answer(rpcreader).await, // CancelTunnelA _ => panic!("must update rpc table"), } } @@ -1361,19 +1359,15 @@ impl RPCProcessor { // Gets a 'RespondTo::Sender' that contains either our dial info, // or None if the peer has seen our dial info before pub fn get_respond_to_sender(&self, peer: NodeRef) -> RespondTo { - if peer.has_seen_our_dial_info() { + if peer.has_seen_our_node_info() { RespondTo::Sender(None) - } else if let Some(did) = self - .routing_table() - .first_filtered_dial_info_detail(peer.dial_info_filter()) - { - RespondTo::Sender(Some(did.dial_info)) } else { - RespondTo::Sender(None) + RespondTo::Sender(Some(self.routing_table().get_own_peer_info().node_info)) } } // Send InfoQ RPC request, receive InfoA answer + // Can be sent via relays, but not via routes pub async fn rpc_call_info(self, peer: NodeRef) -> Result { let info_q_msg = { let mut info_q_msg = ::capnp::message::Builder::new_default(); @@ -1384,9 +1378,9 @@ impl RPCProcessor { .encode(&mut respond_to)?; let detail = question.reborrow().init_detail(); let mut iqb = detail.init_info_q(); - let mut node_info_builder = iqb.reborrow().init_node_info(); - let node_info = self.network_manager().generate_node_info(); - encode_node_info(&node_info, &mut node_info_builder)?; + let mut node_status_builder = iqb.reborrow().init_node_status(); + let node_status = self.network_manager().generate_node_status(); + encode_node_status(&node_status, &mut node_status_builder)?; info_q_msg.into_reader() }; @@ -1418,13 +1412,13 @@ impl RPCProcessor { }; // Decode node info - if !info_a.has_node_info() { - return Err(rpc_error_internal("Missing node info")); + if !info_a.has_node_status() { + return Err(rpc_error_internal("Missing node status")); } - let nir = info_a - .get_node_info() - .map_err(map_error_internal!("Broken node info"))?; - let node_info = decode_node_info(&nir)?; + let nsr = info_a + .get_node_status() + .map_err(map_error_internal!("Broken node status"))?; + let node_status = decode_node_status(&nsr)?; // Decode sender info let sender_info = if info_a.has_sender_info() { @@ -1436,21 +1430,22 @@ impl RPCProcessor { SenderInfo::default() }; - // Update latest node info in routing table + // Update latest node status in routing table peer.operate(|e| { - e.update_node_info(node_info.clone()); + e.update_node_status(node_status.clone()); }); // Return the answer for anyone who may care let out = InfoAnswer { latency, - node_info, + node_status, sender_info, }; Ok(out) } + // Can only be sent directly, not via relays or routes pub async fn rpc_call_validate_dial_info( &self, peer: NodeRef, @@ -1496,7 +1491,8 @@ impl RPCProcessor { }; // Send the validate_dial_info request - self.request(Destination::Direct(peer.clone()), vdi_msg, None) + // This can only be sent directly, as relays can not validate dial info + self.request(Destination::Direct(peer), vdi_msg, None) .await?; // Wait for receipt @@ -1510,6 +1506,7 @@ impl RPCProcessor { } // Send FindNodeQ RPC request, receive FindNodeA answer + // Can be sent via all methods including relays and routes pub async fn rpc_call_find_node( self, dest: Destination, @@ -1528,22 +1525,10 @@ impl RPCProcessor { let mut node_id_builder = fnq.reborrow().init_node_id(); encode_public_key(&key, &mut node_id_builder)?; - let own_peer_info = self - .routing_table() - .get_own_peer_info(self.default_peer_scope); + let own_peer_info = self.routing_table().get_own_peer_info(); - let mut dil_builder = fnq.reborrow().init_dial_info_list( - own_peer_info - .dial_infos - .len() - .try_into() - .map_err(map_error_internal!("too many dial infos in peer info"))?, - ); - - for idx in 0..own_peer_info.dial_infos.len() { - let mut di_builder = dil_builder.reborrow().get(idx as u32); - encode_dial_info(&own_peer_info.dial_infos[idx], &mut di_builder)?; - } + let mut ni_builder = fnq.reborrow().init_sender_node_info(); + encode_node_info(&own_peer_info.node_info, &mut ni_builder)?; find_node_q_msg.into_reader() }; @@ -1584,7 +1569,7 @@ impl RPCProcessor { .map_err(map_error_internal!("too many peers"))?, ); for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p)?; + let peer_info = decode_peer_info(&p, true)?; if !self.filter_peer_scope(&peer_info) { return Err(RPCError::InvalidFormat); @@ -1598,5 +1583,35 @@ impl RPCProcessor { Ok(out) } + // Sends a unidirectional signal to a node + // Can be sent via all methods including relays and routes + pub async fn rpc_call_signal( + &self, + dest: Destination, + relay_dial_info: DialInfo, + safety_route: Option<&SafetyRouteSpec>, + signal_info: SignalInfo, + ) -> Result<(), RPCError> { + let network_manager = self.network_manager(); + // + let sig_msg = { + let mut sig_msg = ::capnp::message::Builder::new_default(); + let mut question = sig_msg.init_root::(); + question.set_op_id(self.get_next_op_id()); + let mut respond_to = question.reborrow().init_respond_to(); + respond_to.set_none(()); + let detail = question.reborrow().init_detail(); + let mut sig_builder = detail.init_signal(); + encode_signal_info(&signal_info, &mut sig_builder)?; + + sig_msg.into_reader() + }; + + // Send the signal request + self.request(dest, sig_msg, safety_route).await?; + + Ok(()) + } + // xxx do not process latency for routed messages } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 76d69f04..0b49aa6d 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -110,6 +110,9 @@ impl fmt::Display for VeilidAPIError { fn convert_rpc_error(x: RPCError) -> VeilidAPIError { match x { RPCError::Timeout => VeilidAPIError::Timeout, + RPCError::Unreachable(n) => VeilidAPIError::NodeNotFound { + node_id: NodeId::new(n), + }, RPCError::Unimplemented(s) => VeilidAPIError::Unimplemented { message: s }, RPCError::Internal(s) => VeilidAPIError::Internal { message: s }, RPCError::Protocol(s) => VeilidAPIError::Internal { message: s }, @@ -324,50 +327,54 @@ pub struct NodeStatus { #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct NodeInfo { pub network_class: NetworkClass, - pub dial_infos: Vec, - pub relay_dial_infos: Vec, + pub outbound_protocols: ProtocolSet, + pub dial_info_list: Vec, + pub relay_peer_info: Option>, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct LocalNodeInfo { + pub outbound_protocols: ProtocolSet, + pub dial_info_list: Vec, } impl NodeInfo { - pub fn first_filtered(&self, filter: F) -> NodeInfo + pub fn first_filtered_dial_info(&self, filter: F) -> Option where F: Fn(&DialInfo) -> bool, { - let mut node_info = NodeInfo::default(); - node_info.network_class = self.network_class; - - for di in &self.dial_infos { + for di in &self.dial_info_list { if filter(di) { - node_info.dial_infos.push(di.clone()); - break; + return Some(di.clone()); } } - for di in &self.relay_dial_infos { - if filter(di) { - node_info.relay_dial_infos.push(di.clone()); - break; - } - } - node_info + None } - pub fn all_filtered(&self, filter: F) -> NodeInfo + + pub fn all_filtered_dial_info(&self, filter: F) -> Vec where F: Fn(&DialInfo) -> bool, { - let mut node_info = NodeInfo::default(); - node_info.network_class = self.network_class; + let mut dial_info_list = Vec::new(); - for di in &self.dial_infos { + for di in &self.dial_info_list { if filter(di) { - node_info.dial_infos.push(di.clone()); + dial_info_list.push(di.clone()); } } - for di in &self.relay_dial_infos { - if filter(di) { - node_info.relay_dial_infos.push(di.clone()); - } - } - node_info + dial_info_list + } + + pub fn has_any_dial_info(&self) -> bool { + !self.dial_info_list.is_empty() + || !self + .relay_peer_info + .map(|rpi| rpi.node_info.has_direct_dial_info()) + .unwrap_or_default() + } + + pub fn has_direct_dial_info(&self) -> bool { + !self.dial_info_list.is_empty() } } @@ -381,6 +388,28 @@ pub enum ProtocolType { WSS, } +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +pub struct ProtocolSet { + pub udp: bool, + pub tcp: bool, + pub ws: bool, + pub wss: bool, +} + +impl ProtocolSet { + pub fn is_protocol_type_enabled(&self, protocol_type: ProtocolType) -> bool { + match protocol_type { + ProtocolType::UDP => self.udp, + ProtocolType::TCP => self.tcp, + ProtocolType::WS => self.ws, + ProtocolType::WSS => self.wss, + } + } + pub fn filter_dial_info(&self, di: &DialInfo) -> bool { + self.is_protocol_type_enabled(di.protocol_type()) + } +} + #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] pub enum AddressType { IPV4, @@ -1057,7 +1086,25 @@ cfg_if! { Arc) -> SystemPinBoxFuture<()> + Send + Sync + 'static>; } } +///////////////////////////////////////////////////////////////////////////////////////////////////// +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum SignalInfo { + HolePunch { + // UDP Hole Punch Request + receipt: Vec, // Receipt to be returned after the hole punch + node_info: NodeInfo, // Sender's node info + }, + ReverseConnect { + // Reverse Connection Request + receipt: Vec, // Receipt to be returned by the reverse connection + node_info: NodeInfo, // Sender's node info + }, + // XXX: WebRTC + // XXX: App-level signalling +} + +///////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub enum TunnelMode { Raw, @@ -1139,14 +1186,6 @@ pub struct RoutingContextOptions { ///////////////////////////////////////////////////////////////////////////////////////////////////// -#[derive(Clone, Debug, Default, Serialize, Deserialize)] -pub struct SearchDHTAnswer { - pub node_id: NodeId, - pub dial_info: Vec, -} - -///////////////////////////////////////////////////////////////////////////////////////////////////// - pub struct RoutingContextInner { api: VeilidAPI, options: RoutingContextOptions, @@ -1397,7 +1436,7 @@ impl VeilidAPI { .map_err(map_rpc_error!()) } - pub async fn search_dht(&self, node_id: NodeId) -> Result { + pub async fn search_dht(&self, node_id: NodeId) -> Result { let rpc_processor = self.rpc_processor()?; let config = self.config()?; let (count, fanout, timeout) = { @@ -1414,18 +1453,12 @@ impl VeilidAPI { .await .map_err(map_rpc_error!())?; - let answer = node_ref.operate(|e| SearchDHTAnswer { - node_id: NodeId::new(node_ref.node_id()), - dial_info: e.dial_infos().to_vec(), - }); + let answer = node_ref.peer_info(); Ok(answer) } - pub async fn search_dht_multi( - &self, - node_id: NodeId, - ) -> Result, VeilidAPIError> { + pub async fn search_dht_multi(&self, node_id: NodeId) -> Result, VeilidAPIError> { let rpc_processor = self.rpc_processor()?; let config = self.config()?; let (count, fanout, timeout) = { @@ -1442,14 +1475,7 @@ impl VeilidAPI { .await .map_err(map_rpc_error!())?; - let mut answer = Vec::::new(); - for nr in node_refs { - let a = nr.operate(|e| SearchDHTAnswer { - node_id: NodeId::new(nr.node_id()), - dial_info: e.dial_infos().to_vec(), - }); - answer.push(a); - } + let answer = node_refs.iter().map(|x| x.peer_info()).collect(); Ok(answer) } diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index 93f40ff1..4666df2d 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -15,7 +15,7 @@ cfg_if! { } } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigHTTPS { pub enabled: bool, pub listen_address: String, @@ -23,7 +23,7 @@ pub struct VeilidConfigHTTPS { pub url: Option, // Fixed URL is not optional for TLS-based protocols and is dynamically validated } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigHTTP { pub enabled: bool, pub listen_address: String, @@ -31,13 +31,13 @@ pub struct VeilidConfigHTTP { pub url: Option, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigApplication { pub https: VeilidConfigHTTPS, pub http: VeilidConfigHTTP, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigUDP { pub enabled: bool, pub socket_pool_size: u32, @@ -45,7 +45,7 @@ pub struct VeilidConfigUDP { pub public_address: Option, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigTCP { pub connect: bool, pub listen: bool, @@ -54,7 +54,7 @@ pub struct VeilidConfigTCP { pub public_address: Option, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigWS { pub connect: bool, pub listen: bool, @@ -64,7 +64,7 @@ pub struct VeilidConfigWS { pub url: Option, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigWSS { pub connect: bool, pub listen: bool, @@ -74,7 +74,7 @@ pub struct VeilidConfigWSS { pub url: Option, // Fixed URL is not optional for TLS-based protocols and is dynamically validated } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigProtocol { pub udp: VeilidConfigUDP, pub tcp: VeilidConfigTCP, @@ -82,14 +82,14 @@ pub struct VeilidConfigProtocol { pub wss: VeilidConfigWSS, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigTLS { pub certificate_path: String, pub private_key_path: String, pub connection_initial_timeout_ms: u32, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigDHT { pub resolve_node_timeout_ms: Option, pub resolve_node_count: u32, @@ -107,7 +107,7 @@ pub struct VeilidConfigDHT { pub nearby_node_percentage: u32, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigRPC { pub concurrency: u32, pub queue_size: u32, @@ -117,14 +117,14 @@ pub struct VeilidConfigRPC { pub max_route_hop_count: u8, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigLeases { pub max_server_signal_leases: u32, pub max_server_relay_leases: u32, pub max_client_signal_leases: u32, pub max_client_relay_leases: u32, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigRoutingTable { pub limit_over_attached: u32, pub limit_fully_attached: u32, @@ -133,12 +133,14 @@ pub struct VeilidConfigRoutingTable { pub limit_attached_weak: u32, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigNetwork { pub max_connections: u32, pub connection_initial_timeout_ms: u32, pub connection_inactivity_timeout_ms: u32, pub client_whitelist_timeout_ms: u32, + pub reverse_connection_receipt_time_ms: u32, + pub hole_punch_receipt_time_ms: u32, pub node_id: key::DHTKey, pub node_id_secret: key::DHTKeySecret, pub bootstrap: Vec, @@ -155,19 +157,19 @@ pub struct VeilidConfigNetwork { pub leases: VeilidConfigLeases, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigTableStore { pub directory: String, pub delete: bool, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigBlockStore { pub directory: String, pub delete: bool, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigProtectedStore { pub allow_insecure_fallback: bool, pub always_use_insecure_storage: bool, @@ -175,7 +177,7 @@ pub struct VeilidConfigProtectedStore { pub delete: bool, } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigCapabilities { pub protocol_udp: bool, pub protocol_connect_tcp: bool, @@ -214,7 +216,7 @@ impl Default for VeilidConfigLogLevel { } } -#[derive(Default, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct VeilidConfigInner { pub program_name: String, pub namespace: String,