From 3264b568d0258bc3ba8cc12b970cc26f62e5fc5c Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 15 Jul 2023 19:32:53 -0400 Subject: [PATCH] punish by node id --- .../src/network_manager/address_filter.rs | 92 +++++++++++++++++-- veilid-core/src/network_manager/mod.rs | 23 +++-- veilid-core/src/network_manager/native/mod.rs | 4 +- .../src/network_manager/native/network_tcp.rs | 2 +- .../network_manager/native/protocol/mod.rs | 2 +- .../network_manager/native/protocol/udp.rs | 4 +- .../src/network_manager/network_connection.rs | 2 +- veilid-core/src/network_manager/send_data.rs | 5 + .../tests/test_connection_table.rs | 3 +- veilid-core/src/network_manager/wasm/mod.rs | 4 +- .../src/network_manager/wasm/protocol/mod.rs | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 17 ++++ veilid-core/src/routing_table/tests/mod.rs | 28 ++++++ .../tests/test_serialize_routing_table.rs | 33 +------ veilid-core/src/rpc_processor/mod.rs | 45 +++++++-- 15 files changed, 202 insertions(+), 64 deletions(-) diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 326a41b0..5bb1ec4b 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -3,6 +3,7 @@ use alloc::collections::btree_map::Entry; // XXX: Move to config eventually? const PUNISHMENT_DURATION_MIN: usize = 60; +const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536; #[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] pub enum AddressFilterError { @@ -26,15 +27,37 @@ struct AddressFilterInner { conn_timestamps_by_ip6_prefix: BTreeMap>, punishments_by_ip4: BTreeMap, punishments_by_ip6_prefix: BTreeMap, + punishments_by_node_id: BTreeMap, } -#[derive(Debug)] struct AddressFilterUnlockedInner { max_connections_per_ip4: usize, max_connections_per_ip6_prefix: usize, max_connections_per_ip6_prefix_size: usize, max_connection_frequency_per_min: usize, punishment_duration_min: usize, + routing_table: RoutingTable, +} + +impl fmt::Debug for AddressFilterUnlockedInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AddressFilterUnlockedInner") + .field("max_connections_per_ip4", &self.max_connections_per_ip4) + .field( + "max_connections_per_ip6_prefix", + &self.max_connections_per_ip6_prefix, + ) + .field( + "max_connections_per_ip6_prefix_size", + &self.max_connections_per_ip6_prefix_size, + ) + .field( + "max_connection_frequency_per_min", + &self.max_connection_frequency_per_min, + ) + .field("punishment_duration_min", &self.punishment_duration_min) + .finish() + } } #[derive(Clone, Debug)] @@ -44,7 +67,7 @@ pub struct AddressFilter { } impl AddressFilter { - pub fn new(config: VeilidConfig) -> Self { + pub fn new(config: VeilidConfig, routing_table: RoutingTable) -> Self { let c = config.get(); Self { unlocked_inner: Arc::new(AddressFilterUnlockedInner { @@ -55,6 +78,7 @@ impl AddressFilter { max_connection_frequency_per_min: c.network.max_connection_frequency_per_min as usize, punishment_duration_min: PUNISHMENT_DURATION_MIN, + routing_table, }), inner: Arc::new(Mutex::new(AddressFilterInner { conn_count_by_ip4: BTreeMap::new(), @@ -63,6 +87,7 @@ impl AddressFilter { conn_timestamps_by_ip6_prefix: BTreeMap::new(), punishments_by_ip4: BTreeMap::new(), punishments_by_ip6_prefix: BTreeMap::new(), + punishments_by_node_id: BTreeMap::new(), })), } } @@ -135,9 +160,29 @@ impl AddressFilter { inner.punishments_by_ip6_prefix.remove(&key); } } + // node id + { + let mut dead_keys = Vec::::new(); + for (key, value) in &mut inner.punishments_by_node_id { + // Drop punishments older than the punishment duration + if cur_ts.as_u64().saturating_sub(value.as_u64()) + > self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64 + { + dead_keys.push(*key); + } + } + for key in dead_keys { + log_net!(debug ">>> FORGIVING: {}", key); + inner.punishments_by_node_id.remove(&key); + // make the entry alive again if it's still here + if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) { + nr.operate_mut(|_rti, e| e.set_punished(false)); + } + } + } } - fn is_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { + fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { match ipblock { IpAddr::V4(v4) => { if inner.punishments_by_ip4.contains_key(&v4) { @@ -153,16 +198,16 @@ impl AddressFilter { false } - pub fn is_punished(&self, addr: IpAddr) -> bool { + pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool { let inner = self.inner.lock(); let ipblock = ip_to_ipblock( self.unlocked_inner.max_connections_per_ip6_prefix_size, addr, ); - self.is_punished_inner(&*inner, ipblock) + self.is_ip_addr_punished_inner(&*inner, ipblock) } - pub fn punish(&self, addr: IpAddr) { + pub fn punish_ip_addr(&self, addr: IpAddr) { log_net!(debug ">>> PUNISHED: {}", addr); let ts = get_aligned_timestamp(); @@ -186,6 +231,39 @@ impl AddressFilter { }; } + fn is_node_id_punished_inner(&self, inner: &AddressFilterInner, node_id: TypedKey) -> bool { + if inner.punishments_by_node_id.contains_key(&node_id) { + return true; + } + false + } + + pub fn is_node_id_punished(&self, node_id: TypedKey) -> bool { + let inner = self.inner.lock(); + self.is_node_id_punished_inner(&*inner, node_id) + } + + pub fn punish_node_id(&self, node_id: TypedKey) { + if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(node_id) { + // make the entry dead if it's punished + nr.operate_mut(|_rti, e| e.set_punished(true)); + } + + let ts = get_aligned_timestamp(); + + let mut inner = self.inner.lock(); + if inner.punishments_by_node_id.len() >= MAX_PUNISHMENTS_BY_NODE_ID { + log_net!(debug ">>> PUNISHMENT TABLE FULL: {}", node_id); + return; + } + log_net!(debug ">>> PUNISHED: {}", node_id); + inner + .punishments_by_node_id + .entry(node_id) + .and_modify(|v| *v = ts) + .or_insert(ts); + } + pub async fn address_filter_task_routine( self, _stop_token: StopToken, @@ -207,7 +285,7 @@ impl AddressFilter { self.unlocked_inner.max_connections_per_ip6_prefix_size, addr, ); - if self.is_punished_inner(inner, ipblock) { + if self.is_ip_addr_punished_inner(inner, ipblock) { return Err(AddressFilterError::Punished); } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index c74bb4f5..807ec42d 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -143,9 +143,9 @@ struct NetworkManagerUnlockedInner { #[cfg(feature = "unstable-blockstore")] block_store: BlockStore, crypto: Crypto, - address_filter: AddressFilter, // Accessors routing_table: RwLock>, + address_filter: RwLock>, components: RwLock>, update_callback: RwLock>, // Background processes @@ -189,7 +189,7 @@ impl NetworkManager { #[cfg(feature = "unstable-blockstore")] block_store, crypto, - address_filter: AddressFilter::new(config), + address_filter: RwLock::new(None), routing_table: RwLock::new(None), components: RwLock::new(None), update_callback: RwLock::new(None), @@ -292,7 +292,12 @@ impl NetworkManager { self.unlocked_inner.crypto.clone() } pub fn address_filter(&self) -> AddressFilter { - self.unlocked_inner.address_filter.clone() + self.unlocked_inner + .address_filter + .read() + .as_ref() + .unwrap() + .clone() } pub fn routing_table(&self) -> RoutingTable { self.unlocked_inner @@ -351,7 +356,9 @@ impl NetworkManager { pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { let routing_table = RoutingTable::new(self.clone()); routing_table.init().await?; + let address_filter = AddressFilter::new(self.config(), routing_table.clone()); *self.unlocked_inner.routing_table.write() = Some(routing_table.clone()); + *self.unlocked_inner.address_filter.write() = Some(address_filter); *self.unlocked_inner.update_callback.write() = Some(update_callback); Ok(()) } @@ -904,7 +911,7 @@ impl NetworkManager { // Ensure we can read the magic number if data.len() < 4 { log_net!(debug "short packet"); - self.address_filter().punish(remote_addr); + self.address_filter().punish_ip_addr(remote_addr); return Ok(false); } @@ -939,7 +946,7 @@ impl NetworkManager { Ok(v) => v, Err(e) => { log_net!(debug "envelope failed to decode: {}", e); - self.address_filter().punish(remote_addr); + self.address_filter().punish_ip_addr(remote_addr); return Ok(false); } }; @@ -991,6 +998,10 @@ impl NetworkManager { // Peek at header and see if we need to relay this // If the recipient id is not our node id, then it needs relaying let sender_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id()); + if self.address_filter().is_node_id_punished(sender_id) { + return Ok(false); + } + let recipient_id = TypedKey::new(envelope.get_crypto_kind(), envelope.get_recipient_id()); if !routing_table.matches_own_node_id(&[recipient_id]) { // See if the source node is allowed to resolve nodes @@ -1070,7 +1081,7 @@ impl NetworkManager { Ok(v) => v, Err(e) => { log_net!(debug "failed to decrypt envelope body: {}",e); - self.address_filter().punish(remote_addr); + self.address_filter().punish_ip_addr(remote_addr); return Ok(false); } }; diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index fe0408c2..b7a55dad 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -410,7 +410,7 @@ impl Network { if self .network_manager() .address_filter() - .is_punished(dial_info.address().to_ip_addr()) + .is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } @@ -477,7 +477,7 @@ impl Network { if self .network_manager() .address_filter() - .is_punished(dial_info.address().to_ip_addr()) + .is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 3c9434bf..d79fa414 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -120,7 +120,7 @@ impl Network { }; // Check to see if it is punished let address_filter = self.network_manager().address_filter(); - if address_filter.is_punished(peer_addr.ip()) { + if address_filter.is_ip_addr_punished(peer_addr.ip()) { return; } diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index 7af29c46..6cbd79e9 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -24,7 +24,7 @@ impl ProtocolNetworkConnection { timeout_ms: u32, address_filter: AddressFilter, ) -> io::Result> { - if address_filter.is_punished(dial_info.address().to_ip_addr()) { + if address_filter.is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } match dial_info.protocol_type() { diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index da129a75..f0519235 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -25,7 +25,7 @@ impl RawUdpProtocolHandler { // Check to see if it is punished if let Some(af) = self.address_filter.as_ref() { - if af.is_punished(remote_addr.ip()) { + if af.is_ip_addr_punished(remote_addr.ip()) { continue; } } @@ -97,7 +97,7 @@ impl RawUdpProtocolHandler { // Check to see if it is punished if let Some(af) = self.address_filter.as_ref() { - if af.is_punished(remote_addr.ip()) { + if af.is_ip_addr_punished(remote_addr.ip()) { return Ok(NetworkResult::no_connection_other("punished")); } } diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index f657037b..9d7b7eb6 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -305,7 +305,7 @@ impl NetworkConnection { let peer_address = protocol_connection.descriptor().remote(); // Check to see if it is punished - if address_filter.is_punished(peer_address.to_socket_addr().ip()) { + if address_filter.is_ip_addr_punished(peer_address.to_socket_addr().ip()) { return RecvLoopAction::Finish; } diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index f1a7345b..fea17fe8 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -307,6 +307,11 @@ impl NetworkManager { ) -> EyreResult { let routing_table = self.routing_table(); + // If a node is punished, then don't try to contact it + if target_node_ref.node_ids().iter().find(|nid| self.address_filter().is_node_id_punished(**nid)).is_some() { + return Ok(NodeContactMethod::Unreachable); + } + // Figure out the best routing domain to get the contact method over let routing_domain = match target_node_ref.best_routing_domain() { Some(rd) => rd, diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 5a52cd46..c683b6d7 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -2,10 +2,11 @@ use super::*; use super::connection_table::*; use crate::tests::common::test_veilid_config::*; +use crate::tests::mock_routing_table; pub async fn test_add_get_remove() { let config = get_config(); - let address_filter = AddressFilter::new(config.clone()); + let address_filter = AddressFilter::new(config.clone(), mock_routing_table()); let table = ConnectionTable::new(config, address_filter); let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 86d56584..5c529df7 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -133,7 +133,7 @@ impl Network { if self .network_manager() .address_filter() - .is_punished(dial_info.address().to_ip_addr()) + .is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } @@ -182,7 +182,7 @@ impl Network { if self .network_manager() .address_filter() - .is_punished(dial_info.address().to_ip_addr()) + .is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index 4b08e7b1..846cff18 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -19,7 +19,7 @@ impl ProtocolNetworkConnection { timeout_ms: u32, address_filter: AddressFilter, ) -> io::Result> { - if address_filter.is_punished(dial_info.address().to_ip_addr()) { + if address_filter.is_ip_addr_punished(dial_info.address().to_ip_addr()) { return Ok(NetworkResult::no_connection_other("punished")); } match dial_info.protocol_type() { diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index b2acbf4e..f1265bfe 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -90,6 +90,9 @@ pub struct BucketEntryInner { /// The accounting for the transfer statistics #[serde(skip)] transfer_stats_accounting: TransferStatsAccounting, + /// If the entry is being punished and should be considered dead + #[serde(skip)] + is_punished: bool, /// Tracking identifier for NodeRef debugging #[cfg(feature = "tracking")] #[serde(skip)] @@ -403,6 +406,10 @@ impl BucketEntryInner { // Stores a connection descriptor in this entry's table of last connections pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) { + if self.is_punished { + // Don't record connection if this entry is currently punished + return; + } let key = self.descriptor_to_key(last_connection); self.last_connections .insert(key, (last_connection, timestamp)); @@ -531,6 +538,9 @@ impl BucketEntryInner { } pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState { + if self.is_punished { + return BucketEntryState::Dead; + } if self.check_reliable(cur_ts) { BucketEntryState::Reliable } else if self.check_dead(cur_ts) { @@ -539,6 +549,12 @@ impl BucketEntryInner { BucketEntryState::Unreliable } } + pub fn set_punished(&mut self, punished: bool) { + self.is_punished = punished; + if punished { + self.clear_last_connections(); + } + } pub fn peer_stats(&self) -> &PeerStats { &self.peer_stats @@ -845,6 +861,7 @@ impl BucketEntry { }, latency_stats_accounting: LatencyStatsAccounting::new(), transfer_stats_accounting: TransferStatsAccounting::new(), + is_punished: false, #[cfg(feature = "tracking")] next_track_id: 0, #[cfg(feature = "tracking")] diff --git a/veilid-core/src/routing_table/tests/mod.rs b/veilid-core/src/routing_table/tests/mod.rs index 44ed3a79..45780128 100644 --- a/veilid-core/src/routing_table/tests/mod.rs +++ b/veilid-core/src/routing_table/tests/mod.rs @@ -1 +1,29 @@ +use super::*; + pub mod test_serialize_routing_table; + +pub(crate) fn mock_routing_table() -> routing_table::RoutingTable { + let veilid_config = VeilidConfig::new(); + #[cfg(feature = "unstable-blockstore")] + let block_store = BlockStore::new(veilid_config.clone()); + let protected_store = ProtectedStore::new(veilid_config.clone()); + let table_store = TableStore::new(veilid_config.clone(), protected_store.clone()); + let crypto = Crypto::new(veilid_config.clone(), table_store.clone()); + let storage_manager = storage_manager::StorageManager::new( + veilid_config.clone(), + crypto.clone(), + table_store.clone(), + #[cfg(feature = "unstable-blockstore")] + block_store.clone(), + ); + let network_manager = network_manager::NetworkManager::new( + veilid_config.clone(), + storage_manager, + protected_store.clone(), + table_store.clone(), + #[cfg(feature = "unstable-blockstore")] + block_store.clone(), + crypto.clone(), + ); + RoutingTable::new(network_manager) +} diff --git a/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs b/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs index 265b99aa..bbf2ac30 100644 --- a/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs +++ b/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs @@ -1,35 +1,8 @@ -use crate::*; -use routing_table::*; - -fn fake_routing_table() -> routing_table::RoutingTable { - let veilid_config = VeilidConfig::new(); - #[cfg(feature = "unstable-blockstore")] - let block_store = BlockStore::new(veilid_config.clone()); - let protected_store = ProtectedStore::new(veilid_config.clone()); - let table_store = TableStore::new(veilid_config.clone(), protected_store.clone()); - let crypto = Crypto::new(veilid_config.clone(), table_store.clone()); - let storage_manager = storage_manager::StorageManager::new( - veilid_config.clone(), - crypto.clone(), - table_store.clone(), - #[cfg(feature = "unstable-blockstore")] - block_store.clone(), - ); - let network_manager = network_manager::NetworkManager::new( - veilid_config.clone(), - storage_manager, - protected_store.clone(), - table_store.clone(), - #[cfg(feature = "unstable-blockstore")] - block_store.clone(), - crypto.clone(), - ); - RoutingTable::new(network_manager) -} +use super::*; pub async fn test_routingtable_buckets_round_trip() { - let original = fake_routing_table(); - let copy = fake_routing_table(); + let original = mock_routing_table(); + let copy = mock_routing_table(); original.init().await.unwrap(); copy.init().await.unwrap(); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 96de25d7..ec496eb9 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -1454,28 +1454,37 @@ impl RPCProcessor { &self, encoded_msg: RPCMessageEncoded, ) -> Result, RPCError> { + let address_filter = self.network_manager.address_filter(); + // Decode operation appropriately based on header detail let msg = match &encoded_msg.header.detail { RPCMessageHeaderDetail::Direct(detail) => { + // Get sender node id + let sender_node_id = TypedKey::new( + detail.envelope.get_crypto_kind(), + detail.envelope.get_sender_id(), + ); + // Decode and validate the RPC operation let operation = match self.decode_rpc_operation(&encoded_msg) { Ok(v) => v, - Err(e) => return Ok(NetworkResult::invalid_message(e)), + Err(e) => { + // Punish nodes that send direct undecodable crap + address_filter.punish_node_id(sender_node_id); + return Ok(NetworkResult::invalid_message(e)); + } }; // Get the routing domain this message came over let routing_domain = detail.routing_domain; // Get the sender noderef, incorporating sender's peer info - let sender_node_id = TypedKey::new( - detail.envelope.get_crypto_kind(), - detail.envelope.get_sender_id(), - ); let mut opt_sender_nr: Option = None; if let Some(sender_peer_info) = operation.sender_peer_info() { // Ensure the sender peer info is for the actual sender specified in the envelope if !sender_peer_info.node_ids().contains(&sender_node_id) { // Attempted to update peer info for the wrong node id + address_filter.punish_node_id(sender_node_id); return Ok(NetworkResult::invalid_message( "attempt to update peer info for non-sender node id", )); @@ -1487,6 +1496,7 @@ impl RPCProcessor { sender_peer_info.signed_node_info(), &[], ) { + address_filter.punish_node_id(sender_node_id); return Ok(NetworkResult::invalid_message( "sender peerinfo has invalid peer scope", )); @@ -1497,7 +1507,10 @@ impl RPCProcessor { false, ) { Ok(v) => Some(v), - Err(e) => return Ok(NetworkResult::invalid_message(e)), + Err(e) => { + address_filter.punish_node_id(sender_node_id); + return Ok(NetworkResult::invalid_message(e)); + } } } @@ -1505,7 +1518,10 @@ impl RPCProcessor { if opt_sender_nr.is_none() { opt_sender_nr = match self.routing_table().lookup_node_ref(sender_node_id) { Ok(v) => v, - Err(e) => return Ok(NetworkResult::invalid_message(e)), + Err(e) => { + address_filter.punish_node_id(sender_node_id); + return Ok(NetworkResult::invalid_message(e)); + } } } @@ -1525,7 +1541,14 @@ impl RPCProcessor { } RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { // Decode and validate the RPC operation - let operation = self.decode_rpc_operation(&encoded_msg)?; + let operation = match self.decode_rpc_operation(&encoded_msg) { + Ok(v) => v, + Err(e) => { + // Punish routes that send routed undecodable crap + // address_filter.punish_route_id(xxx); + return Ok(NetworkResult::invalid_message(e)); + } + }; // Make the RPC message RPCMessage { @@ -1612,7 +1635,7 @@ impl RPCProcessor { let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv"); // xxx: causes crash (Missing otel data span extensions) // rpc_worker_span.follows_from(span_id); - + network_result_value_or_log!(match self .process_rpc_message(msg) .instrument(rpc_worker_span) @@ -1623,7 +1646,9 @@ impl RPCProcessor { continue; } - Ok(v) => v, + Ok(v) => { + v + } } => [ format!(": msg.header={:?}", msg.header) ] {}); } }