From fba3f5b5f3a389d15fa8f35758e0546866234cf6 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 8 Dec 2021 03:09:45 +0000 Subject: [PATCH] Update to NAT detection --- .vscode/launch.json | 14 +- external/cursive | 2 +- external/cursive-flexi-logger-view | 2 +- external/cursive_buffered_backend | 2 +- external/if-addrs | 2 +- external/keyring-rs | 2 +- external/keyvaluedb | 2 +- veilid-cli/src/main.rs | 2 +- veilid-core/src/attachment_manager.rs | 6 +- veilid-core/src/intf/native/network/mod.rs | 24 +-- .../network/public_dialinfo_discovery.rs | 75 ++++---- veilid-core/src/intf/wasm/network/mod.rs | 2 +- veilid-core/src/lease_manager.rs | 4 +- veilid-core/src/lib.rs | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 12 +- .../src/routing_table/dial_info_entry.rs | 4 +- veilid-core/src/routing_table/find_nodes.rs | 8 +- veilid-core/src/routing_table/mod.rs | 28 +-- veilid-core/src/rpc_processor/mod.rs | 4 +- .../src/tests/common/test_veilid_core.rs | 11 +- veilid-core/src/veilid_api.rs | 160 ++++++++++-------- veilid-core/src/veilid_core.rs | 57 ++++--- veilid-server/src/client_api.rs | 18 +- veilid-server/src/main.rs | 2 +- veilid-server/src/unix.rs | 11 +- 25 files changed, 254 insertions(+), 202 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index ec4d4238..1de6a0a9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,7 +8,7 @@ "type": "lldb", "request": "attach", "name": "Attach to veilid-server", - "program": "${workspaceFolder}/veilid-server/target/debug/veilid-server", + "program": "${workspaceFolder}/target/debug/veilid-server", "pid": "${command:pickMyProcess}" }, { @@ -16,11 +16,11 @@ "request": "launch", "name": "Launch veilid-cli", "args": ["--debug"], - "program": "${workspaceFolder}/veilid-cli/target/debug/veilid-cli", + "program": "${workspaceFolder}/target/debug/veilid-cli", "windows": { - "program": "${workspaceFolder}/veilid-cli/target/debug/veilid-cli.exe" + "program": "${workspaceFolder}/target/debug/veilid-cli.exe" }, - "cwd": "${workspaceFolder}", + "cwd": "${workspaceFolder}/target/debug/", "sourceLanguages": ["rust"], "terminal": "console" }, @@ -39,9 +39,9 @@ "type": "lldb", "request": "launch", "name": "Debug veilid-server", - "program": "${workspaceFolder}/veilid-server/target/debug/veilid-server", + "program": "${workspaceFolder}/target/debug/veilid-server", "args": ["--trace", "--attach=true"], - "cwd": "${workspaceFolder}/veilid-server/target/debug/", + "cwd": "${workspaceFolder}/target/debug/", "env": { "RUST_BACKTRACE": "1" }, @@ -65,7 +65,7 @@ } }, "args": ["${selectedText}"], - "cwd": "${workspaceFolder}/veilid-core" + "cwd": "${workspaceFolder}/target/debug/" }, { diff --git a/external/cursive b/external/cursive index 298b1254..74c9d697 160000 --- a/external/cursive +++ b/external/cursive @@ -1 +1 @@ -Subproject commit 298b12545798d6b6a9e1a469467b89a15106cf7e +Subproject commit 74c9d6977af86b2a57d4415c71eacda26f28c6b4 diff --git a/external/cursive-flexi-logger-view b/external/cursive-flexi-logger-view index e95298b6..1e1542b1 160000 --- a/external/cursive-flexi-logger-view +++ b/external/cursive-flexi-logger-view @@ -1 +1 @@ -Subproject commit e95298b6db6d971d1fdc13439d1c7edd48b744de +Subproject commit 1e1542b1bb45ba590e604cb9904ef08e5e6bd55d diff --git a/external/cursive_buffered_backend b/external/cursive_buffered_backend index 70c55412..5a093be7 160000 --- a/external/cursive_buffered_backend +++ b/external/cursive_buffered_backend @@ -1 +1 @@ -Subproject commit 70c55412ea1ee97f9d60eb25e3a514b6968caa35 +Subproject commit 5a093be753db1251c2451e7e0e55d548af4abe1d diff --git a/external/if-addrs b/external/if-addrs index c78ca1aa..e9853990 160000 --- a/external/if-addrs +++ b/external/if-addrs @@ -1 +1 @@ -Subproject commit c78ca1aaff2a010c0466c10182cef932f2d53d26 +Subproject commit e985399095255f2d0ea3388a33f19e037255283a diff --git a/external/keyring-rs b/external/keyring-rs index 9162bdc2..b4a07507 160000 --- a/external/keyring-rs +++ b/external/keyring-rs @@ -1 +1 @@ -Subproject commit 9162bdc20fe4e6e5c6a8282ffa4de81b988687be +Subproject commit b4a075070682f250d00feb00dd078f35f5127ed6 diff --git a/external/keyvaluedb b/external/keyvaluedb index 4971ce61..27f4defd 160000 --- a/external/keyvaluedb +++ b/external/keyvaluedb @@ -1 +1 @@ -Subproject commit 4971ce612e7aace83b17022132687f6f380dbbae +Subproject commit 27f4defdca5f12b3ef6917cf4698181b3df0026e diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index fa57c1b7..47d37431 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -1,4 +1,4 @@ -#![warn(clippy::all)] +#![deny(clippy::all)] use anyhow::*; use async_std::prelude::*; diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index a58bf200..015719d5 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -126,11 +126,7 @@ impl AttachmentManager { table_store: table_store.clone(), crypto: crypto.clone(), attachment_machine: CallbackStateMachine::new(), - network_manager: NetworkManager::new( - config, - table_store, - crypto, - ), + network_manager: NetworkManager::new(config, table_store, crypto), maintain_peers: false, peer_count: 0, attach_timestamp: None, diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index f44e99ed..1ea71f29 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -833,7 +833,7 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::udp_from_socketaddr(pdi_addr), Some(NetworkClass::Server), DialInfoOrigin::Static, @@ -844,7 +844,7 @@ impl Network { // Register local dial info as public if it is publicly routable for x in &dial_infos { if x.is_public().unwrap_or(false) { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( x.clone(), Some(NetworkClass::Server), DialInfoOrigin::Static, @@ -895,13 +895,13 @@ impl Network { let public_port = public_port .ok_or_else(|| "port must be specified for public WS address".to_owned())?; - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::ws(fqdn, public_port, public_fqdn), Some(NetworkClass::Server), DialInfoOrigin::Static, ); } else { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::ws(fqdn, port, path.clone()), Some(NetworkClass::Server), DialInfoOrigin::Static, @@ -950,13 +950,13 @@ impl Network { let public_port = public_port .ok_or_else(|| "port must be specified for public WSS address".to_owned())?; - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::wss(fqdn, public_port, public_fqdn), None, DialInfoOrigin::Static, ); } else { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::wss(fqdn, port, path.clone()), None, DialInfoOrigin::Static, @@ -1004,7 +1004,7 @@ impl Network { // Add all resolved addresses as public dialinfo for pdi_addr in &mut public_sockaddrs { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( DialInfo::tcp_from_socketaddr(pdi_addr), None, DialInfoOrigin::Static, @@ -1015,7 +1015,7 @@ impl Network { // Register local dial info as public if it is publicly routable for x in &dial_infos { if x.is_public().unwrap_or(false) { - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( x.clone(), Some(NetworkClass::Server), DialInfoOrigin::Static, @@ -1079,7 +1079,7 @@ impl Network { // Drop all dial info routing_table.clear_local_dial_info(); - routing_table.clear_public_dial_info(); + routing_table.clear_global_dial_info(); // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(network_manager); @@ -1099,7 +1099,7 @@ impl Network { // Go through our public dialinfo and see what our best network class is let mut network_class = NetworkClass::Invalid; - for x in routing_table.public_dial_info() { + for x in routing_table.global_dial_info() { if let Some(nc) = x.network_class { if nc < network_class { network_class = nc; @@ -1141,7 +1141,7 @@ impl Network { && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { let need_udpv4_dialinfo = routing_table - .public_dial_info_for_protocol_address_type(ProtocolAddressType::UDPv4) + .global_dial_info_for_protocol_address_type(ProtocolAddressType::UDPv4) .is_empty(); if need_udpv4_dialinfo { // If we have no public UDPv4 dialinfo, then we need to run a NAT check @@ -1159,7 +1159,7 @@ impl Network { && (network_class.inbound_capable() || network_class == NetworkClass::Invalid) { let need_tcpv4_dialinfo = routing_table - .public_dial_info_for_protocol_address_type(ProtocolAddressType::TCPv4) + .global_dial_info_for_protocol_address_type(ProtocolAddressType::TCPv4) .is_empty(); if need_tcpv4_dialinfo { // If we have no public TCPv4 dialinfo, then we need to run a NAT check diff --git a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs index 6adf1d2d..9e651ff9 100644 --- a/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs +++ b/veilid-core/src/intf/native/network/public_dialinfo_discovery.rs @@ -27,11 +27,12 @@ impl Network { &self, protocol_address_type: ProtocolAddressType, ignore_node: Option, - ) -> Result<(SocketAddr, NodeRef), String> { + ) -> Option<(SocketAddr, NodeRef)> { let routing_table = self.routing_table(); let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); if peers.is_empty() { - return Err(format!("no peers of type '{:?}'", protocol_address_type)); + trace!("no peers of type '{:?}'", protocol_address_type); + return None; } for peer in peers { if let Some(ignore_node) = ignore_node { @@ -40,36 +41,32 @@ impl Network { } } if let Some(sa) = self.request_public_address(peer.clone()).await { - return Ok((sa, peer)); + return Some((sa, peer)); } } - Err("no peers responded with an external address".to_owned()) + trace!("no peers responded with an external address"); + None } - fn discover_local_address( + fn get_interface_addresses( &self, protocol_address_type: ProtocolAddressType, - ) -> Result { + ) -> Vec { let routing_table = self.routing_table(); - match routing_table - .get_own_peer_info(PeerScope::Public) + routing_table + .get_own_peer_info(PeerScope::Local) .dial_infos .iter() - .find_map(|di| { + .filter_map(|di| { if di.protocol_address_type() == protocol_address_type { if let Ok(addr) = di.to_socket_addr() { return Some(addr); } } None - }) { - None => Err(format!( - "no local address for protocol address type: {:?}", - protocol_address_type - )), - Some(addr) => Ok(addr), - } + }) + .collect() } async fn validate_dial_info( @@ -96,9 +93,9 @@ impl Network { } } - async fn try_port_mapping( + async fn try_port_mapping>( &self, - _local_addr: SocketAddr, + _intf_addrs: I, _protocol_address_type: ProtocolAddressType, ) -> Option { //xxx @@ -114,19 +111,26 @@ impl Network { c.network.restricted_nat_retries }; - // Get our local address - let local1 = self.discover_local_address(ProtocolAddressType::UDPv4)?; + // Get our interface addresses + let intf_addrs = self.get_interface_addresses(ProtocolAddressType::UDPv4); // Loop for restricted NAT retries loop { // Get our external address from some fast node, call it node B - let (external1, node_b) = self + let (external1, node_b) = match self .discover_external_address(ProtocolAddressType::UDPv4, None) - .await?; + .await + { + None => { + // If we can't get an external address, exit but don't throw an error so we can try again later + return Ok(()); + } + Some(v) => v, + }; let external1_dial_info = DialInfo::udp_from_socketaddr(external1); - // If local1 == external1 then there is no NAT in place - if local1 == external1 { + // If our local interface list contains external1 then there is no NAT in place + if intf_addrs.contains(&external1) { // No NAT // Do a validate_dial_info on the external address from a routed node if self @@ -134,7 +138,7 @@ impl Network { .await { // Add public dial info with Server network class - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( external1_dial_info, Some(NetworkClass::Server), DialInfoOrigin::Discovered, @@ -150,12 +154,12 @@ impl Network { // There is -some NAT- // Attempt a UDP port mapping via all available and enabled mechanisms if let Some(external_mapped) = self - .try_port_mapping(local1, ProtocolAddressType::UDPv4) + .try_port_mapping(&intf_addrs, ProtocolAddressType::UDPv4) .await { // Got a port mapping, let's use it let external_mapped_dial_info = DialInfo::udp_from_socketaddr(external_mapped); - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( external_mapped_dial_info, Some(NetworkClass::Mapped), DialInfoOrigin::Mapped, @@ -178,7 +182,7 @@ impl Network { { // Yes, another machine can use the dial info directly, so Full Cone // Add public dial info with full cone NAT network class - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( external1_dial_info, Some(NetworkClass::FullNAT), DialInfoOrigin::Discovered, @@ -190,12 +194,19 @@ impl Network { // No, we are restricted, determine what kind of restriction // Get our external address from some fast node, that is not node B, call it node D - let (external2, node_d) = self + let (external2, node_d) = match self .discover_external_address( ProtocolAddressType::UDPv4, Some(node_b.node_id()), ) - .await?; + .await + { + None => { + // If we can't get an external address, exit but don't throw an error so we can try again later + return Ok(()); + } + Some(v) => v, + }; // If we have two different external addresses, then this is a symmetric NAT if external2 != external1 { // Symmetric NAT is outbound only, no public dial info will work @@ -220,14 +231,14 @@ impl Network { .await { // Got a reply from a non-default port, which means we're only address restricted - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( external1_dial_info, Some(NetworkClass::AddressRestrictedNAT), DialInfoOrigin::Discovered, ); } else { // Didn't get a reply from a non-default port, which means we are also port restricted - routing_table.register_public_dial_info( + routing_table.register_global_dial_info( external1_dial_info, Some(NetworkClass::PortRestrictedNAT), DialInfoOrigin::Discovered, diff --git a/veilid-core/src/intf/wasm/network/mod.rs b/veilid-core/src/intf/wasm/network/mod.rs index 8afdba06..90011f3e 100644 --- a/veilid-core/src/intf/wasm/network/mod.rs +++ b/veilid-core/src/intf/wasm/network/mod.rs @@ -158,7 +158,7 @@ impl Network { // Drop all dial info routing_table.clear_local_dial_info(); - routing_table.clear_public_dial_info(); + routing_table.clear_global_dial_info(); // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(network_manager); diff --git a/veilid-core/src/lease_manager.rs b/veilid-core/src/lease_manager.rs index 0fc443f3..b959f9e2 100644 --- a/veilid-core/src/lease_manager.rs +++ b/veilid-core/src/lease_manager.rs @@ -135,7 +135,7 @@ impl LeaseManager { // xxx: depends on who is asking? // signaling requires inbound ability, so check to see if we have public dial info let routing_table = inner.network_manager.routing_table(); - if !routing_table.has_public_dial_info() { + if !routing_table.has_global_dial_info() { return false; } @@ -178,7 +178,7 @@ impl LeaseManager { // xxx: depends on who is asking? // relaying requires inbound ability, so check to see if we have public dial info let routing_table = inner.network_manager.routing_table(); - if !routing_table.has_public_dial_info() { + if !routing_table.has_global_dial_info() { return false; } true diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 30340249..9df25b1b 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -1,4 +1,4 @@ -#![warn(clippy::all)] +#![deny(clippy::all)] #![cfg_attr(target_arch = "wasm32", no_std)] #[macro_use] diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 4a29619d..18a9ffea 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -105,7 +105,7 @@ impl BucketEntry { .collect() } - pub fn public_dial_info(&self) -> Vec { + pub fn global_dial_info(&self) -> Vec { self.dial_info_entries .iter() .filter_map(|e| { @@ -118,7 +118,7 @@ impl BucketEntry { .collect() } - pub fn public_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec { + pub fn global_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec { self.dial_info_entries .iter() .filter_map(|e| { @@ -133,7 +133,7 @@ impl BucketEntry { .collect() } - pub fn private_dial_info(&self) -> Vec { + pub fn local_dial_info(&self) -> Vec { self.dial_info_entries .iter() .filter_map(|e| { @@ -146,7 +146,7 @@ impl BucketEntry { .collect() } - pub fn private_dial_info_for_protocol(&mut self, protocol_type: ProtocolType) -> Vec { + pub fn local_dial_info_for_protocol(&mut self, protocol_type: ProtocolType) -> Vec { self.dial_info_entries .iter_mut() .filter_map(|e| { @@ -166,8 +166,8 @@ impl BucketEntry { node_id: NodeId::new(key), dial_infos: match scope { PeerScope::All => self.dial_info(), - PeerScope::Public => self.public_dial_info(), - PeerScope::Private => self.private_dial_info(), + PeerScope::Global => self.global_dial_info(), + PeerScope::Local => self.local_dial_info(), }, } } diff --git a/veilid-core/src/routing_table/dial_info_entry.rs b/veilid-core/src/routing_table/dial_info_entry.rs index 644a2dc2..9e169974 100644 --- a/veilid-core/src/routing_table/dial_info_entry.rs +++ b/veilid-core/src/routing_table/dial_info_entry.rs @@ -37,8 +37,8 @@ impl DialInfoEntry { pub fn matches_peer_scope(&self, scope: PeerScope) -> bool { match scope { PeerScope::All => true, - PeerScope::Public => self.is_public(), - PeerScope::Private => self.is_private(), + PeerScope::Global => self.is_public(), + PeerScope::Local => self.is_private(), } } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 35bbbf2a..c97d5633 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -30,7 +30,7 @@ impl RoutingTable { .dial_info_entries_as_ref() .iter() .find_map(|die| { - if die.matches_peer_scope(PeerScope::Public) + if die.matches_peer_scope(PeerScope::Global) && die.dial_info().protocol_address_type() == protocol_address_type { @@ -63,13 +63,13 @@ impl RoutingTable { pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo { let dial_infos = match scope { PeerScope::All => { - let mut divec = self.public_dial_info(); + let mut divec = self.global_dial_info(); divec.append(&mut self.local_dial_info()); divec.dedup(); divec } - PeerScope::Public => self.public_dial_info(), - PeerScope::Private => self.local_dial_info(), + PeerScope::Global => self.global_dial_info(), + PeerScope::Local => self.local_dial_info(), }; PeerInfo { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 6dd3a8fe..dad6581d 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -44,7 +44,7 @@ struct RoutingTableInner { node_id_secret: DHTKeySecret, buckets: Vec, local_dial_info: Vec, - public_dial_info: Vec, + global_dial_info: Vec, bucket_entry_count: usize, // Waiters eventual_changed_dial_info: Eventual, @@ -77,7 +77,7 @@ impl RoutingTable { node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), local_dial_info: Vec::new(), - public_dial_info: Vec::new(), + global_dial_info: Vec::new(), bucket_entry_count: 0, eventual_changed_dial_info: Eventual::new(), stats_accounting: StatsAccounting::new(), @@ -219,23 +219,23 @@ impl RoutingTable { self.inner.lock().local_dial_info.clear(); } - pub fn has_public_dial_info(&self) -> bool { + pub fn has_global_dial_info(&self) -> bool { let inner = self.inner.lock(); - !inner.public_dial_info.is_empty() + !inner.global_dial_info.is_empty() } - pub fn public_dial_info(&self) -> Vec { + pub fn global_dial_info(&self) -> Vec { let inner = self.inner.lock(); - inner.public_dial_info.clone() + inner.global_dial_info.clone() } - pub fn public_dial_info_for_protocol( + pub fn global_dial_info_for_protocol( &self, protocol_type: ProtocolType, ) -> Vec { let inner = self.inner.lock(); inner - .public_dial_info + .global_dial_info .iter() .filter_map(|di| { if di.dial_info.protocol_type() != protocol_type { @@ -246,13 +246,13 @@ impl RoutingTable { }) .collect() } - pub fn public_dial_info_for_protocol_address_type( + pub fn global_dial_info_for_protocol_address_type( &self, protocol_address_type: ProtocolAddressType, ) -> Vec { let inner = self.inner.lock(); inner - .public_dial_info + .global_dial_info .iter() .filter_map(|di| { if di.dial_info.protocol_address_type() != protocol_address_type { @@ -264,7 +264,7 @@ impl RoutingTable { .collect() } - pub fn register_public_dial_info( + pub fn register_global_dial_info( &self, dial_info: DialInfo, network_class: Option, @@ -273,7 +273,7 @@ impl RoutingTable { let ts = get_timestamp(); let mut inner = self.inner.lock(); - inner.public_dial_info.push(DialInfoDetail { + inner.global_dial_info.push(DialInfoDetail { dial_info: dial_info.clone(), origin, network_class, @@ -292,8 +292,8 @@ impl RoutingTable { ); } - pub fn clear_public_dial_info(&self) { - self.inner.lock().public_dial_info.clear(); + pub fn clear_global_dial_info(&self) { + self.inner.lock().global_dial_info.clear(); } pub async fn wait_changed_dial_info(&self) { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 2577f521..08c46497 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1194,7 +1194,7 @@ impl RPCProcessor { // find N nodes closest to the target node in our routing table let peer_scope = if address_filter { - PeerScope::Public + PeerScope::Global } else { PeerScope::All }; @@ -1671,7 +1671,7 @@ impl RPCProcessor { let mut peer_info_builder = fnq.reborrow().init_peer_info(); let own_peer_info = self.routing_table().get_own_peer_info(if address_filter { - PeerScope::Public + PeerScope::Global } else { PeerScope::All }); diff --git a/veilid-core/src/tests/common/test_veilid_core.rs b/veilid-core/src/tests/common/test_veilid_core.rs index e877cbfe..0a2a2736 100644 --- a/veilid-core/src/tests/common/test_veilid_core.rs +++ b/veilid-core/src/tests/common/test_veilid_core.rs @@ -22,11 +22,12 @@ pub async fn test_attach_detach() { .startup(setup_veilid_core()) .await .expect("startup failed"); - api.attach().await; + api.attach().await.unwrap(); intf::sleep(5000).await; - api.detach().await; + api.detach().await.unwrap(); api.wait_for_state(VeilidState::Attachment(AttachmentState::Detached)) - .await; + .await + .unwrap(); api.shutdown().await; info!("--- test auto detach ---"); @@ -34,7 +35,7 @@ pub async fn test_attach_detach() { .startup(setup_veilid_core()) .await .expect("startup failed"); - api.attach().await; + api.attach().await.unwrap(); intf::sleep(5000).await; api.shutdown().await; @@ -43,7 +44,7 @@ pub async fn test_attach_detach() { .startup(setup_veilid_core()) .await .expect("startup failed"); - api.detach().await; + api.detach().await.unwrap(); api.shutdown().await; } diff --git a/veilid-core/src/veilid_api.rs b/veilid-core/src/veilid_api.rs index 827c793c..1ea84ebb 100644 --- a/veilid-core/src/veilid_api.rs +++ b/veilid-core/src/veilid_api.rs @@ -1,6 +1,7 @@ pub use crate::rpc_processor::InfoAnswer; use crate::*; use attachment_manager::AttachmentManager; +use core::fmt; use network_manager::NetworkManager; use rpc_processor::{RPCError, RPCProcessor}; use xx::*; @@ -469,8 +470,8 @@ impl Default for DialInfo { #[derive(Clone, Copy, Debug)] pub enum PeerScope { All, - Public, - Private, + Global, + Local, } #[derive(Clone, Debug, Default)] @@ -876,119 +877,140 @@ impl RoutingContext { ///////////////////////////////////////////////////////////////////////////////////////////////////// struct VeilidAPIInner { - config: VeilidConfig, - attachment_manager: AttachmentManager, - core: VeilidCore, - network_manager: NetworkManager, - is_shutdown: bool, + core: Option, +} + +impl fmt::Debug for VeilidAPIInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "VeilidAPIInner: {}", + match self.core { + Some(_) => "active", + None => "shutdown", + } + ) + } } impl Drop for VeilidAPIInner { fn drop(&mut self) { - if !self.is_shutdown { - intf::spawn_local(self.core.clone().internal_shutdown()).detach(); + if let Some(core) = self.core.take() { + intf::spawn_local(core.internal_shutdown()).detach(); } } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct VeilidAPI { inner: Arc>, } +#[derive(Clone, Debug, Default)] +pub struct VeilidAPIWeak { + inner: Weak>, +} + +impl VeilidAPIWeak { + pub fn upgrade(&self) -> Option { + self.inner.upgrade().map(|v| VeilidAPI { inner: v }) + } +} + impl VeilidAPI { - pub fn new(attachment_manager: AttachmentManager, core: VeilidCore) -> Self { + pub(crate) fn new(core: VeilidCore) -> Self { Self { - inner: Arc::new(Mutex::new(VeilidAPIInner { - config: attachment_manager.config(), - attachment_manager: attachment_manager.clone(), - core, - network_manager: attachment_manager.network_manager(), - is_shutdown: false, - })), + inner: Arc::new(Mutex::new(VeilidAPIInner { core: Some(core) })), + } + } + pub fn weak(&self) -> VeilidAPIWeak { + VeilidAPIWeak { + inner: Arc::downgrade(&self.inner), + } + } + fn core(&self) -> Result { + Ok(self + .inner + .lock() + .core + .as_ref() + .ok_or(VeilidAPIError::Shutdown)? + .clone()) + } + fn config(&self) -> Result { + Ok(self.core()?.config()) + } + fn attachment_manager(&self) -> Result { + Ok(self.core()?.attachment_manager()) + } + fn network_manager(&self) -> Result { + Ok(self.attachment_manager()?.network_manager()) + } + fn rpc_processor(&self) -> Result { + Ok(self.network_manager()?.rpc_processor()) + } + + pub async fn shutdown(self) { + let core = { self.inner.lock().core.take() }; + if let Some(core) = core { + core.internal_shutdown().await; } } - pub fn config(&self) -> VeilidConfig { - self.inner.lock().config.clone() - } - - fn attachment_manager(&self) -> AttachmentManager { - self.inner.lock().attachment_manager.clone() - } - - // fn network_manager(&self) -> NetworkManager { - // self.inner.lock().network_manager.clone() - // } - - fn rpc_processor(&self) -> RPCProcessor { - self.inner.lock().network_manager.rpc_processor() - } - - pub async fn shutdown(&self) { - let mut inner = self.inner.lock(); - if !inner.is_shutdown { - inner.core.clone().internal_shutdown().await; - inner.is_shutdown = true; - } - } pub fn is_shutdown(&self) -> bool { - self.inner.lock().is_shutdown - } - - fn verify_not_shutdown(&self) -> Result<(), VeilidAPIError> { - if self.is_shutdown() { - return Err(VeilidAPIError::Shutdown); - } - Ok(()) + self.inner.lock().core.is_none() } //////////////////////////////////////////////////////////////// // Attach/Detach // issue state changed updates for updating clients - pub async fn send_state_update(&self) { + pub async fn send_state_update(&self) -> Result<(), VeilidAPIError> { trace!("VeilidCore::send_state_update"); - let attachment_manager = self.attachment_manager().clone(); + let attachment_manager = self.attachment_manager()?; attachment_manager.send_state_update().await; + Ok(()) } // connect to the network - pub async fn attach(&self) { + pub async fn attach(&self) -> Result<(), VeilidAPIError> { trace!("VeilidCore::attach"); - let attachment_manager = self.attachment_manager().clone(); + let attachment_manager = self.attachment_manager()?; attachment_manager.request_attach().await; + Ok(()) } // disconnect from the network - pub async fn detach(&self) { + pub async fn detach(&self) -> Result<(), VeilidAPIError> { trace!("VeilidCore::detach"); - let attachment_manager = self.attachment_manager().clone(); + let attachment_manager = self.attachment_manager()?; attachment_manager.request_detach().await; + Ok(()) } // wait for state change // xxx: this should not use 'sleep', perhaps this function should be eliminated anyway - pub async fn wait_for_state(&self, state: VeilidState) { + // xxx: it should really only be used for test anyway, and there is probably a better way to do this regardless + // xxx: that doesn't wait forever and can time out + pub async fn wait_for_state(&self, state: VeilidState) -> Result<(), VeilidAPIError> { loop { intf::sleep(500).await; match state { VeilidState::Attachment(cs) => { - if self.attachment_manager().get_state() == cs { + if self.attachment_manager()?.get_state() == cs { break; } } } } + Ok(()) } //////////////////////////////////////////////////////////////// // Direct Node Access (pretty much for testing only) pub async fn info(&self, node_id: NodeId) -> Result { - self.verify_not_shutdown()?; - - let rpc = self.rpc_processor(); + let rpc = self.rpc_processor()?; let routing_table = rpc.routing_table(); let node_ref = match routing_table.lookup_node_ref(node_id.key) { None => return Err(VeilidAPIError::NodeNotFound(node_id)), @@ -1008,9 +1030,7 @@ impl VeilidAPI { redirect: bool, alternate_port: bool, ) -> Result { - self.verify_not_shutdown()?; - - let rpc = self.rpc_processor(); + let rpc = self.rpc_processor()?; let routing_table = rpc.routing_table(); let node_ref = match routing_table.lookup_node_ref(node_id.key) { None => return Err(VeilidAPIError::NodeNotFound(node_id)), @@ -1022,9 +1042,8 @@ impl VeilidAPI { } pub async fn search_dht(&self, node_id: NodeId) -> Result { - self.verify_not_shutdown()?; - let rpc_processor = self.rpc_processor(); - let config = self.config(); + let rpc_processor = self.rpc_processor()?; + let config = self.config()?; let (count, fanout, timeout) = { let c = config.get(); ( @@ -1051,10 +1070,8 @@ impl VeilidAPI { &self, node_id: NodeId, ) -> Result, VeilidAPIError> { - self.verify_not_shutdown()?; - - let rpc_processor = self.rpc_processor(); - let config = self.config(); + let rpc_processor = self.rpc_processor()?; + let config = self.config()?; let (count, fanout, timeout) = { let c = config.get(); ( @@ -1151,7 +1168,6 @@ impl VeilidAPI { _endpoint_mode: TunnelMode, _depth: u8, ) -> Result { - self.verify_not_shutdown()?; panic!("unimplemented"); } @@ -1161,12 +1177,10 @@ impl VeilidAPI { _depth: u8, _partial_tunnel: PartialTunnel, ) -> Result { - self.verify_not_shutdown()?; panic!("unimplemented"); } pub async fn cancel_tunnel(&self, _tunnel_id: TunnelId) -> Result { - self.verify_not_shutdown()?; panic!("unimplemented"); } } diff --git a/veilid-core/src/veilid_core.rs b/veilid-core/src/veilid_core.rs index 99c6e269..4f938abd 100644 --- a/veilid-core/src/veilid_core.rs +++ b/veilid-core/src/veilid_core.rs @@ -36,6 +36,7 @@ struct VeilidCoreInner { table_store: Option, crypto: Option, attachment_manager: Option, + api: VeilidAPIWeak, } #[derive(Clone)] @@ -56,6 +57,7 @@ impl VeilidCore { table_store: None, crypto: None, attachment_manager: None, + api: VeilidAPIWeak::default(), } } pub fn new() -> Self { @@ -64,18 +66,9 @@ impl VeilidCore { } } - // pub(crate) fn config(&self) -> VeilidConfig { - // self.inner.lock().config.as_ref().unwrap().clone() - // } - - // pub(crate) fn attachment_manager(&self) -> AttachmentManager { - // self.inner - // .lock() - // .attachment_manager - // .as_ref() - // .unwrap() - // .clone() - // } + pub(crate) fn config(&self) -> VeilidConfig { + self.inner.lock().config.as_ref().unwrap().clone() + } pub(crate) fn table_store(&self) -> TableStore { self.inner.lock().table_store.as_ref().unwrap().clone() @@ -85,8 +78,21 @@ impl VeilidCore { self.inner.lock().crypto.as_ref().unwrap().clone() } + pub(crate) fn attachment_manager(&self) -> AttachmentManager { + self.inner + .lock() + .attachment_manager + .as_ref() + .unwrap() + .clone() + } + // internal startup - async fn internal_startup(&self, setup: VeilidCoreSetup) -> Result { + async fn internal_startup( + &self, + inner: &mut VeilidCoreInner, + setup: VeilidCoreSetup, + ) -> Result { trace!("VeilidCore::internal_startup starting"); cfg_if! { @@ -102,19 +108,19 @@ impl VeilidCore { trace!("VeilidCore::internal_startup init config"); let mut config = VeilidConfig::new(); config.init(setup.config_callback).await?; - self.inner.lock().config = Some(config.clone()); + inner.config = Some(config.clone()); // Set up tablestore trace!("VeilidCore::internal_startup init tablestore"); let table_store = TableStore::new(config.clone()); table_store.init().await?; - self.inner.lock().table_store = Some(table_store.clone()); + inner.table_store = Some(table_store.clone()); // Set up crypto trace!("VeilidCore::internal_startup init crypto"); let crypto = Crypto::new(config.clone(), table_store.clone()); crypto.init().await?; - self.inner.lock().crypto = Some(crypto.clone()); + inner.crypto = Some(crypto.clone()); // Set up attachment manager trace!("VeilidCore::internal_startup init attachment manager"); @@ -131,20 +137,30 @@ impl VeilidCore { }, )) .await?; - self.inner.lock().attachment_manager = Some(attachment_manager.clone()); + inner.attachment_manager = Some(attachment_manager.clone()); // Set up the API trace!("VeilidCore::internal_startup init API"); let this = self.clone(); - let veilid_api = VeilidAPI::new(attachment_manager, this); + let veilid_api = VeilidAPI::new(this); + inner.api = veilid_api.weak(); + trace!("VeilidCore::internal_startup complete"); + Ok(veilid_api) } // called once at the beginning to start the node pub async fn startup(&self, setup: VeilidCoreSetup) -> Result { + // See if we have an API started up already + let mut inner = self.inner.lock(); + if inner.api.upgrade().is_some() { + // If so, return an error because we shouldn't try to do this more than once + return Err("Veilid API is started".to_owned()); + } + // Ensure we never end up partially initialized - match self.internal_startup(setup).await { + match self.internal_startup(&mut *inner, setup).await { Ok(v) => Ok(v), Err(e) => { self.clone().internal_shutdown().await; @@ -158,6 +174,9 @@ impl VeilidCore { let mut inner = self.inner.lock(); trace!("VeilidCore::internal_shutdown starting"); + // Detach the API object + inner.api = VeilidAPIWeak::default(); + // Shut down up attachment manager if let Some(attachment_manager) = &inner.attachment_manager { attachment_manager.terminate().await; diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 134712e6..aab1b81f 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -107,8 +107,10 @@ impl veilid_server::Server for VeilidServerImpl { // Send state update let veilid_api = self.veilid_api.clone(); Promise::from_future(async move { - veilid_api.send_state_update().await; - Ok(()) + veilid_api + .send_state_update() + .await + .map_err(|e| ::capnp::Error::failed(format!("{:?}", e))) }) } @@ -120,8 +122,10 @@ impl veilid_server::Server for VeilidServerImpl { trace!("VeilidServerImpl::attach"); let veilid_api = self.veilid_api.clone(); Promise::from_future(async move { - veilid_api.attach().await; - Ok(()) + veilid_api + .attach() + .await + .map_err(|e| ::capnp::Error::failed(format!("{:?}", e))) }) } fn detach( @@ -132,8 +136,10 @@ impl veilid_server::Server for VeilidServerImpl { trace!("VeilidServerImpl::detach"); let veilid_api = self.veilid_api.clone(); Promise::from_future(async move { - veilid_api.detach().await; - Ok(()) + veilid_api + .detach() + .await + .map_err(|e| ::capnp::Error::failed(format!("{:?}", e))) }) } fn shutdown( diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 4aeae6f9..dad2eec5 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -1,5 +1,5 @@ #![forbid(unsafe_code)] -#![warn(clippy::all)] +#![deny(clippy::all)] mod client_api; mod settings; diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index f8396f84..f56c3792 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -89,8 +89,8 @@ lazy_static! { } pub fn shutdown() { - let mut shutdown_switch_locked = SHUTDOWN_SWITCH.lock(); - if let Some(shutdown_switch) = shutdown_switch_locked.take() { + let shutdown_switch = SHUTDOWN_SWITCH.lock().take(); + if let Some(shutdown_switch) = shutdown_switch { shutdown_switch.resolve(()); } } @@ -279,17 +279,22 @@ pub async fn main() -> Result<(), String> { // Handle state changes on main thread for capnproto rpc let capi2 = capi.clone(); let capi_jh = async_std::task::spawn_local(async move { + trace!("state change processing started"); while let Ok(change) = receiver.recv().await { if let Some(c) = capi2.borrow_mut().as_mut().cloned() { c.handle_state_change(change); } } + trace!("state change processing stopped"); }); // Auto-attach if desired if auto_attach { info!("Auto-attach to the Veilid network"); - veilid_api.attach().await; + if let Err(e) = veilid_api.attach().await { + error!("Auto-attaching to the Veilid network failed: {:?}", e); + shutdown(); + } } // Idle while waiting to exit