connection fix
This commit is contained in:
parent
680c987321
commit
17ea68fb84
@ -139,6 +139,33 @@ impl ConnectionManager {
|
|||||||
debug!("finished connection manager shutdown");
|
debug!("finished connection manager shutdown");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Internal routine to see if we should keep this connection
|
||||||
|
// from being LRU removed. Used on our initiated relay connections.
|
||||||
|
fn should_protect_connection(&self, conn: &NetworkConnection) -> bool {
|
||||||
|
let netman = self.network_manager();
|
||||||
|
let routing_table = netman.routing_table();
|
||||||
|
let remote_address = conn.connection_descriptor().remote_address().address();
|
||||||
|
let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(rn) = routing_table.relay_node(routing_domain) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let relay_nr = rn.filtered_clone(
|
||||||
|
NodeRefFilter::new()
|
||||||
|
.with_routing_domain(routing_domain)
|
||||||
|
.with_address_type(conn.connection_descriptor().address_type())
|
||||||
|
.with_protocol_type(conn.connection_descriptor().protocol_type()),
|
||||||
|
);
|
||||||
|
let dids = relay_nr.all_filtered_dial_info_details();
|
||||||
|
for did in dids {
|
||||||
|
if did.dial_info.address() == remote_address {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
// Internal routine to register new connection atomically.
|
// Internal routine to register new connection atomically.
|
||||||
// Registers connection in the connection table for later access
|
// Registers connection in the connection table for later access
|
||||||
// and spawns a message processing loop for the connection
|
// and spawns a message processing loop for the connection
|
||||||
@ -163,8 +190,16 @@ impl ConnectionManager {
|
|||||||
None => bail!("not creating connection because we are stopping"),
|
None => bail!("not creating connection because we are stopping"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
|
let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
|
||||||
let handle = conn.get_handle();
|
let handle = conn.get_handle();
|
||||||
|
|
||||||
|
// See if this should be a protected connection
|
||||||
|
let protect = self.should_protect_connection(&conn);
|
||||||
|
if protect {
|
||||||
|
log_net!(debug "== PROTECTING connection: {} -> {}", id, conn.debug_print(get_aligned_timestamp()));
|
||||||
|
conn.protect();
|
||||||
|
}
|
||||||
|
|
||||||
// Add to the connection table
|
// Add to the connection table
|
||||||
match self.arc.connection_table.add_connection(conn) {
|
match self.arc.connection_table.add_connection(conn) {
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
@ -173,7 +208,7 @@ impl ConnectionManager {
|
|||||||
Ok(Some(conn)) => {
|
Ok(Some(conn)) => {
|
||||||
// Connection added and a different one LRU'd out
|
// Connection added and a different one LRU'd out
|
||||||
// Send it to be terminated
|
// Send it to be terminated
|
||||||
log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
|
// log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
|
||||||
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
||||||
}
|
}
|
||||||
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
|
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
|
||||||
|
@ -184,10 +184,20 @@ impl ConnectionTable {
|
|||||||
// then drop the least recently used connection
|
// then drop the least recently used connection
|
||||||
let mut out_conn = None;
|
let mut out_conn = None;
|
||||||
if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] {
|
if inner.conn_by_id[protocol_index].len() > inner.max_connections[protocol_index] {
|
||||||
if let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() {
|
while let Some((lruk, lru_conn)) = inner.conn_by_id[protocol_index].peek_lru() {
|
||||||
let lruk = *lruk;
|
let lruk = *lruk;
|
||||||
log_net!(debug "connection lru out: {:?}", lru_conn);
|
|
||||||
|
// Don't LRU protected connections
|
||||||
|
if lru_conn.is_protected() {
|
||||||
|
// Mark as recently used
|
||||||
|
log_net!(debug "== No LRU Out for PROTECTED connection: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp()));
|
||||||
|
inner.conn_by_id[protocol_index].get(&lruk);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_net!(debug "== LRU Connection Killed: {} -> {}", lruk, lru_conn.debug_print(get_aligned_timestamp()));
|
||||||
out_conn = Some(Self::remove_connection_records(&mut *inner, lruk));
|
out_conn = Some(Self::remove_connection_records(&mut *inner, lruk));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -665,7 +665,7 @@ impl NetworkManager {
|
|||||||
#[instrument(level = "trace", skip(self), err)]
|
#[instrument(level = "trace", skip(self), err)]
|
||||||
pub async fn handle_signal(
|
pub async fn handle_signal(
|
||||||
&self,
|
&self,
|
||||||
connection_descriptor: ConnectionDescriptor,
|
signal_connection_descriptor: ConnectionDescriptor,
|
||||||
signal_info: SignalInfo,
|
signal_info: SignalInfo,
|
||||||
) -> EyreResult<NetworkResult<()>> {
|
) -> EyreResult<NetworkResult<()>> {
|
||||||
match signal_info {
|
match signal_info {
|
||||||
@ -689,8 +689,9 @@ impl NetworkManager {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Restrict reverse connection to same protocol as inbound signal
|
// Restrict reverse connection to same protocol as inbound signal
|
||||||
let peer_nr = peer_nr
|
let peer_nr = peer_nr.filtered_clone(NodeRefFilter::from(
|
||||||
.filtered_clone(NodeRefFilter::from(connection_descriptor.protocol_type()));
|
signal_connection_descriptor.protocol_type(),
|
||||||
|
));
|
||||||
|
|
||||||
// Make a reverse connection to the peer and send the receipt to it
|
// Make a reverse connection to the peer and send the receipt to it
|
||||||
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
|
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
|
||||||
|
@ -94,6 +94,7 @@ pub struct NetworkConnection {
|
|||||||
stats: Arc<Mutex<NetworkConnectionStats>>,
|
stats: Arc<Mutex<NetworkConnectionStats>>,
|
||||||
sender: flume::Sender<(Option<Id>, Vec<u8>)>,
|
sender: flume::Sender<(Option<Id>, Vec<u8>)>,
|
||||||
stop_source: Option<StopSource>,
|
stop_source: Option<StopSource>,
|
||||||
|
protected: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetworkConnection {
|
impl NetworkConnection {
|
||||||
@ -112,6 +113,7 @@ impl NetworkConnection {
|
|||||||
})),
|
})),
|
||||||
sender,
|
sender,
|
||||||
stop_source: None,
|
stop_source: None,
|
||||||
|
protected: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,6 +159,7 @@ impl NetworkConnection {
|
|||||||
stats,
|
stats,
|
||||||
sender,
|
sender,
|
||||||
stop_source: Some(stop_source),
|
stop_source: Some(stop_source),
|
||||||
|
protected: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -172,6 +175,14 @@ impl NetworkConnection {
|
|||||||
ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone())
|
ConnectionHandle::new(self.connection_id, self.descriptor.clone(), self.sender.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_protected(&self) -> bool {
|
||||||
|
self.protected
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn protect(&mut self) {
|
||||||
|
self.protected = true;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn close(&mut self) {
|
pub fn close(&mut self) {
|
||||||
if let Some(stop_source) = self.stop_source.take() {
|
if let Some(stop_source) = self.stop_source.take() {
|
||||||
// drop the stopper
|
// drop the stopper
|
||||||
|
@ -18,6 +18,36 @@ impl NetworkManager {
|
|||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
Box::pin(
|
Box::pin(
|
||||||
async move {
|
async move {
|
||||||
|
|
||||||
|
// First try to send data to the last socket we've seen this peer on
|
||||||
|
let data = if let Some(connection_descriptor) = destination_node_ref.last_connection() {
|
||||||
|
match this
|
||||||
|
.net()
|
||||||
|
.send_data_to_existing_connection(connection_descriptor, data)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
None => {
|
||||||
|
// Update timestamp for this last connection since we just sent to it
|
||||||
|
destination_node_ref
|
||||||
|
.set_last_connection(connection_descriptor, get_aligned_timestamp());
|
||||||
|
|
||||||
|
return Ok(NetworkResult::value(SendDataKind::Existing(
|
||||||
|
connection_descriptor,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Some(data) => {
|
||||||
|
// Couldn't send data to existing connection
|
||||||
|
// so pass the data back out
|
||||||
|
data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// No last connection
|
||||||
|
data
|
||||||
|
};
|
||||||
|
|
||||||
|
// No existing connection was found or usable, so we proceed to see how to make a new one
|
||||||
|
|
||||||
// Get the best way to contact this node
|
// Get the best way to contact this node
|
||||||
let contact_method = this.get_node_contact_method(destination_node_ref.clone())?;
|
let contact_method = this.get_node_contact_method(destination_node_ref.clone())?;
|
||||||
|
|
||||||
|
@ -244,7 +244,7 @@ pub trait NodeRefBase: Sized {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn all_filtered_dial_info_details<F>(&self) -> Vec<DialInfoDetail> {
|
fn all_filtered_dial_info_details(&self) -> Vec<DialInfoDetail> {
|
||||||
let routing_domain_set = self.routing_domain_set();
|
let routing_domain_set = self.routing_domain_set();
|
||||||
let dial_info_filter = self.dial_info_filter();
|
let dial_info_filter = self.dial_info_filter();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user