network fixes
This commit is contained in:
@@ -173,6 +173,7 @@ impl ConnectionManager {
|
||||
Ok(Some(conn)) => {
|
||||
// Connection added and a different one LRU'd out
|
||||
// Send it to be terminated
|
||||
log_net!(debug "== LRU kill connection due to limit: {:?}", conn);
|
||||
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
|
||||
}
|
||||
Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
|
||||
@@ -205,40 +206,6 @@ impl ConnectionManager {
|
||||
.get_connection_by_descriptor(descriptor)
|
||||
}
|
||||
|
||||
// Terminate any connections that would collide with a new connection
|
||||
// using different protocols to the same remote address and port. Used to ensure
|
||||
// that we can switch quickly between TCP and WS if necessary to the same node
|
||||
// Returns true if we killed off colliding connections
|
||||
async fn kill_off_colliding_connections(&self, dial_info: &DialInfo) -> bool {
|
||||
let protocol_type = dial_info.protocol_type();
|
||||
let socket_address = dial_info.socket_address();
|
||||
|
||||
let killed = self.arc.connection_table.drain_filter(|prior_descriptor| {
|
||||
// If the protocol types aren't the same, then this is a candidate to be killed off
|
||||
// If they are the same, then we would just return the exact same connection from get_or_create_connection()
|
||||
if prior_descriptor.protocol_type() == protocol_type {
|
||||
return false;
|
||||
}
|
||||
// If the prior remote is not the same address, then we're not going to collide
|
||||
if *prior_descriptor.remote().socket_address() != socket_address {
|
||||
return false;
|
||||
}
|
||||
|
||||
log_net!(debug
|
||||
">< Terminating connection prior_descriptor={:?}",
|
||||
prior_descriptor
|
||||
);
|
||||
true
|
||||
});
|
||||
// Wait for the killed connections to end their recv loops
|
||||
let did_kill = !killed.is_empty();
|
||||
for mut k in killed {
|
||||
k.close();
|
||||
k.await;
|
||||
}
|
||||
did_kill
|
||||
}
|
||||
|
||||
/// Called when we want to create a new connection or get the current one that already exists
|
||||
/// This will kill off any connections that are in conflict with the new connection to be made
|
||||
/// in order to make room for the new connection in the system's connection table
|
||||
@@ -246,45 +213,53 @@ impl ConnectionManager {
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
pub async fn get_or_create_connection(
|
||||
&self,
|
||||
local_addr: Option<SocketAddr>,
|
||||
dial_info: DialInfo,
|
||||
) -> EyreResult<NetworkResult<ConnectionHandle>> {
|
||||
// Async lock on the remote address for atomicity per remote
|
||||
let peer_address = dial_info.to_peer_address();
|
||||
let remote_addr = peer_address.to_socket_addr();
|
||||
let mut preferred_local_address = self
|
||||
.network_manager()
|
||||
.net()
|
||||
.get_preferred_local_address(&dial_info);
|
||||
let best_port = preferred_local_address.map(|pla| pla.port());
|
||||
|
||||
// Async lock on the remote address for atomicity per remote
|
||||
let _lock_guard = self.arc.address_lock_table.lock_tag(remote_addr).await;
|
||||
|
||||
log_net!(
|
||||
"== get_or_create_connection local_addr={:?} dial_info={:?}",
|
||||
local_addr,
|
||||
dial_info
|
||||
);
|
||||
|
||||
// Kill off any possibly conflicting connections
|
||||
let did_kill = self.kill_off_colliding_connections(&dial_info).await;
|
||||
let mut retry_count = if did_kill { 2 } else { 0 };
|
||||
log_net!("== get_or_create_connection dial_info={:?}", dial_info);
|
||||
|
||||
// If any connection to this remote exists that has the same protocol, return it
|
||||
// Any connection will do, we don't have to match the local address
|
||||
if let Some(conn) = self
|
||||
// Any connection will do, we don't have to match the local address but if we can
|
||||
// match the preferred port do it
|
||||
if let Some(best_existing_conn) = self
|
||||
.arc
|
||||
.connection_table
|
||||
.get_last_connection_by_remote(peer_address)
|
||||
.get_best_connection_by_remote(best_port, peer_address)
|
||||
{
|
||||
log_net!(
|
||||
"== Returning existing connection local_addr={:?} peer_address={:?}",
|
||||
local_addr,
|
||||
peer_address
|
||||
"== Returning best existing connection {:?}",
|
||||
best_existing_conn
|
||||
);
|
||||
|
||||
return Ok(NetworkResult::Value(conn));
|
||||
return Ok(NetworkResult::Value(best_existing_conn));
|
||||
}
|
||||
|
||||
// If there is a low-level connection collision here, then we release the 'preferred local address'
|
||||
// so we can make a second connection with an ephemeral port
|
||||
if self
|
||||
.arc
|
||||
.connection_table
|
||||
.check_for_colliding_connection(&dial_info)
|
||||
{
|
||||
preferred_local_address = None;
|
||||
}
|
||||
|
||||
// Attempt new connection
|
||||
let mut retry_count = 0; // Someday, if we need this
|
||||
|
||||
let prot_conn = network_result_try!(loop {
|
||||
let result_net_res = ProtocolNetworkConnection::connect(
|
||||
local_addr,
|
||||
preferred_local_address,
|
||||
&dial_info,
|
||||
self.arc.connection_initial_timeout_ms,
|
||||
self.network_manager().address_filter(),
|
||||
@@ -292,24 +267,28 @@ impl ConnectionManager {
|
||||
.await;
|
||||
match result_net_res {
|
||||
Ok(net_res) => {
|
||||
// If the connection 'already exists', then try one last time to return a connection from the table, in case
|
||||
// an 'accept' happened at literally the same time as our connect
|
||||
if net_res.is_already_exists() {
|
||||
if let Some(conn) = self
|
||||
.arc
|
||||
.connection_table
|
||||
.get_last_connection_by_remote(peer_address)
|
||||
{
|
||||
log_net!(
|
||||
"== Returning existing connection in race local_addr={:?} peer_address={:?}",
|
||||
local_addr,
|
||||
peer_address
|
||||
);
|
||||
|
||||
return Ok(NetworkResult::Value(conn));
|
||||
}
|
||||
}
|
||||
// // If the connection 'already exists', then try one last time to return a connection from the table, in case
|
||||
// // an 'accept' happened at literally the same time as our connect. A preferred local address must have been
|
||||
// // specified otherwise we would have picked a different ephemeral port and this could not have happened
|
||||
// if net_res.is_already_exists() && preferred_local_address.is_some() {
|
||||
// // Make 'already existing' connection descriptor
|
||||
// let conn_desc = ConnectionDescriptor::new(
|
||||
// dial_info.to_peer_address(),
|
||||
// SocketAddress::from_socket_addr(preferred_local_address.unwrap()),
|
||||
// );
|
||||
// // Return the connection for this if we have it
|
||||
// if let Some(conn) = self
|
||||
// .arc
|
||||
// .connection_table
|
||||
// .get_connection_by_descriptor(conn_desc)
|
||||
// {
|
||||
// // Should not really happen, lets make sure we see this if it does
|
||||
// log_net!(warn "== Returning existing connection in race: {:?}", conn_desc);
|
||||
// return Ok(NetworkResult::Value(conn));
|
||||
// }
|
||||
// }
|
||||
if net_res.is_value() || retry_count == 0 {
|
||||
// Successful new connection, return it
|
||||
break net_res;
|
||||
}
|
||||
}
|
||||
|
@@ -92,6 +92,37 @@ impl ConnectionTable {
|
||||
while unord.next().await.is_some() {}
|
||||
}
|
||||
|
||||
// Return true if there is another connection in the table using a different protocol type
|
||||
// to the same address and port with the same low level protocol type.
|
||||
// Specifically right now this checks for a TCP connection that exists to the same
|
||||
// low level TCP remote as a WS or WSS connection, since they are all low-level TCP
|
||||
#[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn check_for_colliding_connection(&self, dial_info: &DialInfo) -> bool {
|
||||
let inner = self.inner.lock();
|
||||
|
||||
let protocol_type = dial_info.protocol_type();
|
||||
let low_level_protocol_type = protocol_type.low_level_protocol_type();
|
||||
|
||||
// check protocol types
|
||||
let mut check_protocol_types = ProtocolTypeSet::empty();
|
||||
for check_pt in ProtocolTypeSet::all().iter() {
|
||||
if check_pt != protocol_type
|
||||
&& check_pt.low_level_protocol_type() == low_level_protocol_type
|
||||
{
|
||||
check_protocol_types.insert(check_pt);
|
||||
}
|
||||
}
|
||||
let socket_address = dial_info.socket_address();
|
||||
|
||||
for check_pt in check_protocol_types {
|
||||
let check_pa = PeerAddress::new(socket_address, check_pt);
|
||||
if inner.ids_by_remote.contains_key(&check_pa) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
pub fn add_connection(
|
||||
&self,
|
||||
@@ -183,14 +214,42 @@ impl ConnectionTable {
|
||||
Some(out.get_handle())
|
||||
}
|
||||
|
||||
//#[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<ConnectionHandle> {
|
||||
// #[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn get_best_connection_by_remote(
|
||||
&self,
|
||||
best_port: Option<u16>,
|
||||
remote: PeerAddress,
|
||||
) -> Option<ConnectionHandle> {
|
||||
let mut inner = self.inner.lock();
|
||||
|
||||
let id = inner.ids_by_remote.get(&remote).map(|v| v[v.len() - 1])?;
|
||||
let all_ids_by_remote = inner.ids_by_remote.get(&remote)?;
|
||||
let protocol_index = Self::protocol_to_index(remote.protocol_type());
|
||||
let out = inner.conn_by_id[protocol_index].get(&id).unwrap();
|
||||
Some(out.get_handle())
|
||||
if all_ids_by_remote.len() == 0 {
|
||||
// no connections
|
||||
return None;
|
||||
}
|
||||
if all_ids_by_remote.len() == 1 {
|
||||
// only one connection
|
||||
let id = all_ids_by_remote[0];
|
||||
let nc = inner.conn_by_id[protocol_index].get(&id).unwrap();
|
||||
return Some(nc.get_handle());
|
||||
}
|
||||
// multiple connections, find the one that matches the best port, or the most recent
|
||||
if let Some(best_port) = best_port {
|
||||
for id in all_ids_by_remote.iter().copied() {
|
||||
let nc = inner.conn_by_id[protocol_index].peek(&id).unwrap();
|
||||
if let Some(local_addr) = nc.connection_descriptor().local() {
|
||||
if local_addr.port() == best_port {
|
||||
let nc = inner.conn_by_id[protocol_index].get(&id).unwrap();
|
||||
return Some(nc.get_handle());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// just return most recent network connection if a best port match can not be found
|
||||
let best_id = *all_ids_by_remote.last().unwrap();
|
||||
let nc = inner.conn_by_id[protocol_index].get(&best_id).unwrap();
|
||||
Some(nc.get_handle())
|
||||
}
|
||||
|
||||
//#[instrument(level = "trace", skip(self), ret)]
|
||||
@@ -204,26 +263,26 @@ impl ConnectionTable {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn drain_filter<F>(&self, mut filter: F) -> Vec<NetworkConnection>
|
||||
where
|
||||
F: FnMut(ConnectionDescriptor) -> bool,
|
||||
{
|
||||
let mut inner = self.inner.lock();
|
||||
let mut filtered_ids = Vec::new();
|
||||
for cbi in &mut inner.conn_by_id {
|
||||
for (id, conn) in cbi {
|
||||
if filter(conn.connection_descriptor()) {
|
||||
filtered_ids.push(*id);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut filtered_connections = Vec::new();
|
||||
for id in filtered_ids {
|
||||
let conn = Self::remove_connection_records(&mut *inner, id);
|
||||
filtered_connections.push(conn)
|
||||
}
|
||||
filtered_connections
|
||||
}
|
||||
// pub fn drain_filter<F>(&self, mut filter: F) -> Vec<NetworkConnection>
|
||||
// where
|
||||
// F: FnMut(ConnectionDescriptor) -> bool,
|
||||
// {
|
||||
// let mut inner = self.inner.lock();
|
||||
// let mut filtered_ids = Vec::new();
|
||||
// for cbi in &mut inner.conn_by_id {
|
||||
// for (id, conn) in cbi {
|
||||
// if filter(conn.connection_descriptor()) {
|
||||
// filtered_ids.push(*id);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// let mut filtered_connections = Vec::new();
|
||||
// for id in filtered_ids {
|
||||
// let conn = Self::remove_connection_records(&mut *inner, id);
|
||||
// filtered_connections.push(conn)
|
||||
// }
|
||||
// filtered_connections
|
||||
// }
|
||||
|
||||
pub fn connection_count(&self) -> usize {
|
||||
let inner = self.inner.lock();
|
||||
|
@@ -333,7 +333,7 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_local_port(&self, protocol_type: ProtocolType) -> u16 {
|
||||
pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option<u16> {
|
||||
let inner = self.inner.lock();
|
||||
let local_port = match protocol_type {
|
||||
ProtocolType::UDP => inner.udp_port,
|
||||
@@ -341,10 +341,10 @@ impl Network {
|
||||
ProtocolType::WS => inner.ws_port,
|
||||
ProtocolType::WSS => inner.wss_port,
|
||||
};
|
||||
local_port
|
||||
Some(local_port)
|
||||
}
|
||||
|
||||
fn get_preferred_local_address(&self, dial_info: &DialInfo) -> SocketAddr {
|
||||
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
|
||||
let inner = self.inner.lock();
|
||||
|
||||
let local_port = match dial_info.protocol_type() {
|
||||
@@ -354,10 +354,10 @@ impl Network {
|
||||
ProtocolType::WSS => inner.wss_port,
|
||||
};
|
||||
|
||||
match dial_info.address_type() {
|
||||
Some(match dial_info.address_type() {
|
||||
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port),
|
||||
AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_usable_interface_address(&self, addr: IpAddr) -> bool {
|
||||
@@ -631,10 +631,9 @@ impl Network {
|
||||
.wrap_err("failed to send data to dial info")?);
|
||||
} else {
|
||||
// Handle connection-oriented protocols
|
||||
let local_addr = self.get_preferred_local_address(&dial_info);
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(Some(local_addr), dial_info.clone())
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
|
||||
|
@@ -256,7 +256,7 @@ impl DiscoveryContext {
|
||||
let at = inner.address_type.unwrap();
|
||||
let external_address_1 = inner.external_1_address.unwrap();
|
||||
let node_1 = inner.node_1.as_ref().unwrap().clone();
|
||||
let local_port = self.net.get_local_port(pt);
|
||||
let local_port = self.net.get_local_port(pt).unwrap();
|
||||
(pt, llpt, at, external_address_1, node_1, local_port)
|
||||
};
|
||||
|
||||
|
@@ -284,7 +284,7 @@ impl Network {
|
||||
// Handle connection-oriented protocols
|
||||
let conn = network_result_try!(
|
||||
self.connection_manager()
|
||||
.get_or_create_connection(None, dial_info.clone())
|
||||
.get_or_create_connection(dial_info.clone())
|
||||
.await?
|
||||
);
|
||||
|
||||
@@ -405,6 +405,14 @@ impl Network {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
pub fn get_local_port(&self, protocol_type: ProtocolType) -> Option<u16> {
|
||||
None
|
||||
}
|
||||
|
||||
pub fn get_preferred_local_address(&self, dial_info: &DialInfo) -> Option<SocketAddr> {
|
||||
None
|
||||
}
|
||||
|
||||
//////////////////////////////////////////
|
||||
|
||||
pub fn set_needs_public_dial_info_check(
|
||||
|
Reference in New Issue
Block a user