From 0249b7c7aebdf65e519477edcd4c1ec528e8772d Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 21 Aug 2023 21:04:21 -0400 Subject: [PATCH] dial info failure reprioritization --- veilid-core/src/lib.rs | 4 +- .../src/network_manager/address_filter.rs | 61 ++++ veilid-core/src/network_manager/mod.rs | 3 + veilid-core/src/network_manager/native/mod.rs | 327 ++++++++++-------- veilid-core/src/network_manager/send_data.rs | 19 + veilid-core/src/network_manager/wasm/mod.rs | 220 +++++++----- veilid-core/src/routing_table/mod.rs | 2 + .../route_spec_store/route_spec_store.rs | 4 + .../src/routing_table/routing_domains.rs | 71 +++- .../src/routing_table/routing_table_inner.rs | 3 +- veilid-flutter/example/pubspec.lock | 2 +- 11 files changed, 452 insertions(+), 264 deletions(-) diff --git a/veilid-core/src/lib.rs b/veilid-core/src/lib.rs index 2446cfe4..ce93567c 100644 --- a/veilid-core/src/lib.rs +++ b/veilid-core/src/lib.rs @@ -61,7 +61,7 @@ pub fn veilid_version() -> (u32, u32, u32) { #[cfg(target_os = "android")] pub use intf::android::veilid_core_setup_android; -pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [ +pub static DEFAULT_LOG_IGNORE_LIST: [&str; 23] = [ "mio", "h2", "hyper", @@ -83,6 +83,8 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [ "trust_dns_resolver", "trust_dns_proto", "attohttpc", + "ws_stream_wasm", + "keyvaluedb_web", ]; use cfg_if::*; diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 417ebd4e..5b17e3d0 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -4,6 +4,8 @@ use alloc::collections::btree_map::Entry; // XXX: Move to config eventually? const PUNISHMENT_DURATION_MIN: usize = 60; const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536; +const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10; +const MAX_DIAL_INFO_FAILURES: usize = 65536; #[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] pub enum AddressFilterError { @@ -28,6 +30,7 @@ struct AddressFilterInner { punishments_by_ip4: BTreeMap, punishments_by_ip6_prefix: BTreeMap, punishments_by_node_id: BTreeMap, + dial_info_failures: BTreeMap, } struct AddressFilterUnlockedInner { @@ -36,6 +39,7 @@ struct AddressFilterUnlockedInner { max_connections_per_ip6_prefix_size: usize, max_connection_frequency_per_min: usize, punishment_duration_min: usize, + dial_info_failure_duration_min: usize, routing_table: RoutingTable, } @@ -56,6 +60,10 @@ impl fmt::Debug for AddressFilterUnlockedInner { &self.max_connection_frequency_per_min, ) .field("punishment_duration_min", &self.punishment_duration_min) + .field( + "dial_info_failure_duration_min", + &self.dial_info_failure_duration_min, + ) .finish() } } @@ -78,6 +86,7 @@ impl AddressFilter { max_connection_frequency_per_min: c.network.max_connection_frequency_per_min as usize, punishment_duration_min: PUNISHMENT_DURATION_MIN, + dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN, routing_table, }), inner: Arc::new(Mutex::new(AddressFilterInner { @@ -88,10 +97,17 @@ impl AddressFilter { punishments_by_ip4: BTreeMap::new(), punishments_by_ip6_prefix: BTreeMap::new(), punishments_by_node_id: BTreeMap::new(), + dial_info_failures: BTreeMap::new(), })), } } + // When the network restarts, some of the address filter can be cleared + pub fn restart(&self) { + let mut inner = self.inner.lock(); + inner.dial_info_failures.clear(); + } + fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { // v4 { @@ -180,6 +196,22 @@ impl AddressFilter { } } } + // dial info + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.dial_info_failures { + // Drop failures older than the failure duration + if cur_ts.as_u64().saturating_sub(value.as_u64()) + > self.unlocked_inner.dial_info_failure_duration_min as u64 * 60_000_000u64 + { + dead_keys.push(key.clone()); + } + } + for key in dead_keys { + log_net!(debug ">>> DIALINFO PERMIT: {}", key); + inner.dial_info_failures.remove(&key); + } + } } fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { @@ -198,6 +230,14 @@ impl AddressFilter { false } + fn get_dial_info_failed_ts_inner( + &self, + inner: &AddressFilterInner, + dial_info: &DialInfo, + ) -> Option { + inner.dial_info_failures.get(dial_info).copied() + } + pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool { let inner = self.inner.lock(); let ipblock = ip_to_ipblock( @@ -207,6 +247,27 @@ impl AddressFilter { self.is_ip_addr_punished_inner(&*inner, ipblock) } + pub fn get_dial_info_failed_ts(&self, dial_info: &DialInfo) -> Option { + let inner = self.inner.lock(); + self.get_dial_info_failed_ts_inner(&*inner, dial_info) + } + + pub fn set_dial_info_failed(&self, dial_info: DialInfo) { + let ts = get_aligned_timestamp(); + + let mut inner = self.inner.lock(); + if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES { + log_net!(debug ">>> DIALINFO FAILURE TABLE FULL: {}", dial_info); + return; + } + log_net!(debug ">>> DIALINFO FAILURE: {:?}", dial_info); + inner + .dial_info_failures + .entry(dial_info) + .and_modify(|v| *v = ts) + .or_insert(ts); + } + pub fn punish_ip_addr(&self, addr: IpAddr) { log_net!(debug ">>> PUNISHED: {}", addr); let ts = get_aligned_timestamp(); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index a36569bb..6d36a8bd 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -380,6 +380,9 @@ impl NetworkManager { return Ok(()); } + // Clean address filter for things that should not be persistent + self.address_filter().restart(); + // Create network components let connection_manager = ConnectionManager::new(self.clone()); let net = Network::new( diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index e39220ed..1cc1e7b1 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -384,6 +384,21 @@ impl Network { //////////////////////////////////////////////////////////// + // Record DialInfo failures + pub async fn record_dial_info_failure>>>( + &self, + dial_info: DialInfo, + fut: F, + ) -> EyreResult> { + let network_result = fut.await?; + if matches!(network_result, NetworkResult::NoConnection(_)) { + self.network_manager() + .address_filter() + .set_dial_info_failed(dial_info); + } + Ok(network_result) + } + // Send data to a dial info, unbound, using a new connection from a random port // This creates a short-lived connection in the case of connection-oriented protocols // for the purpose of sending this one message. @@ -394,59 +409,62 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> EyreResult> { - let data_len = data.len(); - let connect_timeout_ms = { - let c = self.config.get(); - c.network.connection_initial_timeout_ms - }; + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; - if self - .network_manager() - .address_filter() - .is_ip_addr_punished(dial_info.address().to_ip_addr()) - { - return Ok(NetworkResult::no_connection_other("punished")); - } + if self + .network_manager() + .address_filter() + .is_ip_addr_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } - match dial_info.protocol_type() { - ProtocolType::UDP => { - let peer_socket_addr = dial_info.to_socket_addr(); - let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) + match dial_info.protocol_type() { + ProtocolType::UDP => { + let peer_socket_addr = dial_info.to_socket_addr(); + let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) + .await + .wrap_err("create socket failure")?; + let _ = network_result_try!(h + .send_message(data, peer_socket_addr) + .await + .map(NetworkResult::Value) + .wrap_err("send message failure")?); + } + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + let pnc = network_result_try!(RawTcpProtocolHandler::connect( + None, + peer_socket_addr, + connect_timeout_ms + ) .await - .wrap_err("create socket failure")?; - let _ = network_result_try!(h - .send_message(data, peer_socket_addr) + .wrap_err("connect failure")?); + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); + } + ProtocolType::WS | ProtocolType::WSS => { + let pnc = network_result_try!(WebsocketProtocolHandler::connect( + None, + &dial_info, + connect_timeout_ms + ) .await - .map(NetworkResult::Value) - .wrap_err("send message failure")?); + .wrap_err("connect failure")?); + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); + } } - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - let pnc = network_result_try!(RawTcpProtocolHandler::connect( - None, - peer_socket_addr, - connect_timeout_ms - ) - .await - .wrap_err("connect failure")?); - network_result_try!(pnc.send(data).await.wrap_err("send failure")?); - } - ProtocolType::WS | ProtocolType::WSS => { - let pnc = network_result_try!(WebsocketProtocolHandler::connect( - None, - &dial_info, - connect_timeout_ms - ) - .await - .wrap_err("connect failure")?); - network_result_try!(pnc.send(data).await.wrap_err("send failure")?); - } - } - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); - Ok(NetworkResult::Value(())) + Ok(NetworkResult::Value(())) + }) + .await } // Send data to a dial info, unbound, using a new connection from a random port @@ -461,85 +479,95 @@ impl Network { data: Vec, timeout_ms: u32, ) -> EyreResult>> { - let data_len = data.len(); - let connect_timeout_ms = { - let c = self.config.get(); - c.network.connection_initial_timeout_ms - }; + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; - if self - .network_manager() - .address_filter() - .is_ip_addr_punished(dial_info.address().to_ip_addr()) - { - return Ok(NetworkResult::no_connection_other("punished")); - } - - match dial_info.protocol_type() { - ProtocolType::UDP => { - let peer_socket_addr = dial_info.to_socket_addr(); - let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) - .await - .wrap_err("create socket failure")?; - network_result_try!(h - .send_message(data, peer_socket_addr) - .await - .wrap_err("send message failure")?); - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); - - // receive single response - let mut out = vec![0u8; MAX_MESSAGE_SIZE]; - let (recv_len, recv_addr) = network_result_try!(timeout( - timeout_ms, - h.recv_message(&mut out).instrument(Span::current()) - ) - .await - .into_network_result()) - .wrap_err("recv_message failure")?; - - let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); - self.network_manager() - .stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64)); - - // if the from address is not the same as the one we sent to, then drop this - if recv_socket_addr != peer_socket_addr { - bail!("wrong address"); - } - out.resize(recv_len, 0u8); - Ok(NetworkResult::Value(out)) + if self + .network_manager() + .address_filter() + .is_ip_addr_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); } - ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => { - let pnc = network_result_try!(match dial_info.protocol_type() { - ProtocolType::UDP => unreachable!(), - ProtocolType::TCP => { - let peer_socket_addr = dial_info.to_socket_addr(); - RawTcpProtocolHandler::connect(None, peer_socket_addr, connect_timeout_ms) - .await - .wrap_err("connect failure")? - } - ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms) - .await - .wrap_err("connect failure")? - } - }); - network_result_try!(pnc.send(data).await.wrap_err("send failure")?); - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + match dial_info.protocol_type() { + ProtocolType::UDP => { + let peer_socket_addr = dial_info.to_socket_addr(); + let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) + .await + .wrap_err("create socket failure")?; + network_result_try!(h + .send_message(data, peer_socket_addr) + .await + .wrap_err("send message failure")?); + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); - let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) + // receive single response + let mut out = vec![0u8; MAX_MESSAGE_SIZE]; + let (recv_len, recv_addr) = network_result_try!(timeout( + timeout_ms, + h.recv_message(&mut out).instrument(Span::current()) + ) .await .into_network_result()) - .wrap_err("recv failure")?); + .wrap_err("recv_message failure")?; - self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64)); + let recv_socket_addr = recv_addr.remote_address().to_socket_addr(); + self.network_manager() + .stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64)); - Ok(NetworkResult::Value(out)) + // if the from address is not the same as the one we sent to, then drop this + if recv_socket_addr != peer_socket_addr { + bail!("wrong address"); + } + out.resize(recv_len, 0u8); + Ok(NetworkResult::Value(out)) + } + ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => { + let pnc = network_result_try!(match dial_info.protocol_type() { + ProtocolType::UDP => unreachable!(), + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + RawTcpProtocolHandler::connect( + None, + peer_socket_addr, + connect_timeout_ms, + ) + .await + .wrap_err("connect failure")? + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms) + .await + .wrap_err("connect failure")? + } + }); + + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + + let out = + network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) + .await + .into_network_result()) + .wrap_err("recv failure")?); + + self.network_manager().stats_packet_rcvd( + dial_info.to_ip_addr(), + ByteCount::new(out.len() as u64), + ); + + Ok(NetworkResult::Value(out)) + } } - } + }) + .await } #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] @@ -609,41 +637,44 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> EyreResult> { - let data_len = data.len(); - let connection_descriptor; - if dial_info.protocol_type() == ProtocolType::UDP { - // Handle connectionless protocol - let peer_socket_addr = dial_info.to_socket_addr(); - let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { - Some(ph) => ph, - None => bail!("no appropriate UDP protocol handler for dial_info"), - }; - connection_descriptor = network_result_try!(ph - .send_message(data, peer_socket_addr) - .await - .wrap_err("failed to send data to dial info")?); - } else { - // Handle connection-oriented protocols - let conn = network_result_try!( - self.connection_manager() - .get_or_create_connection(dial_info.clone()) - .await? - ); + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + let connection_descriptor; + if dial_info.protocol_type() == ProtocolType::UDP { + // Handle connectionless protocol + let peer_socket_addr = dial_info.to_socket_addr(); + let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { + Some(ph) => ph, + None => bail!("no appropriate UDP protocol handler for dial_info"), + }; + connection_descriptor = network_result_try!(ph + .send_message(data, peer_socket_addr) + .await + .wrap_err("failed to send data to dial info")?); + } else { + // Handle connection-oriented protocols + let conn = network_result_try!( + self.connection_manager() + .get_or_create_connection(dial_info.clone()) + .await? + ); - if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { - return Ok(NetworkResult::NoConnection(io::Error::new( - io::ErrorKind::ConnectionReset, - "failed to send", - ))); + if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { + return Ok(NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + "failed to send", + ))); + } + connection_descriptor = conn.connection_descriptor(); } - connection_descriptor = conn.connection_descriptor(); - } - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); - Ok(NetworkResult::value(connection_descriptor)) + Ok(NetworkResult::value(connection_descriptor)) + }) + .await } ///////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 70d9ce90..64aa550f 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -357,6 +357,24 @@ impl NetworkManager { // log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); // sequencing = Sequencing::PreferOrdered; // } + + // Deprioritize dial info that have recently failed + let address_filter = self.address_filter(); + let mut dial_info_failures_map = BTreeMap::::new(); + for did in peer_b.signed_node_info().node_info().all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) { + if let Some(ts) = address_filter.get_dial_info_failed_ts(&did.dial_info) { + dial_info_failures_map.insert(did.dial_info, ts); + } + } + let dif_sort: Option core::cmp::Ordering>> = if dial_info_failures_map.is_empty() { + None + } else { + Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| { + let ats = dial_info_failures_map.get(&a.dial_info).copied().unwrap_or_default(); + let bts = dial_info_failures_map.get(&b.dial_info).copied().unwrap_or_default(); + ats.cmp(&bts) + })) + }; // Get the best contact method with these parameters from the routing domain let cm = routing_table.get_contact_method( @@ -365,6 +383,7 @@ impl NetworkManager { &peer_b, dial_info_filter, sequencing, + dif_sort, ); // Translate the raw contact method to a referenced contact method diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 7c3b40e0..feb9c0da 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -118,47 +118,66 @@ impl Network { ///////////////////////////////////////////////////////////////// + // Record DialInfo failures + pub async fn record_dial_info_failure>>>( + &self, + dial_info: DialInfo, + fut: F, + ) -> EyreResult> { + let network_result = fut.await?; + if matches!(network_result, NetworkResult::NoConnection(_)) { + self.network_manager() + .address_filter() + .set_dial_info_failed(dial_info); + } + Ok(network_result) + } + #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] pub async fn send_data_unbound_to_dial_info( &self, dial_info: DialInfo, data: Vec, ) -> EyreResult> { - let data_len = data.len(); - let timeout_ms = { - let c = self.config.get(); - c.network.connection_initial_timeout_ms - }; + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + let timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; - if self - .network_manager() - .address_filter() - .is_ip_addr_punished(dial_info.address().to_ip_addr()) - { - return Ok(NetworkResult::no_connection_other("punished")); - } - - match dial_info.protocol_type() { - ProtocolType::UDP => { - bail!("no support for UDP protocol") + if self + .network_manager() + .address_filter() + .is_ip_addr_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); } - ProtocolType::TCP => { - bail!("no support for TCP protocol") - } - ProtocolType::WS | ProtocolType::WSS => { - let pnc = - network_result_try!(WebsocketProtocolHandler::connect(&dial_info, timeout_ms) - .await - .wrap_err("connect failure")?); - network_result_try!(pnc.send(data).await.wrap_err("send failure")?); - } - }; - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + match dial_info.protocol_type() { + ProtocolType::UDP => { + bail!("no support for UDP protocol") + } + ProtocolType::TCP => { + bail!("no support for TCP protocol") + } + ProtocolType::WS | ProtocolType::WSS => { + let pnc = network_result_try!(WebsocketProtocolHandler::connect( + &dial_info, timeout_ms + ) + .await + .wrap_err("connect failure")?); + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); + } + }; - Ok(NetworkResult::Value(())) + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + + Ok(NetworkResult::Value(())) + }) + .await } // Send data to a dial info, unbound, using a new connection from a random port @@ -173,53 +192,59 @@ impl Network { data: Vec, timeout_ms: u32, ) -> EyreResult>> { - let data_len = data.len(); - let connect_timeout_ms = { - let c = self.config.get(); - c.network.connection_initial_timeout_ms - }; + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + let connect_timeout_ms = { + let c = self.config.get(); + c.network.connection_initial_timeout_ms + }; - if self - .network_manager() - .address_filter() - .is_ip_addr_punished(dial_info.address().to_ip_addr()) - { - return Ok(NetworkResult::no_connection_other("punished")); - } + if self + .network_manager() + .address_filter() + .is_ip_addr_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } - match dial_info.protocol_type() { - ProtocolType::UDP => { - bail!("no support for UDP protocol") - } - ProtocolType::TCP => { - bail!("no support for TCP protocol") - } - ProtocolType::WS | ProtocolType::WSS => { - let pnc = network_result_try!(match dial_info.protocol_type() { - ProtocolType::UDP => unreachable!(), - ProtocolType::TCP => unreachable!(), - ProtocolType::WS | ProtocolType::WSS => { - WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms) + match dial_info.protocol_type() { + ProtocolType::UDP => { + bail!("no support for UDP protocol") + } + ProtocolType::TCP => { + bail!("no support for TCP protocol") + } + ProtocolType::WS | ProtocolType::WSS => { + let pnc = network_result_try!(match dial_info.protocol_type() { + ProtocolType::UDP => unreachable!(), + ProtocolType::TCP => unreachable!(), + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms) + .await + .wrap_err("connect failure")? + } + }); + + network_result_try!(pnc.send(data).await.wrap_err("send failure")?); + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + + let out = + network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) .await - .wrap_err("connect failure")? - } - }); + .into_network_result()) + .wrap_err("recv failure")?); - network_result_try!(pnc.send(data).await.wrap_err("send failure")?); - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + self.network_manager().stats_packet_rcvd( + dial_info.to_ip_addr(), + ByteCount::new(out.len() as u64), + ); - let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) - .await - .into_network_result()) - .wrap_err("recv failure")?); - - self.network_manager() - .stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64)); - - Ok(NetworkResult::Value(out)) + Ok(NetworkResult::Value(out)) + } } - } + }) + .await } #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] @@ -273,34 +298,37 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> EyreResult> { - let data_len = data.len(); - if dial_info.protocol_type() == ProtocolType::UDP { - bail!("no support for UDP protocol"); - } - if dial_info.protocol_type() == ProtocolType::TCP { - bail!("no support for TCP protocol"); - } + self.record_dial_info_failure(dial_info.clone(), async move { + let data_len = data.len(); + if dial_info.protocol_type() == ProtocolType::UDP { + bail!("no support for UDP protocol"); + } + if dial_info.protocol_type() == ProtocolType::TCP { + bail!("no support for TCP protocol"); + } - // Handle connection-oriented protocols - let conn = network_result_try!( - self.connection_manager() - .get_or_create_connection(dial_info.clone()) - .await? - ); + // Handle connection-oriented protocols + let conn = network_result_try!( + self.connection_manager() + .get_or_create_connection(dial_info.clone()) + .await? + ); - if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { - return Ok(NetworkResult::NoConnection(io::Error::new( - io::ErrorKind::ConnectionReset, - "failed to send", - ))); - } - let connection_descriptor = conn.connection_descriptor(); + if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { + return Ok(NetworkResult::NoConnection(io::Error::new( + io::ErrorKind::ConnectionReset, + "failed to send", + ))); + } + let connection_descriptor = conn.connection_descriptor(); - // Network accounting - self.network_manager() - .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); - Ok(NetworkResult::value(connection_descriptor)) + Ok(NetworkResult::value(connection_descriptor)) + }) + .await } ///////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 2f63df4a..82d9ed78 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -538,6 +538,7 @@ impl RoutingTable { peer_b: &PeerInfo, dial_info_filter: DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>>, ) -> ContactMethod { self.inner.read().get_contact_method( routing_domain, @@ -545,6 +546,7 @@ impl RoutingTable { peer_b, dial_info_filter, sequencing, + dif_sort, ) } diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs index fb85fcc9..1889abca 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs @@ -401,6 +401,7 @@ impl RouteSpecStore { current_node, DialInfoFilter::all(), sequencing, + None, ); if matches!(cm, ContactMethod::Unreachable) { reachable = false; @@ -415,6 +416,7 @@ impl RouteSpecStore { current_node, DialInfoFilter::all(), Sequencing::EnsureOrdered, + None, ); if matches!(cm, ContactMethod::Unreachable) { can_do_sequenced = false; @@ -438,6 +440,7 @@ impl RouteSpecStore { current_node, DialInfoFilter::all(), sequencing, + None, ); if matches!(cm, ContactMethod::Unreachable) { reachable = false; @@ -452,6 +455,7 @@ impl RouteSpecStore { current_node, DialInfoFilter::all(), Sequencing::EnsureOrdered, + None, ); if matches!(cm, ContactMethod::Unreachable) { can_do_sequenced = false; diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index 7f60cc04..d92dddc9 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -220,6 +220,7 @@ pub trait RoutingDomainDetail { peer_b: &PeerInfo, dial_info_filter: DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>>, ) -> ContactMethod; } @@ -245,6 +246,7 @@ fn first_filtered_dial_info_detail_between_nodes( to_node: &NodeInfo, dial_info_filter: &DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>> ) -> Option { let dial_info_filter = dial_info_filter.clone().filtered( &DialInfoFilter::all() @@ -253,11 +255,28 @@ fn first_filtered_dial_info_detail_between_nodes( ); // Apply sequencing and get sort + // Include sorting by external dial info sort for rotating through dialinfo + // based on an external preference table, for example the one kept by + // AddressFilter to deprioritize dialinfo that have recently failed to connect let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); - let sort = if ordered { - Some(DialInfoDetail::ordered_sequencing_sort) + let sort: Option core::cmp::Ordering>> = if ordered { + if let Some(dif_sort) = dif_sort { + Some(Box::new(move |a, b| { + let mut ord = dif_sort(a,b); + if ord == core::cmp::Ordering::Equal { + ord = DialInfoDetail::ordered_sequencing_sort(a,b); + } + ord + })) + } else { + Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) })) + } } else { - None + if let Some(dif_sort) = dif_sort { + Some(Box::new(move |a,b| { dif_sort(a,b) })) + } else { + None + } }; // If the filter is dead then we won't be able to connect @@ -287,6 +306,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { peer_b: &PeerInfo, dial_info_filter: DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>>, ) -> ContactMethod { // Get the nodeinfos for convenience let node_a = peer_a.signed_node_info().node_info(); @@ -304,7 +324,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { // Get the best match dial info for node B if we have it if let Some(target_did) = - first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing) + first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone()) { // Do we need to signal before going inbound? if !target_did.class.requires_signal() { @@ -334,6 +354,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { node_b_relay, &dial_info_filter, sequencing, + dif_sort.clone(), ) .is_some() { @@ -347,6 +368,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { node_a, &dial_info_filter, sequencing, + dif_sort.clone() ) { // Ensure we aren't on the same public IP address (no hairpin nat) if reverse_did.dial_info.to_ip_addr() @@ -373,6 +395,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { node_b, &udp_dial_info_filter, sequencing, + dif_sort.clone() ) { // Does node A have a direct udp dialinfo that node B can reach? if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes( @@ -380,6 +403,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { node_a, &udp_dial_info_filter, sequencing, + dif_sort.clone(), ) { // Ensure we aren't on the same public IP address (no hairpin nat) if reverse_udp_did.dial_info.to_ip_addr() @@ -422,6 +446,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { &node_b_relay, &dial_info_filter, sequencing, + dif_sort.clone() ) .is_some() { @@ -496,6 +521,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { peer_b: &PeerInfo, dial_info_filter: DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>>, ) -> ContactMethod { // Scope the filter down to protocols node A can do outbound let dial_info_filter = dial_info_filter.filtered( @@ -504,20 +530,31 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { .with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()), ); - // Get first filtered dialinfo - let (sort, dial_info_filter) = match sequencing { - Sequencing::NoPreference => (None, dial_info_filter), - Sequencing::PreferOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter, - ), - Sequencing::EnsureOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter.filtered( - &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), - ), - ), + // Apply sequencing and get sort + // Include sorting by external dial info sort for rotating through dialinfo + // based on an external preference table, for example the one kept by + // AddressFilter to deprioritize dialinfo that have recently failed to connect + let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); + let sort: Option core::cmp::Ordering>> = if ordered { + if let Some(dif_sort) = dif_sort { + Some(Box::new(move |a, b| { + let mut ord = dif_sort(a,b); + if ord == core::cmp::Ordering::Equal { + ord = DialInfoDetail::ordered_sequencing_sort(a,b); + } + ord + })) + } else { + Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) })) + } + } else { + if let Some(dif_sort) = dif_sort { + Some(Box::new(move |a,b| { dif_sort(a,b) })) + } else { + None + } }; + // If the filter is dead then we won't be able to connect if dial_info_filter.is_dead() { return ContactMethod::Unreachable; diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 029f5da8..b6f24908 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -226,9 +226,10 @@ impl RoutingTableInner { peer_b: &PeerInfo, dial_info_filter: DialInfoFilter, sequencing: Sequencing, + dif_sort: Option core::cmp::Ordering>>, ) -> ContactMethod { self.with_routing_domain(routing_domain, |rdd| { - rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing) + rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing, dif_sort) }) } diff --git a/veilid-flutter/example/pubspec.lock b/veilid-flutter/example/pubspec.lock index b3d588dd..73c75fc4 100644 --- a/veilid-flutter/example/pubspec.lock +++ b/veilid-flutter/example/pubspec.lock @@ -403,7 +403,7 @@ packages: path: ".." relative: true source: path - version: "0.1.9" + version: "0.1.10" web: dependency: transitive description: