clean up locking

This commit is contained in:
John Smith 2022-09-07 21:52:08 -04:00
parent 19db64cdfa
commit b13f8947df
4 changed files with 126 additions and 114 deletions

View File

@ -16,12 +16,12 @@ pub mod tests;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
pub use connection_manager::*;
pub use network_connection::*; pub use network_connection::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
use connection_handle::*; use connection_handle::*;
use connection_limits::*; use connection_limits::*;
use connection_manager::*;
use dht::*; use dht::*;
use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt}; use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt};
use hashlink::LruCache; use hashlink::LruCache;
@ -142,9 +142,6 @@ struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
// The mutable state of the network manager // The mutable state of the network manager
struct NetworkManagerInner { struct NetworkManagerInner {
routing_table: Option<RoutingTable>,
components: Option<NetworkComponents>,
update_callback: Option<UpdateCallback>,
stats: NetworkManagerStats, stats: NetworkManagerStats,
client_whitelist: LruCache<DHTKey, ClientWhitelistEntry>, client_whitelist: LruCache<DHTKey, ClientWhitelistEntry>,
public_address_check_cache: public_address_check_cache:
@ -159,6 +156,10 @@ struct NetworkManagerInner {
} }
struct NetworkManagerUnlockedInner { struct NetworkManagerUnlockedInner {
// Accessors
routing_table: RwLock<Option<RoutingTable>>,
components: RwLock<Option<NetworkComponents>>,
update_callback: RwLock<Option<UpdateCallback>>,
// Background processes // Background processes
rolling_transfers_task: TickTask<EyreReport>, rolling_transfers_task: TickTask<EyreReport>,
relay_management_task: TickTask<EyreReport>, relay_management_task: TickTask<EyreReport>,
@ -181,9 +182,6 @@ pub struct NetworkManager {
impl NetworkManager { impl NetworkManager {
fn new_inner() -> NetworkManagerInner { fn new_inner() -> NetworkManagerInner {
NetworkManagerInner { NetworkManagerInner {
routing_table: None,
components: None,
update_callback: None,
stats: NetworkManagerStats::default(), stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(), client_whitelist: LruCache::new_unbounded(),
public_address_check_cache: BTreeMap::new(), public_address_check_cache: BTreeMap::new(),
@ -198,6 +196,9 @@ impl NetworkManager {
fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner { fn new_unlocked_inner(config: VeilidConfig) -> NetworkManagerUnlockedInner {
let c = config.get(); let c = config.get();
NetworkManagerUnlockedInner { NetworkManagerUnlockedInner {
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
bootstrap_task: TickTask::new(1), bootstrap_task: TickTask::new(1),
@ -280,33 +281,44 @@ impl NetworkManager {
self.crypto.clone() self.crypto.clone()
} }
pub fn routing_table(&self) -> RoutingTable { pub fn routing_table(&self) -> RoutingTable {
self.inner.lock().routing_table.as_ref().unwrap().clone() self.unlocked_inner
.routing_table
.read()
.as_ref()
.unwrap()
.clone()
} }
pub fn net(&self) -> Network { pub fn net(&self) -> Network {
self.inner.lock().components.as_ref().unwrap().net.clone() self.unlocked_inner
.components
.read()
.as_ref()
.unwrap()
.net
.clone()
} }
pub fn rpc_processor(&self) -> RPCProcessor { pub fn rpc_processor(&self) -> RPCProcessor {
self.inner self.unlocked_inner
.lock()
.components .components
.read()
.as_ref() .as_ref()
.unwrap() .unwrap()
.rpc_processor .rpc_processor
.clone() .clone()
} }
pub fn receipt_manager(&self) -> ReceiptManager { pub fn receipt_manager(&self) -> ReceiptManager {
self.inner self.unlocked_inner
.lock()
.components .components
.read()
.as_ref() .as_ref()
.unwrap() .unwrap()
.receipt_manager .receipt_manager
.clone() .clone()
} }
pub fn connection_manager(&self) -> ConnectionManager { pub fn connection_manager(&self) -> ConnectionManager {
self.inner self.unlocked_inner
.lock()
.components .components
.read()
.as_ref() .as_ref()
.unwrap() .unwrap()
.connection_manager .connection_manager
@ -317,27 +329,24 @@ impl NetworkManager {
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
let routing_table = RoutingTable::new(self.clone()); let routing_table = RoutingTable::new(self.clone());
routing_table.init().await?; routing_table.init().await?;
self.inner.lock().routing_table = Some(routing_table.clone()); *self.unlocked_inner.routing_table.write() = Some(routing_table.clone());
self.inner.lock().update_callback = Some(update_callback); *self.unlocked_inner.update_callback.write() = Some(update_callback);
Ok(()) Ok(())
} }
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) { pub async fn terminate(&self) {
let routing_table = { let routing_table = self.unlocked_inner.routing_table.write().take();
let mut inner = self.inner.lock();
inner.routing_table.take()
};
if let Some(routing_table) = routing_table { if let Some(routing_table) = routing_table {
routing_table.terminate().await; routing_table.terminate().await;
} }
self.inner.lock().update_callback = None; *self.unlocked_inner.update_callback.write() = None;
} }
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
pub async fn internal_startup(&self) -> EyreResult<()> { pub async fn internal_startup(&self) -> EyreResult<()> {
trace!("NetworkManager::internal_startup begin"); trace!("NetworkManager::internal_startup begin");
if self.inner.lock().components.is_some() { if self.unlocked_inner.components.read().is_some() {
debug!("NetworkManager::internal_startup already started"); debug!("NetworkManager::internal_startup already started");
return Ok(()); return Ok(());
} }
@ -351,7 +360,7 @@ impl NetworkManager {
); );
let rpc_processor = RPCProcessor::new(self.clone()); let rpc_processor = RPCProcessor::new(self.clone());
let receipt_manager = ReceiptManager::new(self.clone()); let receipt_manager = ReceiptManager::new(self.clone());
self.inner.lock().components = Some(NetworkComponents { *self.unlocked_inner.components.write() = Some(NetworkComponents {
net: net.clone(), net: net.clone(),
connection_manager: connection_manager.clone(), connection_manager: connection_manager.clone(),
rpc_processor: rpc_processor.clone(), rpc_processor: rpc_processor.clone(),
@ -378,14 +387,9 @@ impl NetworkManager {
// Store copy of protocol config and dial info filters // Store copy of protocol config and dial info filters
{ {
let pc = self.net().get_protocol_config().unwrap();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let pc = inner
.components
.as_ref()
.unwrap()
.net
.get_protocol_config()
.unwrap();
inner.public_inbound_dial_info_filter = Some( inner.public_inbound_dial_info_filter = Some(
DialInfoFilter::all() DialInfoFilter::all()
@ -461,19 +465,20 @@ impl NetworkManager {
// Shutdown network components if they started up // Shutdown network components if they started up
debug!("shutting down network components"); debug!("shutting down network components");
let components = self.inner.lock().components.clone(); let components = self.unlocked_inner.components.read().clone();
if let Some(components) = components { if let Some(components) = components {
components.net.shutdown().await; components.net.shutdown().await;
components.rpc_processor.shutdown().await; components.rpc_processor.shutdown().await;
components.receipt_manager.shutdown().await; components.receipt_manager.shutdown().await;
components.connection_manager.shutdown().await; components.connection_manager.shutdown().await;
*self.unlocked_inner.components.write() = None;
} }
// reset the state // reset the state
debug!("resetting network manager state"); debug!("resetting network manager state");
{ {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.components = None;
inner.public_inbound_dial_info_filter = None; inner.public_inbound_dial_info_filter = None;
inner.local_inbound_dial_info_filter = None; inner.local_inbound_dial_info_filter = None;
inner.public_outbound_dial_info_filter = None; inner.public_outbound_dial_info_filter = None;
@ -482,7 +487,7 @@ impl NetworkManager {
} }
// send update // send update
debug!("sending network state update"); debug!("sending network state update to api clients");
self.send_network_update(); self.send_network_update();
debug!("finished network manager shutdown"); debug!("finished network manager shutdown");
@ -537,15 +542,9 @@ impl NetworkManager {
} }
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let (routing_table, net, receipt_manager) = { let routing_table = self.routing_table();
let inner = self.inner.lock(); let net = self.net();
let components = inner.components.as_ref().unwrap(); let receipt_manager = self.receipt_manager();
(
inner.routing_table.as_ref().unwrap().clone(),
components.net.clone(),
components.receipt_manager.clone(),
)
};
// Run the rolling transfers task // Run the rolling transfers task
self.unlocked_inner.rolling_transfers_task.tick().await?; self.unlocked_inner.rolling_transfers_task.tick().await?;
@ -591,7 +590,7 @@ impl NetworkManager {
// Return what network class we are in // Return what network class we are in
pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> { pub fn get_network_class(&self, routing_domain: RoutingDomain) -> Option<NetworkClass> {
if let Some(components) = &self.inner.lock().components { if let Some(components) = self.unlocked_inner.components.read().as_ref() {
components.net.get_network_class(routing_domain) components.net.get_network_class(routing_domain)
} else { } else {
None None
@ -1517,15 +1516,6 @@ impl NetworkManager {
} }
}; };
// Get routing table and rpc processor
let (routing_table, rpc) = {
let inner = self.inner.lock();
(
inner.routing_table.as_ref().unwrap().clone(),
inner.components.as_ref().unwrap().rpc_processor.clone(),
)
};
// Get timestamp range // Get timestamp range
let (tsbehind, tsahead) = { let (tsbehind, tsahead) = {
let c = self.config.get(); let c = self.config.get();
@ -1557,6 +1547,10 @@ impl NetworkManager {
} }
} }
// Get routing table and rpc processor
let routing_table = self.routing_table();
let rpc = self.rpc_processor();
// Peek at header and see if we need to relay this // Peek at header and see if we need to relay this
// If the recipient id is not our node id, then it needs relaying // If the recipient id is not our node id, then it needs relaying
let sender_id = envelope.get_sender_id(); let sender_id = envelope.get_sender_id();
@ -1674,55 +1668,62 @@ impl NetworkManager {
inner.stats.clone() inner.stats.clone()
} }
fn get_veilid_state_inner(inner: &NetworkManagerInner) -> VeilidStateNetwork { pub fn get_veilid_state(&self) -> VeilidStateNetwork {
if inner.components.is_some() && inner.components.as_ref().unwrap().net.is_started() { let has_state = self
VeilidStateNetwork { .unlocked_inner
started: true, .components
bps_down: inner.stats.self_stats.transfer_stats.down.average, .read()
bps_up: inner.stats.self_stats.transfer_stats.up.average, .as_ref()
peers: { .map(|c| c.net.is_started())
let mut out = Vec::new(); .unwrap_or(false);
let routing_table = inner.routing_table.as_ref().unwrap();
for (k, v) in routing_table.get_recent_peers() {
if let Some(nr) = routing_table.lookup_node_ref(k) {
let peer_stats = nr.peer_stats();
let peer = PeerTableData {
node_id: k,
peer_address: v.last_connection.remote(),
peer_stats,
};
out.push(peer);
}
}
out if !has_state {
}, return VeilidStateNetwork {
}
} else {
VeilidStateNetwork {
started: false, started: false,
bps_down: 0, bps_down: 0,
bps_up: 0, bps_up: 0,
peers: Vec::new(), peers: Vec::new(),
} };
}
let routing_table = self.routing_table();
let (bps_down, bps_up) = {
let inner = self.inner.lock();
(
inner.stats.self_stats.transfer_stats.down.average,
inner.stats.self_stats.transfer_stats.up.average,
)
};
VeilidStateNetwork {
started: true,
bps_down,
bps_up,
peers: {
let mut out = Vec::new();
for (k, v) in routing_table.get_recent_peers() {
if let Some(nr) = routing_table.lookup_node_ref(k) {
let peer_stats = nr.peer_stats();
let peer = PeerTableData {
node_id: k,
peer_address: v.last_connection.remote(),
peer_stats,
};
out.push(peer);
}
}
out
},
} }
}
pub fn get_veilid_state(&self) -> VeilidStateNetwork {
let inner = self.inner.lock();
Self::get_veilid_state_inner(&*inner)
} }
fn send_network_update(&self) { fn send_network_update(&self) {
let (update_cb, state) = { let update_cb = self.unlocked_inner.update_callback.read().clone();
let inner = self.inner.lock(); if update_cb.is_none() {
let update_cb = inner.update_callback.clone(); return;
if update_cb.is_none() { }
return; let state = self.get_veilid_state();
} (update_cb.unwrap())(VeilidUpdate::Network(state));
let state = Self::get_veilid_state_inner(&*inner);
(update_cb.unwrap(), state)
};
update_cb(VeilidUpdate::Network(state));
} }
// Determine if a local IP address has changed // Determine if a local IP address has changed
@ -1750,12 +1751,12 @@ impl NetworkManager {
//info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); //info!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
// Ignore these reports if we are currently detecting public dial info // Ignore these reports if we are currently detecting public dial info
let inner = &mut *self.inner.lock(); let net = self.net();
let net = inner.components.as_ref().unwrap().net.clone();
if net.doing_public_dial_info_check() { if net.doing_public_dial_info_check() {
return; return;
} }
let routing_table = inner.routing_table.as_ref().unwrap().clone();
let routing_table = self.routing_table();
let c = self.config.get(); let c = self.config.get();
let detect_address_changes = c.network.detect_address_changes; let detect_address_changes = c.network.detect_address_changes;
@ -1771,6 +1772,10 @@ impl NetworkManager {
connection_descriptor.protocol_type(), connection_descriptor.protocol_type(),
connection_descriptor.address_type(), connection_descriptor.address_type(),
); );
let mut inner = self.inner.lock();
let inner = &mut *inner;
let pacc = inner let pacc = inner
.public_address_check_cache .public_address_check_cache
.entry(key) .entry(key)

View File

@ -160,7 +160,7 @@ impl Network {
// Handle connection-oriented protocols // Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists // Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor).await { if let Some(conn) = self.connection_manager().get_connection(descriptor) {
// connection exists, send over it // connection exists, send over it
match conn.send_async(data).await { match conn.send_async(data).await {
ConnectionHandleSendResult::Sent => { ConnectionHandleSendResult::Sent => {
@ -292,11 +292,15 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
pub fn set_needs_public_dial_info_check(&self) { pub fn set_needs_public_dial_info_check(&self, _punishment: Option<Box<dyn FnOnce() + Send + 'static>>) {
// //
} }
pub fn get_network_class(&self, _routing_domain: PublicInternet) -> Option<NetworkClass> { pub fn doing_public_dial_info_check(&self) -> bool {
false
}
pub fn get_network_class(&self, _routing_domain: RoutingDomain) -> Option<NetworkClass> {
// xxx eventually detect tor browser? // xxx eventually detect tor browser?
return if self.inner.lock().network_started { return if self.inner.lock().network_started {
Some(NetworkClass::WebApp) Some(NetworkClass::WebApp)

View File

@ -34,7 +34,6 @@ pub struct RecentPeersEntry {
/// RoutingTable rwlock-internal data /// RoutingTable rwlock-internal data
struct RoutingTableInner { struct RoutingTableInner {
network_manager: NetworkManager,
// The current node's public DHT key // The current node's public DHT key
node_id: DHTKey, node_id: DHTKey,
node_id_secret: DHTKeySecret, // The current node's DHT key secret node_id_secret: DHTKeySecret, // The current node's DHT key secret
@ -60,6 +59,8 @@ pub struct RoutingTableHealth {
} }
struct RoutingTableUnlockedInner { struct RoutingTableUnlockedInner {
network_manager: NetworkManager,
// Background processes // Background processes
rolling_transfers_task: TickTask<EyreReport>, rolling_transfers_task: TickTask<EyreReport>,
kick_buckets_task: TickTask<EyreReport>, kick_buckets_task: TickTask<EyreReport>,
@ -73,9 +74,8 @@ pub struct RoutingTable {
} }
impl RoutingTable { impl RoutingTable {
fn new_inner(network_manager: NetworkManager) -> RoutingTableInner { fn new_inner() -> RoutingTableInner {
RoutingTableInner { RoutingTableInner {
network_manager,
node_id: DHTKey::default(), node_id: DHTKey::default(),
node_id_secret: DHTKeySecret::default(), node_id_secret: DHTKeySecret::default(),
buckets: Vec::new(), buckets: Vec::new(),
@ -89,9 +89,13 @@ impl RoutingTable {
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
} }
} }
fn new_unlocked_inner(_config: VeilidConfig) -> RoutingTableUnlockedInner { fn new_unlocked_inner(
_config: VeilidConfig,
network_manager: NetworkManager,
) -> RoutingTableUnlockedInner {
//let c = config.get(); //let c = config.get();
RoutingTableUnlockedInner { RoutingTableUnlockedInner {
network_manager,
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
kick_buckets_task: TickTask::new(1), kick_buckets_task: TickTask::new(1),
} }
@ -100,8 +104,8 @@ impl RoutingTable {
let config = network_manager.config(); let config = network_manager.config();
let this = Self { let this = Self {
config: config.clone(), config: config.clone(),
inner: Arc::new(RwLock::new(Self::new_inner(network_manager))), inner: Arc::new(RwLock::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config)), unlocked_inner: Arc::new(Self::new_unlocked_inner(config, network_manager)),
}; };
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
@ -126,7 +130,7 @@ impl RoutingTable {
} }
pub fn network_manager(&self) -> NetworkManager { pub fn network_manager(&self) -> NetworkManager {
self.inner.read().network_manager.clone() self.unlocked_inner.network_manager.clone()
} }
pub fn rpc_processor(&self) -> RPCProcessor { pub fn rpc_processor(&self) -> RPCProcessor {
self.network_manager().rpc_processor() self.network_manager().rpc_processor()
@ -451,7 +455,7 @@ impl RoutingTable {
error!("kick_buckets_task not stopped: {}", e); error!("kick_buckets_task not stopped: {}", e);
} }
*self.inner.write() = Self::new_inner(self.network_manager()); *self.inner.write() = Self::new_inner();
debug!("finished routing table terminate"); debug!("finished routing table terminate");
} }

View File

@ -41,13 +41,6 @@ android {
main.java.srcDirs += 'src/main/kotlin' main.java.srcDirs += 'src/main/kotlin'
} }
// xxx why does backtrace not work in android?
// tried this but it doesn't help
// packagingOptions {
// jniLibs.useLegacyPackaging = true
// }
defaultConfig { defaultConfig {
// TODO: Specify your own unique Application ID (https://developer.android.com/studio/build/application-id.html). // TODO: Specify your own unique Application ID (https://developer.android.com/studio/build/application-id.html).
applicationId "com.veilid.veilid_example" applicationId "com.veilid.veilid_example"
@ -63,6 +56,12 @@ android {
// Signing with the debug keys for now, so `flutter run --release` works. // Signing with the debug keys for now, so `flutter run --release` works.
signingConfig signingConfigs.debug signingConfig signingConfigs.debug
} }
debug {
packagingOptions {
jniLibs.useLegacyPackaging = true
jniLibs.keepDebugSymbols += '**/libveilid_flutter.so'
}
}
} }
} }