Merge branch 'connection-table-fixes' into 'main'
Connection table fixes See merge request veilid/veilid!189
This commit is contained in:
		@@ -139,6 +139,33 @@ impl ConnectionManager {
 | 
			
		||||
        debug!("finished connection manager shutdown");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Internal routine to see if we should keep this connection
 | 
			
		||||
    // from being LRU removed. Used on our initiated relay connections.
 | 
			
		||||
    fn should_protect_connection(&self, conn: &NetworkConnection) -> bool {
 | 
			
		||||
        let netman = self.network_manager();
 | 
			
		||||
        let routing_table = netman.routing_table();
 | 
			
		||||
        let remote_address = conn.connection_descriptor().remote_address().address();
 | 
			
		||||
        let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else {
 | 
			
		||||
            return false;
 | 
			
		||||
        };
 | 
			
		||||
        let Some(rn) = routing_table.relay_node(routing_domain) else {
 | 
			
		||||
            return false;
 | 
			
		||||
        };
 | 
			
		||||
        let relay_nr = rn.filtered_clone(
 | 
			
		||||
            NodeRefFilter::new()
 | 
			
		||||
                .with_routing_domain(routing_domain)
 | 
			
		||||
                .with_address_type(conn.connection_descriptor().address_type())
 | 
			
		||||
                .with_protocol_type(conn.connection_descriptor().protocol_type()),
 | 
			
		||||
        );
 | 
			
		||||
        let dids = relay_nr.all_filtered_dial_info_details();
 | 
			
		||||
        for did in dids {
 | 
			
		||||
            if did.dial_info.address() == remote_address {
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        false
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Internal routine to register new connection atomically.
 | 
			
		||||
    // Registers connection in the connection table for later access
 | 
			
		||||
    // and spawns a message processing loop for the connection
 | 
			
		||||
@@ -163,8 +190,16 @@ impl ConnectionManager {
 | 
			
		||||
            None => bail!("not creating connection because we are stopping"),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
 | 
			
		||||
        let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
 | 
			
		||||
        let handle = conn.get_handle();
 | 
			
		||||
 | 
			
		||||
        // See if this should be a protected connection
 | 
			
		||||
        let protect = self.should_protect_connection(&conn);
 | 
			
		||||
        if protect {
 | 
			
		||||
            log_net!(debug "== PROTECTING connection: {} -> {}", id, conn.debug_print(get_aligned_timestamp()));
 | 
			
		||||
            conn.protect();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Add to the connection table
 | 
			
		||||
        match self.arc.connection_table.add_connection(conn) {
 | 
			
		||||
            Ok(None) => {
 | 
			
		||||
@@ -173,7 +208,7 @@ impl ConnectionManager {
 | 
			
		||||
            Ok(Some(conn)) => {
 | 
			
		||||
                // Connection added and a different one LRU'd out
 | 
			
		||||
                // Send it to be terminated
 | 
			
		||||
                log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
 | 
			
		||||
                // log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
 | 
			
		||||
                let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
 | 
			
		||||
            }
 | 
			
		||||
            Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
 | 
			
		||||
 
 | 
			
		||||
@@ -184,10 +184,20 @@ impl ConnectionTable {
 | 
			
		||||
        // then drop the least recently used connection
 | 
			
		||||
        let mut out_conn = None;
 | 
			
		||||
        if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] {
 | 
			
		||||
            if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() {
 | 
			
		||||
            while let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() {
 | 
			
		||||
                let lruk = *lruk;
 | 
			
		||||
                log_net!(debug "connection lru out: {:?}", lru_conn);
 | 
			
		||||
 | 
			
		||||
                // Don't LRU protected connections
 | 
			
		||||
                if lru_conn.is_protected() {
 | 
			
		||||
                    // Mark as recently used
 | 
			
		||||
                    log_net!(debug "== No LRU Out for PROTECTED connection: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp()));
 | 
			
		||||
                    inner.conn_by_id[protocol_index].get(&lruk);
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                log_net!(debug "== LRU Connection Killed: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp()));
 | 
			
		||||
                out_conn = Some(Self::remove_connection_records(&mut *inner, lruk));
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -665,7 +665,7 @@ impl NetworkManager {
 | 
			
		||||
    #[instrument(level = "trace", skip(self), err)]
 | 
			
		||||
    pub async fn handle_signal(
 | 
			
		||||
        &self,
 | 
			
		||||
        connection_descriptor: ConnectionDescriptor,
 | 
			
		||||
        signal_connection_descriptor: ConnectionDescriptor,
 | 
			
		||||
        signal_info: SignalInfo,
 | 
			
		||||
    ) -> EyreResult<NetworkResult<()>> {
 | 
			
		||||
        match signal_info {
 | 
			
		||||
@@ -689,8 +689,9 @@ impl NetworkManager {
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                // Restrict reverse connection to same protocol as inbound signal
 | 
			
		||||
                let peer_nr = peer_nr
 | 
			
		||||
                    .filtered_clone(NodeRefFilter::from(connection_descriptor.protocol_type()));
 | 
			
		||||
                let peer_nr = peer_nr.filtered_clone(NodeRefFilter::from(
 | 
			
		||||
                    signal_connection_descriptor.protocol_type(),
 | 
			
		||||
                ));
 | 
			
		||||
 | 
			
		||||
                // Make a reverse connection to the peer and send the receipt to it
 | 
			
		||||
                rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
 | 
			
		||||
 
 | 
			
		||||
@@ -94,6 +94,7 @@ pub struct NetworkConnection {
 | 
			
		||||
    stats: Arc<Mutex<NetworkConnectionStats>>,
 | 
			
		||||
    sender: flume::Sender<(Option<Id>, Vec<u8>)>,
 | 
			
		||||
    stop_source: Option<StopSource>,
 | 
			
		||||
    protected: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl NetworkConnection {
 | 
			
		||||
@@ -112,6 +113,7 @@ impl NetworkConnection {
 | 
			
		||||
            })),
 | 
			
		||||
            sender,
 | 
			
		||||
            stop_source: None,
 | 
			
		||||
            protected: false,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -157,6 +159,7 @@ impl NetworkConnection {
 | 
			
		||||
            stats,
 | 
			
		||||
            sender,
 | 
			
		||||
            stop_source: Some(stop_source),
 | 
			
		||||
            protected: false,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -172,6 +175,14 @@ impl NetworkConnection {
 | 
			
		||||
        ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn is_protected(&self) -> bool {
 | 
			
		||||
        self.protected
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn protect(&mut self) {
 | 
			
		||||
        self.protected = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn close(&mut self) {
 | 
			
		||||
        if let Some(stop_source) = self.stop_source.take() {
 | 
			
		||||
            // drop the stopper
 | 
			
		||||
 
 | 
			
		||||
@@ -18,6 +18,36 @@ impl NetworkManager {
 | 
			
		||||
        let this = self.clone();
 | 
			
		||||
        Box::pin(
 | 
			
		||||
            async move {
 | 
			
		||||
 | 
			
		||||
                // First try to send data to the last socket we've seen this peer on
 | 
			
		||||
                let data = if let Some(connection_descriptor) = destination_node_ref.last_connection() {
 | 
			
		||||
                    match this
 | 
			
		||||
                        .net()
 | 
			
		||||
                        .send_data_to_existing_connection(connection_descriptor, data)
 | 
			
		||||
                        .await?
 | 
			
		||||
                    {
 | 
			
		||||
                        None => {
 | 
			
		||||
                            // Update timestamp for this last connection since we just sent to it
 | 
			
		||||
                            destination_node_ref
 | 
			
		||||
                                .set_last_connection(connection_descriptor, get_aligned_timestamp());
 | 
			
		||||
 | 
			
		||||
                            return Ok(NetworkResult::value(SendDataKind::Existing(
 | 
			
		||||
                                connection_descriptor,
 | 
			
		||||
                            )));
 | 
			
		||||
                        }
 | 
			
		||||
                        Some(data) => {
 | 
			
		||||
                            // Couldn't send data to existing connection
 | 
			
		||||
                            // so pass the data back out
 | 
			
		||||
                            data
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    // No last connection
 | 
			
		||||
                    data
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                // No existing connection was found or usable, so we proceed to see how to make a new one
 | 
			
		||||
                
 | 
			
		||||
                // Get the best way to contact this node
 | 
			
		||||
                let contact_method = this.get_node_contact_method(destination_node_ref.clone())?;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -244,7 +244,7 @@ pub trait NodeRefBase: Sized {
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn all_filtered_dial_info_details<F>(&self) -> Vec<DialInfoDetail> {
 | 
			
		||||
    fn all_filtered_dial_info_details(&self) -> Vec<DialInfoDetail> {
 | 
			
		||||
        let routing_domain_set = self.routing_domain_set();
 | 
			
		||||
        let dial_info_filter = self.dial_info_filter();
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -173,14 +173,14 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
 | 
			
		||||
        ws: VeilidConfigWS(
 | 
			
		||||
          connect: true,
 | 
			
		||||
          listen: !kIsWeb,
 | 
			
		||||
          maxConnections: 1024,
 | 
			
		||||
          maxConnections: 32,
 | 
			
		||||
          listenAddress: '',
 | 
			
		||||
          path: 'ws',
 | 
			
		||||
        ),
 | 
			
		||||
        wss: VeilidConfigWSS(
 | 
			
		||||
          connect: true,
 | 
			
		||||
          listen: false,
 | 
			
		||||
          maxConnections: 1024,
 | 
			
		||||
          maxConnections: 32,
 | 
			
		||||
          listenAddress: '',
 | 
			
		||||
          path: 'ws',
 | 
			
		||||
        ),
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user