fixes
This commit is contained in:
@@ -6,6 +6,7 @@ use std::net::UdpSocket;
|
||||
const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000;
|
||||
const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000;
|
||||
const UPNP_MAPPING_ATTEMPTS: u32 = 3;
|
||||
const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
struct PortMapKey {
|
||||
@@ -299,18 +300,17 @@ impl IGDManager {
|
||||
// If an error is received, then return false to restart the local network
|
||||
let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
|
||||
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
|
||||
let now = intf::get_timestamp();
|
||||
const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64;
|
||||
|
||||
{
|
||||
let inner = self.inner.lock();
|
||||
let now = intf::get_timestamp();
|
||||
|
||||
for (k, v) in &inner.port_maps {
|
||||
if (now - v.timestamp) >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS {
|
||||
let mapping_lifetime = now.saturating_sub(v.timestamp);
|
||||
if mapping_lifetime >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS {
|
||||
// Past expiration time or tried N times, do a full renew and fail out if we can't
|
||||
full_renews.push((*k, *v));
|
||||
}
|
||||
else if (now - v.timestamp) >= v.renewal_lifetime {
|
||||
else if mapping_lifetime >= v.renewal_lifetime {
|
||||
// Attempt a normal renewal
|
||||
renews.push((*k, *v));
|
||||
}
|
||||
|
@@ -1,5 +1,4 @@
|
||||
use super::*;
|
||||
//use futures_util::stream::FuturesOrdered;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::FutureExt;
|
||||
use stop_token::future::FutureExt as StopTokenFutureExt;
|
||||
@@ -612,19 +611,24 @@ impl Network {
|
||||
_t: u64,
|
||||
) -> EyreResult<()> {
|
||||
// Figure out if we can optimize TCP/WS checking since they are often on the same port
|
||||
let protocol_config = self.inner.lock().protocol_config.unwrap_or_default();
|
||||
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
|
||||
&& protocol_config.inbound.contains(ProtocolType::WS)
|
||||
{
|
||||
let (protocol_config, existing_network_class, tcp_same_port) = {
|
||||
let inner = self.inner.lock();
|
||||
inner.tcp_port == inner.ws_port
|
||||
} else {
|
||||
false
|
||||
let protocol_config = inner.protocol_config.unwrap_or_default();
|
||||
let existing_network_class = inner.network_class;
|
||||
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
|
||||
&& protocol_config.inbound.contains(ProtocolType::WS)
|
||||
{
|
||||
inner.tcp_port == inner.ws_port
|
||||
} else {
|
||||
false
|
||||
};
|
||||
(protocol_config, existing_network_class, tcp_same_port)
|
||||
};
|
||||
let routing_table = self.routing_table();
|
||||
let network_manager = self.network_manager();
|
||||
|
||||
// Process all protocol and address combinations
|
||||
let mut futures = FuturesUnordered::new();
|
||||
//let mut futures = FuturesOrdered::new();
|
||||
|
||||
// Do UDPv4+v6 at the same time as everything else
|
||||
if protocol_config.inbound.contains(ProtocolType::UDP) {
|
||||
// UDPv4
|
||||
@@ -750,19 +754,19 @@ impl Network {
|
||||
|
||||
// Wait for all discovery futures to complete and collect contexts
|
||||
let mut contexts = Vec::<DiscoveryContext>::new();
|
||||
let mut network_class = Option::<NetworkClass>::None;
|
||||
let mut new_network_class = Option::<NetworkClass>::None;
|
||||
loop {
|
||||
match futures.next().timeout_at(stop_token.clone()).await {
|
||||
Ok(Some(ctxvec)) => {
|
||||
if let Some(ctxvec) = ctxvec {
|
||||
for ctx in ctxvec {
|
||||
if let Some(nc) = ctx.inner.lock().detected_network_class {
|
||||
if let Some(last_nc) = network_class {
|
||||
if let Some(last_nc) = new_network_class {
|
||||
if nc < last_nc {
|
||||
network_class = Some(nc);
|
||||
new_network_class = Some(nc);
|
||||
}
|
||||
} else {
|
||||
network_class = Some(nc);
|
||||
new_network_class = Some(nc);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -781,47 +785,89 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
// Get best network class
|
||||
if network_class.is_some() {
|
||||
// Update public dial info
|
||||
let routing_table = self.routing_table();
|
||||
let network_manager = self.network_manager();
|
||||
// If a network class could be determined
|
||||
// see about updating our public dial info
|
||||
let mut changed = false;
|
||||
if new_network_class.is_some() {
|
||||
// Get existing public dial info
|
||||
let existing_public_dial_info: HashSet<DialInfoDetail> = routing_table
|
||||
.all_filtered_dial_info_details(
|
||||
Some(RoutingDomain::PublicInternet),
|
||||
&DialInfoFilter::all(),
|
||||
)
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// Get new public dial info and ensure it is valid
|
||||
let mut new_public_dial_info: HashSet<DialInfoDetail> = HashSet::new();
|
||||
for ctx in contexts {
|
||||
let inner = ctx.inner.lock();
|
||||
if let Some(pdi) = &inner.detected_public_dial_info {
|
||||
if let Err(e) = routing_table.register_dial_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
pdi.dial_info.clone(),
|
||||
pdi.class,
|
||||
) {
|
||||
log_net!(warn "Failed to register detected public dial info: {}", e);
|
||||
if routing_table
|
||||
.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &pdi.dial_info)
|
||||
{
|
||||
new_public_dial_info.insert(DialInfoDetail {
|
||||
class: pdi.class,
|
||||
dial_info: pdi.dial_info.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// duplicate for same port
|
||||
if tcp_same_port && pdi.dial_info.protocol_type() == ProtocolType::TCP {
|
||||
let ws_dial_info =
|
||||
ctx.make_dial_info(pdi.dial_info.socket_address(), ProtocolType::WS);
|
||||
if let Err(e) = routing_table.register_dial_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
ws_dial_info,
|
||||
pdi.class,
|
||||
) {
|
||||
log_net!(warn "Failed to register detected public dial info: {}", e);
|
||||
if routing_table
|
||||
.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &ws_dial_info)
|
||||
{
|
||||
new_public_dial_info.insert(DialInfoDetail {
|
||||
class: pdi.class,
|
||||
dial_info: ws_dial_info,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Update network class
|
||||
self.inner.lock().network_class = network_class;
|
||||
log_net!(debug "network class changed to {:?}", network_class);
|
||||
|
||||
// Is the public dial info different?
|
||||
if existing_public_dial_info != new_public_dial_info {
|
||||
// If so, clear existing public dial info and re-register the new public dial info
|
||||
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
|
||||
for did in new_public_dial_info {
|
||||
if let Err(e) = routing_table.register_dial_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
did.dial_info,
|
||||
did.class,
|
||||
) {
|
||||
log_net!(error "Failed to register detected public dial info: {}", e);
|
||||
}
|
||||
}
|
||||
changed = true;
|
||||
}
|
||||
|
||||
// Is the network class different?
|
||||
if existing_network_class != new_network_class {
|
||||
self.inner.lock().network_class = new_network_class;
|
||||
changed = true;
|
||||
log_net!(debug "network class changed to {:?}", new_network_class);
|
||||
}
|
||||
} else if existing_network_class.is_some() {
|
||||
// Network class could not be determined
|
||||
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
|
||||
self.inner.lock().network_class = None;
|
||||
changed = true;
|
||||
log_net!(debug "network class cleared");
|
||||
}
|
||||
|
||||
// Punish nodes that told us our public address had changed when it didn't
|
||||
if !changed {
|
||||
if let Some(punish) = self.inner.lock().public_dial_info_check_punishment.take() {
|
||||
punish();
|
||||
}
|
||||
} else {
|
||||
// Send updates to everyone
|
||||
network_manager.send_node_info_updates(true).await;
|
||||
}
|
||||
|
||||
if !changed {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
@@ -832,10 +878,11 @@ impl Network {
|
||||
t: u64,
|
||||
) -> EyreResult<()> {
|
||||
// Note that we are doing the public dial info check
|
||||
// We don't have to check this for concurrency, since this routine is run in a TickTask/SingleFuture
|
||||
self.inner.lock().doing_public_dial_info_check = true;
|
||||
|
||||
// Do the public dial info check
|
||||
let out = self.do_public_dial_info_check(stop_token, l, t);
|
||||
let out = self.do_public_dial_info_check(stop_token, l, t).await;
|
||||
|
||||
// Done with public dial info check
|
||||
self.inner.lock().doing_public_dial_info_check = false;
|
||||
|
Reference in New Issue
Block a user