From b13f8947df659d17dbb499085e8b285d0a8a9879 Mon Sep 17 00:00:00 2001 From: John Smith Date: Wed, 7 Sep 2022 21:52:08 -0400 Subject: [PATCH] clean up locking --- veilid-core/src/network_manager/mod.rs | 197 +++++++++--------- veilid-core/src/network_manager/wasm/mod.rs | 10 +- veilid-core/src/routing_table/mod.rs | 20 +- .../example/android/app/build.gradle | 13 +- 4 files changed, 126 insertions(+), 114 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 4c79931e..c7c12da5 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -16,12 +16,12 @@ pub mod tests; //////////////////////////////////////////////////////////////////////////////////////// +pub use connection_manager::*; pub use network_connection::*; //////////////////////////////////////////////////////////////////////////////////////// use connection_handle::*; use connection_limits::*; -use connection_manager::*; use dht::*; use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt}; use hashlink::LruCache; @@ -142,9 +142,6 @@ struct PublicAddressCheckCacheKey(ProtocolType, AddressType); // The mutable state of the network manager struct NetworkManagerInner { - routing_table: Option, - components: Option, - update_callback: Option, stats: NetworkManagerStats, client_whitelist: LruCache, public_address_check_cache: @@ -159,6 +156,10 @@ struct NetworkManagerInner { } struct NetworkManagerUnlockedInner { + // Accessors + routing_table: RwLock>, + components: RwLock>, + update_callback: RwLock>, // Background processes rolling_transfers_task: TickTask, relay_management_task: TickTask, @@ -181,9 +182,6 @@ pub struct NetworkManager { impl NetworkManager { fn new_inner() -> NetworkManagerInner { NetworkManagerInner { - routing_table: None, - components: None, - update_callback: None, stats: NetworkManagerStats::default(), client_whitelist: LruCache::new_unbounded(), public_address_check_cache: BTreeMap::new(), @@ -198,6 +196,9 @@ impl NetworkManager { fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner { let c = config.get(); NetworkManagerUnlockedInner { + routing_table: RwLock::new(None), + components: RwLock::new(None), + update_callback: RwLock::new(None), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), bootstrap_task: TickTask::new(1), @@ -280,33 +281,44 @@ impl NetworkManager { self.crypto.clone() } pub fn routing_table(&self) -> RoutingTable { - self.inner.lock().routing_table.as_ref().unwrap().clone() + self.unlocked_inner + .routing_table + .read() + .as_ref() + .unwrap() + .clone() } pub fn net(&self) -> Network { - self.inner.lock().components.as_ref().unwrap().net.clone() + self.unlocked_inner + .components + .read() + .as_ref() + .unwrap() + .net + .clone() } pub fn rpc_processor(&self) -> RPCProcessor { - self.inner - .lock() + self.unlocked_inner .components + .read() .as_ref() .unwrap() .rpc_processor .clone() } pub fn receipt_manager(&self) -> ReceiptManager { - self.inner - .lock() + self.unlocked_inner .components + .read() .as_ref() .unwrap() .receipt_manager .clone() } pub fn connection_manager(&self) -> ConnectionManager { - self.inner - .lock() + self.unlocked_inner .components + .read() .as_ref() .unwrap() .connection_manager @@ -317,27 +329,24 @@ impl NetworkManager { pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { let routing_table = RoutingTable::new(self.clone()); routing_table.init().await?; - self.inner.lock().routing_table = Some(routing_table.clone()); - self.inner.lock().update_callback = Some(update_callback); + *self.unlocked_inner.routing_table.write() = Some(routing_table.clone()); + *self.unlocked_inner.update_callback.write() = Some(update_callback); Ok(()) } #[instrument(level = "debug", skip_all)] pub async fn terminate(&self) { - let routing_table = { - let mut inner = self.inner.lock(); - inner.routing_table.take() - }; + let routing_table = self.unlocked_inner.routing_table.write().take(); if let Some(routing_table) = routing_table { routing_table.terminate().await; } - self.inner.lock().update_callback = None; + *self.unlocked_inner.update_callback.write() = None; } #[instrument(level = "debug", skip_all, err)] pub async fn internal_startup(&self) -> EyreResult<()> { trace!("NetworkManager::internal_startup begin"); - if self.inner.lock().components.is_some() { + if self.unlocked_inner.components.read().is_some() { debug!("NetworkManager::internal_startup already started"); return Ok(()); } @@ -351,7 +360,7 @@ impl NetworkManager { ); let rpc_processor = RPCProcessor::new(self.clone()); let receipt_manager = ReceiptManager::new(self.clone()); - self.inner.lock().components = Some(NetworkComponents { + *self.unlocked_inner.components.write() = Some(NetworkComponents { net: net.clone(), connection_manager: connection_manager.clone(), rpc_processor: rpc_processor.clone(), @@ -378,14 +387,9 @@ impl NetworkManager { // Store copy of protocol config and dial info filters { + let pc = self.net().get_protocol_config().unwrap(); + let mut inner = self.inner.lock(); - let pc = inner - .components - .as_ref() - .unwrap() - .net - .get_protocol_config() - .unwrap(); inner.public_inbound_dial_info_filter = Some( DialInfoFilter::all() @@ -461,19 +465,20 @@ impl NetworkManager { // Shutdown network components if they started up debug!("shutting down network components"); - let components = self.inner.lock().components.clone(); + let components = self.unlocked_inner.components.read().clone(); if let Some(components) = components { components.net.shutdown().await; components.rpc_processor.shutdown().await; components.receipt_manager.shutdown().await; components.connection_manager.shutdown().await; + + *self.unlocked_inner.components.write() = None; } // reset the state debug!("resetting network manager state"); { let mut inner = self.inner.lock(); - inner.components = None; inner.public_inbound_dial_info_filter = None; inner.local_inbound_dial_info_filter = None; inner.public_outbound_dial_info_filter = None; @@ -482,7 +487,7 @@ impl NetworkManager { } // send update - debug!("sending network state update"); + debug!("sending network state update to api clients"); self.send_network_update(); debug!("finished network manager shutdown"); @@ -537,15 +542,9 @@ impl NetworkManager { } pub async fn tick(&self) -> EyreResult<()> { - let (routing_table, net, receipt_manager) = { - let inner = self.inner.lock(); - let components = inner.components.as_ref().unwrap(); - ( - inner.routing_table.as_ref().unwrap().clone(), - components.net.clone(), - components.receipt_manager.clone(), - ) - }; + let routing_table = self.routing_table(); + let net = self.net(); + let receipt_manager = self.receipt_manager(); // Run the rolling transfers task self.unlocked_inner.rolling_transfers_task.tick().await?; @@ -591,7 +590,7 @@ impl NetworkManager { // Return what network class we are in pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option { - if let Some(components) = &self.inner.lock().components { + if let Some(components) = self.unlocked_inner.components.read().as_ref() { components.net.get_network_class(routing_domain) } else { None @@ -1517,15 +1516,6 @@ impl NetworkManager { } }; - // Get routing table and rpc processor - let (routing_table, rpc) = { - let inner = self.inner.lock(); - ( - inner.routing_table.as_ref().unwrap().clone(), - inner.components.as_ref().unwrap().rpc_processor.clone(), - ) - }; - // Get timestamp range let (tsbehind, tsahead) = { let c = self.config.get(); @@ -1557,6 +1547,10 @@ impl NetworkManager { } } + // Get routing table and rpc processor + let routing_table = self.routing_table(); + let rpc = self.rpc_processor(); + // Peek at header and see if we need to relay this // If the recipient id is not our node id, then it needs relaying let sender_id = envelope.get_sender_id(); @@ -1674,55 +1668,62 @@ impl NetworkManager { inner.stats.clone() } - fn get_veilid_state_inner(inner: &NetworkManagerInner) -> VeilidStateNetwork { - if inner.components.is_some() && inner.components.as_ref().unwrap().net.is_started() { - VeilidStateNetwork { - started: true, - bps_down: inner.stats.self_stats.transfer_stats.down.average, - bps_up: inner.stats.self_stats.transfer_stats.up.average, - peers: { - let mut out = Vec::new(); - let routing_table = inner.routing_table.as_ref().unwrap(); - for (k, v) in routing_table.get_recent_peers() { - if let Some(nr) = routing_table.lookup_node_ref(k) { - let peer_stats = nr.peer_stats(); - let peer = PeerTableData { - node_id: k, - peer_address: v.last_connection.remote(), - peer_stats, - }; - out.push(peer); - } - } + pub fn get_veilid_state(&self) -> VeilidStateNetwork { + let has_state = self + .unlocked_inner + .components + .read() + .as_ref() + .map(|c| c.net.is_started()) + .unwrap_or(false); - out - }, - } - } else { - VeilidStateNetwork { + if !has_state { + return VeilidStateNetwork { started: false, bps_down: 0, bps_up: 0, peers: Vec::new(), - } + }; + } + let routing_table = self.routing_table(); + + let (bps_down, bps_up) = { + let inner = self.inner.lock(); + ( + inner.stats.self_stats.transfer_stats.down.average, + inner.stats.self_stats.transfer_stats.up.average, + ) + }; + + VeilidStateNetwork { + started: true, + bps_down, + bps_up, + peers: { + let mut out = Vec::new(); + for (k, v) in routing_table.get_recent_peers() { + if let Some(nr) = routing_table.lookup_node_ref(k) { + let peer_stats = nr.peer_stats(); + let peer = PeerTableData { + node_id: k, + peer_address: v.last_connection.remote(), + peer_stats, + }; + out.push(peer); + } + } + out + }, } - } - pub fn get_veilid_state(&self) -> VeilidStateNetwork { - let inner = self.inner.lock(); - Self::get_veilid_state_inner(&*inner) } fn send_network_update(&self) { - let (update_cb, state) = { - let inner = self.inner.lock(); - let update_cb = inner.update_callback.clone(); - if update_cb.is_none() { - return; - } - let state = Self::get_veilid_state_inner(&*inner); - (update_cb.unwrap(), state) - }; - update_cb(VeilidUpdate::Network(state)); + let update_cb = self.unlocked_inner.update_callback.read().clone(); + if update_cb.is_none() { + return; + } + let state = self.get_veilid_state(); + (update_cb.unwrap())(VeilidUpdate::Network(state)); } // Determine if a local IP address has changed @@ -1750,12 +1751,12 @@ impl NetworkManager { //info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); // Ignore these reports if we are currently detecting public dial info - let inner = &mut *self.inner.lock(); - let net = inner.components.as_ref().unwrap().net.clone(); + let net = self.net(); if net.doing_public_dial_info_check() { return; } - let routing_table = inner.routing_table.as_ref().unwrap().clone(); + + let routing_table = self.routing_table(); let c = self.config.get(); let detect_address_changes = c.network.detect_address_changes; @@ -1771,6 +1772,10 @@ impl NetworkManager { connection_descriptor.protocol_type(), connection_descriptor.address_type(), ); + + let mut inner = self.inner.lock(); + let inner = &mut *inner; + let pacc = inner .public_address_check_cache .entry(key) diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index aa300718..448c1e9e 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -160,7 +160,7 @@ impl Network { // Handle connection-oriented protocols // Try to send to the exact existing connection if one exists - if let Some(conn) = self.connection_manager().get_connection(descriptor).await { + if let Some(conn) = self.connection_manager().get_connection(descriptor) { // connection exists, send over it match conn.send_async(data).await { ConnectionHandleSendResult::Sent => { @@ -292,11 +292,15 @@ impl Network { ////////////////////////////////////////// - pub fn set_needs_public_dial_info_check(&self) { + pub fn set_needs_public_dial_info_check(&self, _punishment: Option>) { // } - pub fn get_network_class(&self, _routing_domain: PublicInternet) -> Option { + pub fn doing_public_dial_info_check(&self) -> bool { + false + } + + pub fn get_network_class(&self, _routing_domain: RoutingDomain) -> Option { // xxx eventually detect tor browser? return if self.inner.lock().network_started { Some(NetworkClass::WebApp) diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index ad3fa95f..e88c5994 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -34,7 +34,6 @@ pub struct RecentPeersEntry { /// RoutingTable rwlock-internal data struct RoutingTableInner { - network_manager: NetworkManager, // The current node's public DHT key node_id: DHTKey, node_id_secret: DHTKeySecret, // The current node's DHT key secret @@ -60,6 +59,8 @@ pub struct RoutingTableHealth { } struct RoutingTableUnlockedInner { + network_manager: NetworkManager, + // Background processes rolling_transfers_task: TickTask, kick_buckets_task: TickTask, @@ -73,9 +74,8 @@ pub struct RoutingTable { } impl RoutingTable { - fn new_inner(network_manager: NetworkManager) -> RoutingTableInner { + fn new_inner() -> RoutingTableInner { RoutingTableInner { - network_manager, node_id: DHTKey::default(), node_id_secret: DHTKeySecret::default(), buckets: Vec::new(), @@ -89,9 +89,13 @@ impl RoutingTable { recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), } } - fn new_unlocked_inner(_config: VeilidConfig) -> RoutingTableUnlockedInner { + fn new_unlocked_inner( + _config: VeilidConfig, + network_manager: NetworkManager, + ) -> RoutingTableUnlockedInner { //let c = config.get(); RoutingTableUnlockedInner { + network_manager, rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), kick_buckets_task: TickTask::new(1), } @@ -100,8 +104,8 @@ impl RoutingTable { let config = network_manager.config(); let this = Self { config: config.clone(), - inner: Arc::new(RwLock::new(Self::new_inner(network_manager))), - unlocked_inner: Arc::new(Self::new_unlocked_inner(config)), + inner: Arc::new(RwLock::new(Self::new_inner())), + unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)), }; // Set rolling transfers tick task { @@ -126,7 +130,7 @@ impl RoutingTable { } pub fn network_manager(&self) -> NetworkManager { - self.inner.read().network_manager.clone() + self.unlocked_inner.network_manager.clone() } pub fn rpc_processor(&self) -> RPCProcessor { self.network_manager().rpc_processor() @@ -451,7 +455,7 @@ impl RoutingTable { error!("kick_buckets_task not stopped: {}", e); } - *self.inner.write() = Self::new_inner(self.network_manager()); + *self.inner.write() = Self::new_inner(); debug!("finished routing table terminate"); } diff --git a/veilid-flutter/example/android/app/build.gradle b/veilid-flutter/example/android/app/build.gradle index 050a97e6..3d452b27 100644 --- a/veilid-flutter/example/android/app/build.gradle +++ b/veilid-flutter/example/android/app/build.gradle @@ -41,13 +41,6 @@ android { main.java.srcDirs += 'src/main/kotlin' } - - // xxx why does backtrace not work in android? - // tried this but it doesn't help - // packagingOptions { - // jniLibs.useLegacyPackaging = true - // } - defaultConfig { // TODO: Specify your own unique Application ID (https://developer.android.com/studio/build/application-id.html). applicationId "com.veilid.veilid_example" @@ -63,6 +56,12 @@ android { // Signing with the debug keys for now, so `flutter run --release` works. signingConfig signingConfigs.debug } + debug { + packagingOptions { + jniLibs.useLegacyPackaging = true + jniLibs.keepDebugSymbols += '**/libveilid_flutter.so' + } + } } }