diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index ffa7f6b9..baced512 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -10,19 +10,25 @@ license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)" crate-type = ["cdylib", "staticlib", "rlib"] [features] + +# Common features default = ["enable-crypto-vld0"] -crypto-test = ["enable-crypto-vld0", "enable-crypto-none"] -crypto-test-none = ["enable-crypto-none"] -enable-crypto-vld0 = [] -enable-crypto-none = [] -verbose-tracing = [] rt-async-std = ["async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink/smol_socket", "veilid-tools/rt-async-std"] rt-tokio = ["tokio", "tokio-util", "tokio-stream", "trust-dns-resolver/tokio-runtime", "async_executors/tokio_tp", "async_executors/tokio_io", "async_executors/tokio_timer", "rtnetlink/tokio_socket", "veilid-tools/rt-tokio"] rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"] +# Crypto support features +enable-crypto-vld0 = [] +enable-crypto-none = [] + +# Debugging and testing features +verbose-tracing = [] +tracking = [] +debug-dht = [] +crypto-test = ["enable-crypto-vld0", "enable-crypto-none"] +crypto-test-none = ["enable-crypto-none"] veilid_core_android_tests = ["dep:paranoid-android"] veilid_core_ios_tests = ["dep:tracing-oslog"] -tracking = [] network-result-extra = ["veilid-tools/network-result-extra"] [dependencies] diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs new file mode 100644 index 00000000..9818c671 --- /dev/null +++ b/veilid-core/src/network_manager/address_filter.rs @@ -0,0 +1,311 @@ +use super::*; +use alloc::collections::btree_map::Entry; + +// XXX: Move to config eventually? +const PUNISHMENT_DURATION_MIN: usize = 60; + +#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] +pub enum AddressFilterError { + #[error("Count exceeded")] + CountExceeded, + #[error("Rate exceeded")] + RateExceeded, + #[error("Address is punished")] + Punished, +} + +#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] +#[error("Address not in table")] +pub struct AddressNotInTableError {} + +#[derive(Debug)] +struct AddressFilterInner { + conn_count_by_ip4: BTreeMap, + conn_count_by_ip6_prefix: BTreeMap, + conn_timestamps_by_ip4: BTreeMap>, + conn_timestamps_by_ip6_prefix: BTreeMap>, + punishments_by_ip4: BTreeMap, + punishments_by_ip6_prefix: BTreeMap, +} + +#[derive(Debug)] +struct AddressFilterUnlockedInner { + max_connections_per_ip4: usize, + max_connections_per_ip6_prefix: usize, + max_connections_per_ip6_prefix_size: usize, + max_connection_frequency_per_min: usize, + punishment_duration_min: usize, +} + +#[derive(Clone, Debug)] +pub struct AddressFilter { + unlocked_inner: Arc, + inner: Arc>, +} + +impl AddressFilter { + pub fn new(config: VeilidConfig) -> Self { + let c = config.get(); + Self { + unlocked_inner: Arc::new(AddressFilterUnlockedInner { + max_connections_per_ip4: c.network.max_connections_per_ip4 as usize, + max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize, + max_connections_per_ip6_prefix_size: c.network.max_connections_per_ip6_prefix_size + as usize, + max_connection_frequency_per_min: c.network.max_connection_frequency_per_min + as usize, + punishment_duration_min: PUNISHMENT_DURATION_MIN, + }), + inner: Arc::new(Mutex::new(AddressFilterInner { + conn_count_by_ip4: BTreeMap::new(), + conn_count_by_ip6_prefix: BTreeMap::new(), + conn_timestamps_by_ip4: BTreeMap::new(), + conn_timestamps_by_ip6_prefix: BTreeMap::new(), + punishments_by_ip4: BTreeMap::new(), + punishments_by_ip6_prefix: BTreeMap::new(), + })), + } + } + + fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { + // v4 + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.conn_timestamps_by_ip4 { + value.retain(|v| { + // keep timestamps that are less than a minute away + cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) + }); + if value.is_empty() { + dead_keys.push(*key); + } + } + for key in dead_keys { + inner.conn_timestamps_by_ip4.remove(&key); + } + } + // v6 + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.conn_timestamps_by_ip6_prefix { + value.retain(|v| { + // keep timestamps that are less than a minute away + cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) + }); + if value.is_empty() { + dead_keys.push(*key); + } + } + for key in dead_keys { + inner.conn_timestamps_by_ip6_prefix.remove(&key); + } + } + } + + fn purge_old_punishments(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { + // v4 + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.punishments_by_ip4 { + // Drop punishments older than the punishment duration + if cur_ts.as_u64().saturating_sub(value.as_u64()) + > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 + { + dead_keys.push(*key); + } + } + for key in dead_keys { + inner.punishments_by_ip4.remove(&key); + } + } + // v6 + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.punishments_by_ip6_prefix { + // Drop punishments older than the punishment duration + if cur_ts.as_u64().saturating_sub(value.as_u64()) + > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 + { + dead_keys.push(*key); + } + } + for key in dead_keys { + inner.punishments_by_ip6_prefix.remove(&key); + } + } + } + + fn is_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { + match ipblock { + IpAddr::V4(v4) => { + if inner.punishments_by_ip4.contains_key(&v4) { + return true; + } + } + IpAddr::V6(v6) => { + if inner.punishments_by_ip6_prefix.contains_key(&v6) { + return true; + } + } + } + false + } + + pub fn is_punished(&self, addr: IpAddr) -> bool { + let inner = self.inner.lock(); + let ipblock = ip_to_ipblock( + self.unlocked_inner.max_connections_per_ip6_prefix_size, + addr, + ); + self.is_punished_inner(&*inner, ipblock) + } + + pub fn punish(&self, addr: IpAddr) { + log_net!(debug ">>> PUNISHED: {}", addr); + let ts = get_aligned_timestamp(); + + let ipblock = ip_to_ipblock( + self.unlocked_inner.max_connections_per_ip6_prefix_size, + addr, + ); + + let mut inner = self.inner.lock(); + match ipblock { + IpAddr::V4(v4) => inner + .punishments_by_ip4 + .entry(v4) + .and_modify(|v| *v = ts) + .or_insert(ts), + IpAddr::V6(v6) => inner + .punishments_by_ip6_prefix + .entry(v6) + .and_modify(|v| *v = ts) + .or_insert(ts), + }; + } + + pub async fn address_filter_task_routine( + self, + _stop_token: StopToken, + _last_ts: Timestamp, + cur_ts: Timestamp, + ) -> EyreResult<()> { + // + let mut inner = self.inner.lock(); + self.purge_old_timestamps(&mut *inner, cur_ts); + self.purge_old_punishments(&mut *inner, cur_ts); + + Ok(()) + } + + pub fn add_connection(&self, addr: IpAddr) -> Result<(), AddressFilterError> { + let inner = &mut *self.inner.lock(); + + let ipblock = ip_to_ipblock( + self.unlocked_inner.max_connections_per_ip6_prefix_size, + addr, + ); + if self.is_punished_inner(inner, ipblock) { + return Err(AddressFilterError::Punished); + } + + let ts = get_aligned_timestamp(); + self.purge_old_timestamps(inner, ts); + + match ipblock { + IpAddr::V4(v4) => { + // See if we have too many connections from this ip block + let cnt = inner.conn_count_by_ip4.entry(v4).or_default(); + assert!(*cnt <= self.unlocked_inner.max_connections_per_ip4); + if *cnt == self.unlocked_inner.max_connections_per_ip4 { + warn!("address filter count exceeded: {:?}", v4); + return Err(AddressFilterError::CountExceeded); + } + // See if this ip block has connected too frequently + let tstamps = inner.conn_timestamps_by_ip4.entry(v4).or_default(); + tstamps.retain(|v| { + // keep timestamps that are less than a minute away + ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) + }); + assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min); + if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min { + warn!("address filter rate exceeded: {:?}", v4); + return Err(AddressFilterError::RateExceeded); + } + + // If it's okay, add the counts and timestamps + *cnt += 1; + tstamps.push(ts); + } + IpAddr::V6(v6) => { + // See if we have too many connections from this ip block + let cnt = inner.conn_count_by_ip6_prefix.entry(v6).or_default(); + assert!(*cnt <= self.unlocked_inner.max_connections_per_ip6_prefix); + if *cnt == self.unlocked_inner.max_connections_per_ip6_prefix { + warn!("address filter count exceeded: {:?}", v6); + return Err(AddressFilterError::CountExceeded); + } + // See if this ip block has connected too frequently + let tstamps = inner.conn_timestamps_by_ip6_prefix.entry(v6).or_default(); + assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min); + if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min { + warn!("address filter rate exceeded: {:?}", v6); + return Err(AddressFilterError::RateExceeded); + } + + // If it's okay, add the counts and timestamps + *cnt += 1; + tstamps.push(ts); + } + } + Ok(()) + } + + pub fn remove_connection(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> { + let mut inner = self.inner.lock(); + + let ipblock = ip_to_ipblock( + self.unlocked_inner.max_connections_per_ip6_prefix_size, + addr, + ); + + let ts = get_aligned_timestamp(); + self.purge_old_timestamps(&mut *inner, ts); + + match ipblock { + IpAddr::V4(v4) => { + match inner.conn_count_by_ip4.entry(v4) { + Entry::Vacant(_) => { + return Err(AddressNotInTableError {}); + } + Entry::Occupied(mut o) => { + let cnt = o.get_mut(); + assert!(*cnt > 0); + if *cnt == 0 { + inner.conn_count_by_ip4.remove(&v4); + } else { + *cnt -= 1; + } + } + }; + } + IpAddr::V6(v6) => { + match inner.conn_count_by_ip6_prefix.entry(v6) { + Entry::Vacant(_) => { + return Err(AddressNotInTableError {}); + } + Entry::Occupied(mut o) => { + let cnt = o.get_mut(); + assert!(*cnt > 0); + if *cnt == 0 { + inner.conn_count_by_ip6_prefix.remove(&v6); + } else { + *cnt -= 1; + } + } + }; + } + } + Ok(()) + } +} diff --git a/veilid-core/src/network_manager/connection_limits.rs b/veilid-core/src/network_manager/connection_limits.rs deleted file mode 100644 index c118c4e9..00000000 --- a/veilid-core/src/network_manager/connection_limits.rs +++ /dev/null @@ -1,176 +0,0 @@ -use super::*; -use alloc::collections::btree_map::Entry; - -#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] -pub enum AddressFilterError { - #[error("Count exceeded")] - CountExceeded, - #[error("Rate exceeded")] - RateExceeded, -} - -#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] -#[error("Address not in table")] -pub struct AddressNotInTableError {} - -#[derive(Debug)] -pub struct ConnectionLimits { - max_connections_per_ip4: usize, - max_connections_per_ip6_prefix: usize, - max_connections_per_ip6_prefix_size: usize, - max_connection_frequency_per_min: usize, - conn_count_by_ip4: BTreeMap, - conn_count_by_ip6_prefix: BTreeMap, - conn_timestamps_by_ip4: BTreeMap>, - conn_timestamps_by_ip6_prefix: BTreeMap>, -} - -impl ConnectionLimits { - pub fn new(config: VeilidConfig) -> Self { - let c = config.get(); - Self { - max_connections_per_ip4: c.network.max_connections_per_ip4 as usize, - max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize, - max_connections_per_ip6_prefix_size: c.network.max_connections_per_ip6_prefix_size - as usize, - max_connection_frequency_per_min: c.network.max_connection_frequency_per_min as usize, - conn_count_by_ip4: BTreeMap::new(), - conn_count_by_ip6_prefix: BTreeMap::new(), - conn_timestamps_by_ip4: BTreeMap::new(), - conn_timestamps_by_ip6_prefix: BTreeMap::new(), - } - } - - fn purge_old_timestamps(&mut self, cur_ts: Timestamp) { - // v4 - { - let mut dead_keys = Vec::::new(); - for (key, value) in &mut self.conn_timestamps_by_ip4 { - value.retain(|v| { - // keep timestamps that are less than a minute away - cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) - }); - if value.is_empty() { - dead_keys.push(*key); - } - } - for key in dead_keys { - self.conn_timestamps_by_ip4.remove(&key); - } - } - // v6 - { - let mut dead_keys = Vec::::new(); - for (key, value) in &mut self.conn_timestamps_by_ip6_prefix { - value.retain(|v| { - // keep timestamps that are less than a minute away - cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) - }); - if value.is_empty() { - dead_keys.push(*key); - } - } - for key in dead_keys { - self.conn_timestamps_by_ip6_prefix.remove(&key); - } - } - } - - pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> { - let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); - let ts = get_aligned_timestamp(); - - self.purge_old_timestamps(ts); - - match ipblock { - IpAddr::V4(v4) => { - // See if we have too many connections from this ip block - let cnt = &mut *self.conn_count_by_ip4.entry(v4).or_default(); - assert!(*cnt <= self.max_connections_per_ip4); - if *cnt == self.max_connections_per_ip4 { - warn!("address filter count exceeded: {:?}", v4); - return Err(AddressFilterError::CountExceeded); - } - // See if this ip block has connected too frequently - let tstamps = &mut self.conn_timestamps_by_ip4.entry(v4).or_default(); - tstamps.retain(|v| { - // keep timestamps that are less than a minute away - ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64) - }); - assert!(tstamps.len() <= self.max_connection_frequency_per_min); - if tstamps.len() == self.max_connection_frequency_per_min { - warn!("address filter rate exceeded: {:?}", v4); - return Err(AddressFilterError::RateExceeded); - } - - // If it's okay, add the counts and timestamps - *cnt += 1; - tstamps.push(ts); - } - IpAddr::V6(v6) => { - // See if we have too many connections from this ip block - let cnt = &mut *self.conn_count_by_ip6_prefix.entry(v6).or_default(); - assert!(*cnt <= self.max_connections_per_ip6_prefix); - if *cnt == self.max_connections_per_ip6_prefix { - warn!("address filter count exceeded: {:?}", v6); - return Err(AddressFilterError::CountExceeded); - } - // See if this ip block has connected too frequently - let tstamps = &mut self.conn_timestamps_by_ip6_prefix.entry(v6).or_default(); - assert!(tstamps.len() <= self.max_connection_frequency_per_min); - if tstamps.len() == self.max_connection_frequency_per_min { - warn!("address filter rate exceeded: {:?}", v6); - return Err(AddressFilterError::RateExceeded); - } - - // If it's okay, add the counts and timestamps - *cnt += 1; - tstamps.push(ts); - } - } - Ok(()) - } - - pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> { - let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); - - let ts = get_aligned_timestamp(); - self.purge_old_timestamps(ts); - - match ipblock { - IpAddr::V4(v4) => { - match self.conn_count_by_ip4.entry(v4) { - Entry::Vacant(_) => { - return Err(AddressNotInTableError {}); - } - Entry::Occupied(mut o) => { - let cnt = o.get_mut(); - assert!(*cnt > 0); - if *cnt == 0 { - self.conn_count_by_ip4.remove(&v4); - } else { - *cnt -= 1; - } - } - }; - } - IpAddr::V6(v6) => { - match self.conn_count_by_ip6_prefix.entry(v6) { - Entry::Vacant(_) => { - return Err(AddressNotInTableError {}); - } - Entry::Occupied(mut o) => { - let cnt = o.get_mut(); - assert!(*cnt > 0); - if *cnt == 0 { - self.conn_count_by_ip6_prefix.remove(&v6); - } else { - *cnt -= 1; - } - } - }; - } - } - Ok(()) - } -} diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 1d1445d5..1496aae0 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -286,6 +286,7 @@ impl ConnectionManager { local_addr, &dial_info, self.arc.connection_initial_timeout_ms, + self.network_manager().address_filter(), ) .await; match result_net_res { diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index b0879150..6ca9b210 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -29,7 +29,7 @@ pub struct ConnectionTableInner { protocol_index_by_id: BTreeMap, id_by_descriptor: BTreeMap, ids_by_remote: BTreeMap>, - address_filter: ConnectionLimits, + address_filter: AddressFilter, } #[derive(Debug)] @@ -58,7 +58,7 @@ impl ConnectionTable { protocol_index_by_id: BTreeMap::new(), id_by_descriptor: BTreeMap::new(), ids_by_remote: BTreeMap::new(), - address_filter: ConnectionLimits::new(config), + address_filter: AddressFilter::new(config), })), } } @@ -125,7 +125,7 @@ impl ConnectionTable { // Filter by ip for connection limits let ip_addr = descriptor.remote_address().to_ip_addr(); - match inner.address_filter.add(ip_addr) { + match inner.address_filter.add_connection(ip_addr) { Ok(()) => {} Err(e) => { // Return the connection in the error to be disposed of @@ -258,7 +258,7 @@ impl ConnectionTable { let ip_addr = remote.to_socket_addr().ip(); inner .address_filter - .remove(ip_addr) + .remove_connection(ip_addr) .expect("Inconsistency in connection table"); conn } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 02187f24..3be13686 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -8,7 +8,7 @@ mod wasm; mod direct_boot; mod send_data; mod connection_handle; -mod connection_limits; +mod address_filter; mod connection_manager; mod connection_table; mod network_connection; @@ -29,7 +29,7 @@ pub use stats::*; //////////////////////////////////////////////////////////////////////////////////////// use connection_handle::*; -use connection_limits::*; +use address_filter::*; use crypto::*; use futures_util::stream::FuturesUnordered; use hashlink::LruCache; @@ -54,6 +54,7 @@ pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8; pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60; pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration = TimestampDuration::new(3600_000_000u64); // 60 minutes +pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60; pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; #[derive(Copy, Clone, Debug, Default)] @@ -136,6 +137,7 @@ struct NetworkManagerUnlockedInner { #[cfg(feature="unstable-blockstore")] block_store: BlockStore, crypto: Crypto, + address_filter: AddressFilter, // Accessors routing_table: RwLock>, components: RwLock>, @@ -143,6 +145,7 @@ struct NetworkManagerUnlockedInner { // Background processes rolling_transfers_task: TickTask, public_address_check_task: TickTask, + address_filter_task: TickTask, // Network Key network_key: Option, } @@ -174,18 +177,20 @@ impl NetworkManager { network_key: Option, ) -> NetworkManagerUnlockedInner { NetworkManagerUnlockedInner { - config, + config: config.clone(), storage_manager, protected_store, table_store, #[cfg(feature="unstable-blockstore")] block_store, crypto, + address_filter: AddressFilter::new(config), routing_table: RwLock::new(None), components: RwLock::new(None), update_callback: RwLock::new(None), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), + address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS), network_key, } } @@ -273,6 +278,9 @@ impl NetworkManager { pub fn crypto(&self) -> Crypto { self.unlocked_inner.crypto.clone() } + pub fn address_filter(&self) -> AddressFilter { + self.unlocked_inner.address_filter.clone() + } pub fn routing_table(&self) -> RoutingTable { self.unlocked_inner .routing_table @@ -894,10 +902,11 @@ impl NetworkManager { data.len(), connection_descriptor ); + let remote_addr = connection_descriptor.remote_address().to_ip_addr(); // Network accounting self.stats_packet_rcvd( - connection_descriptor.remote_address().to_ip_addr(), + remote_addr, ByteCount::new(data.len() as u64), ); @@ -911,6 +920,7 @@ impl NetworkManager { // Ensure we can read the magic number if data.len() < 4 { log_net!(debug "short packet"); + self.address_filter().punish(remote_addr); return Ok(false); } @@ -943,6 +953,7 @@ impl NetworkManager { Ok(v) => v, Err(e) => { log_net!(debug "envelope failed to decode: {}", e); + self.address_filter().punish(remote_addr); return Ok(false); } }; @@ -1058,7 +1069,7 @@ impl NetworkManager { Ok(v) => v, Err(e) => { log_net!(debug "failed to decrypt envelope body: {}",e); - // xxx: punish nodes that send messages that fail to decrypt eventually + self.address_filter().punish(remote_addr); return Ok(false); } }; @@ -1078,8 +1089,6 @@ impl NetworkManager { }; source_noderef.add_envelope_version(envelope.get_version()); - // xxx: deal with spoofing and flooding here? - // Pass message to RPC system rpc.enqueue_direct_message( envelope, diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 8a3a7023..c773b944 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -370,6 +370,14 @@ impl Network { c.network.connection_initial_timeout_ms }; + if self + .network_manager() + .address_filter() + .is_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); @@ -429,6 +437,14 @@ impl Network { c.network.connection_initial_timeout_ms }; + if self + .network_manager() + .address_filter() + .is_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } + match dial_info.protocol_type() { ProtocolType::UDP => { let peer_socket_addr = dial_info.to_socket_addr(); diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 6d717ad8..925ebb31 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -108,12 +108,29 @@ impl Network { } }; - // XXX - // warn!( - // "DEBUGACCEPT: local={} remote={}", - // tcp_stream.local_addr().unwrap(), - // tcp_stream.peer_addr().unwrap(), - // ); + // Limit the number of connections from the same IP address + // and the number of total connections + // XXX limiting here instead for connection table? may be faster and avoids tls negotiation + let peer_addr = match tcp_stream.peer_addr() { + Ok(addr) => addr, + Err(e) => { + log_net!(debug "failed to get peer address: {}", e); + return; + } + }; + let address_filter = self.network_manager().address_filter(); + // Check to see if it is punished + if address_filter.is_punished(peer_addr.ip()) { + return; + } + + let local_addr = match tcp_stream.local_addr() { + Ok(addr) => addr, + Err(e) => { + log_net!(debug "failed to get local address: {}", e); + return; + } + }; if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) { log_net!(debug "Couldn't set TCP linger: {}", e); @@ -127,24 +144,6 @@ impl Network { let listener_state = listener_state.clone(); let connection_manager = connection_manager.clone(); - // Limit the number of connections from the same IP address - // and the number of total connections - let peer_addr = match tcp_stream.peer_addr() { - Ok(addr) => addr, - Err(e) => { - log_net!(debug "failed to get peer address: {}", e); - return; - } - }; - let local_addr = match tcp_stream.local_addr() { - Ok(addr) => addr, - Err(e) => { - log_net!(debug "failed to get local address: {}", e); - return; - } - }; - // XXX limiting here instead for connection table? may be faster and avoids tls negotiation - log_net!("TCP connection from: {}", peer_addr); // Create a stream we can peek on diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 002ccf06..4ea508d9 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -66,8 +66,6 @@ impl Network { .await { Ok(Ok((size, descriptor))) => { - // XXX: Limit the number of packets from the same IP address? - // Network accounting network_manager.stats_packet_rcvd( descriptor.remote_address().to_ip_addr(), @@ -143,7 +141,10 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let udpv4_handler = RawUdpProtocolHandler::new(socket_arc); + let udpv4_handler = RawUdpProtocolHandler::new( + socket_arc, + Some(self.network_manager().address_filter()), + ); inner.outbound_udpv4_protocol_handler = Some(udpv4_handler); } @@ -164,7 +165,10 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let udpv6_handler = RawUdpProtocolHandler::new(socket_arc); + let udpv6_handler = RawUdpProtocolHandler::new( + socket_arc, + Some(self.network_manager().address_filter()), + ); inner.outbound_udpv6_protocol_handler = Some(udpv6_handler); } @@ -191,7 +195,8 @@ impl Network { let socket_arc = Arc::new(udp_socket); // Create protocol handler - let protocol_handler = RawUdpProtocolHandler::new(socket_arc); + let protocol_handler = + RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter())); // Create message_handler records self.inner diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index 0a41a77b..7af29c46 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -22,7 +22,11 @@ impl ProtocolNetworkConnection { local_address: Option, dial_info: &DialInfo, timeout_ms: u32, + address_filter: AddressFilter, ) -> io::Result> { + if address_filter.is_punished(dial_info.address().to_ip_addr()) { + return Ok(NetworkResult::no_connection_other("punished")); + } match dial_info.protocol_type() { ProtocolType::UDP => { panic!("Should not connect to UDP dialinfo"); diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index a8d33b5a..feab4092 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -5,13 +5,15 @@ use sockets::*; pub struct RawUdpProtocolHandler { socket: Arc, assembly_buffer: AssemblyBuffer, + address_filter: Option, } impl RawUdpProtocolHandler { - pub fn new(socket: Arc) -> Self { + pub fn new(socket: Arc, address_filter: Option) -> Self { Self { socket, assembly_buffer: AssemblyBuffer::new(), + address_filter, } } @@ -21,6 +23,13 @@ impl RawUdpProtocolHandler { // Get a packet let (size, remote_addr) = network_result_value_or_log!(self.socket.recv_from(data).await.into_network_result()? => continue); + // Check to see if it is punished + if let Some(af) = self.address_filter.as_ref() { + if af.is_punished(remote_addr.ip()) { + continue; + } + } + // Insert into assembly buffer let Some(message) = self.assembly_buffer.insert_frame(&data[0..size], remote_addr) else { continue; @@ -66,6 +75,13 @@ impl RawUdpProtocolHandler { bail_io_error_other!("sending too large UDP message"); } + // Check to see if it is punished + if let Some(af) = self.address_filter.as_ref() { + if af.is_punished(remote_addr.ip()) { + return Ok(NetworkResult::no_connection_other("punished")); + } + } + // Fragment and send let sender = |framed_chunk: Vec, remote_addr: SocketAddr| async move { let len = network_result_try!(self @@ -111,6 +127,6 @@ impl RawUdpProtocolHandler { // get local wildcard address for bind let local_socket_addr = compatible_unspecified_socket_addr(&socket_addr); let socket = UdpSocket::bind(local_socket_addr).await?; - Ok(RawUdpProtocolHandler::new(Arc::new(socket))) + Ok(RawUdpProtocolHandler::new(Arc::new(socket), None)) } } diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index 5d4ce372..25286249 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -609,7 +609,7 @@ impl Network { ip_addrs, tcp_port, false, - Box::new(move |c, _| Box::new(RawTcpProtocolHandler::new(c))), + Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))), ) .await?; trace!("TCP: listener started on {:#?}", socket_addresses); diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index 35e3e99c..06bec578 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -42,6 +42,20 @@ impl NetworkManager { ) }); } + + // Set address filter task + { + let this = self.clone(); + self.unlocked_inner + .address_filter_task + .set_routine(move |s, l, t| { + Box::pin( + this.address_filter() + .address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .instrument(trace_span!(parent: None, "address filter task routine")), + ) + }); + } } pub async fn tick(&self) -> EyreResult<()> { diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 4b87a071..77111b3a 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -91,6 +91,14 @@ impl Network { c.network.connection_initial_timeout_ms }; + if self + .network_manager() + .address_filter() + .is_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } + match dial_info.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") @@ -132,6 +140,14 @@ impl Network { c.network.connection_initial_timeout_ms }; + if self + .network_manager() + .address_filter() + .is_punished(dial_info.address().to_ip_addr()) + { + return Ok(NetworkResult::no_connection_other("punished")); + } + match dial_info.protocol_type() { ProtocolType::UDP => { bail!("no support for UDP protocol") diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index bc4966ca..e6314dd7 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -17,7 +17,11 @@ impl ProtocolNetworkConnection { _local_address: Option, dial_info: &DialInfo, timeout_ms: u32, + address_filter: AddressFiltter, ) -> io::Result> { + if address_filter.is_punished(dial_info.address().to_ip_addr()) { + return Ok(NetworkResult::no_connection_other("punished")); + } match dial_info.protocol_type() { ProtocolType::UDP => { panic!("UDP dial info is not supported on WASM targets"); diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index d484b51b..8f419f80 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -74,6 +74,7 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + #[cfg(feature="debug-dht")] log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( @@ -210,29 +211,32 @@ impl RPCProcessor { .await .map_err(RPCError::internal)?); - let debug_string_value = subkey_result.value.as_ref().map(|v| { - format!(" len={} seq={} writer={}", - v.value_data().data().len(), - v.value_data().seq(), - v.value_data().writer(), - ) - }).unwrap_or_default(); + #[cfg(feature="debug-dht")] + { + let debug_string_value = subkey_result.value.as_ref().map(|v| { + format!(" len={} seq={} writer={}", + v.value_data().data().len(), + v.value_data().seq(), + v.value_data().writer(), + ) + }).unwrap_or_default(); - let debug_string_answer = format!( - "IN ===> GetValueA({} #{}{}{} peers={}) ==> {}", - key, - subkey, - debug_string_value, - if subkey_result.descriptor.is_some() { - " +desc" - } else { - "" - }, - closer_to_key_peers.len(), - msg.header.direct_sender_node_id() - ); - - log_rpc!(debug "{}", debug_string_answer); + let debug_string_answer = format!( + "IN ===> GetValueA({} #{}{}{} peers={}) ==> {}", + key, + subkey, + debug_string_value, + if subkey_result.descriptor.is_some() { + " +desc" + } else { + "" + }, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_answer); + } // Make GetValue answer let get_value_a = RPCOperationGetValueA::new( diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index e44b3569..05bc3a23 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -88,6 +88,7 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + #[cfg(feature="debug-dht")] log_rpc!(debug "{}", debug_string); let waitable_reply = network_result_try!( @@ -239,29 +240,32 @@ impl RPCProcessor { (true, new_value) }; - let debug_string_value = new_value.as_ref().map(|v| { - format!(" len={} seq={} writer={}", - v.value_data().data().len(), - v.value_data().seq(), - v.value_data().writer(), - ) - }).unwrap_or_default(); + #[cfg(feature="debug-dht")] + { + let debug_string_value = new_value.as_ref().map(|v| { + format!(" len={} seq={} writer={}", + v.value_data().data().len(), + v.value_data().seq(), + v.value_data().writer(), + ) + }).unwrap_or_default(); - let debug_string_answer = format!( - "IN ===> SetValueA({} #{}{}{} peers={}) ==> {}", - key, - subkey, - if set { - " +set" - } else { - "" - }, - debug_string_value, - closer_to_key_peers.len(), - msg.header.direct_sender_node_id() - ); - - log_rpc!(debug "{}", debug_string_answer); + let debug_string_answer = format!( + "IN ===> SetValueA({} #{}{}{} peers={}) ==> {}", + key, + subkey, + if set { + " +set" + } else { + "" + }, + debug_string_value, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_answer); + } // Make SetValue answer let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?;