fix public address check

This commit is contained in:
Christien Rioux 2023-08-23 13:46:02 -04:00
parent 07646679d0
commit 030a0073da
10 changed files with 295 additions and 233 deletions

30
Cargo.lock generated
View File

@ -854,9 +854,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.83"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01"
dependencies = [
"libc",
]
@ -1510,9 +1510,9 @@ dependencies = [
[[package]]
name = "dashmap"
version = "5.5.1"
version = "5.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edd72493923899c6f10c641bdbdeddc7183d6396641d99c1a0d1597f37f92e28"
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d"
dependencies = [
"cfg-if 1.0.0",
"hashbrown 0.14.0",
@ -2226,9 +2226,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.21"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833"
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049"
dependencies = [
"bytes 1.4.0",
"fnv",
@ -3728,12 +3728,12 @@ dependencies = [
[[package]]
name = "petgraph"
version = "0.6.4"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap 2.0.0",
"indexmap 1.9.3",
]
[[package]]
@ -4434,9 +4434,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.185"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be9b6f69f1dfd54c3b568ffa45c310d6973a5e5148fd40cf515acaf38cf5bc31"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
dependencies = [
"serde_derive",
]
@ -4462,9 +4462,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.185"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc59dfdcbad1437773485e0367fea4b090a2e0a16d9ffc46af47764536a298ec"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
dependencies = [
"proc-macro2",
"quote",
@ -4721,9 +4721,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.9"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
dependencies = [
"autocfg",
]

View File

@ -54,8 +54,8 @@ pub const IPADDR_TABLE_SIZE: usize = 1024;
pub const IPADDR_MAX_INACTIVE_DURATION_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
pub const NODE_CONTACT_METHOD_CACHE_SIZE: usize = 1024;
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8;
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 5;
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 10;
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration =
TimestampDuration::new(300_000_000u64); // 5 minutes
@ -1116,199 +1116,4 @@ impl NetworkManager {
// Inform caller that we dealt with the envelope locally
Ok(true)
}
// Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers
pub fn report_local_network_socket_address(
&self,
_socket_address: SocketAddress,
_connection_descriptor: ConnectionDescriptor,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub fn report_public_internet_socket_address(
&self,
socket_address: SocketAddress, // the socket address as seen by the remote peer
connection_descriptor: ConnectionDescriptor, // the connection descriptor used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
#[cfg(feature = "verbose-tracing")]
debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
// Ignore these reports if we are currently detecting public dial info
let net = self.net();
if net.needs_public_dial_info_check() {
return;
}
let routing_table = self.routing_table();
let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
(
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
// Get the ip(block) this report is coming from
let ipblock = ip_to_ipblock(
ip6_prefix_size,
connection_descriptor.remote_address().to_ip_addr(),
);
// Store the reported address if it isn't denylisted
let key = PublicAddressCheckCacheKey(
connection_descriptor.protocol_type(),
connection_descriptor.address_type(),
);
let mut inner = self.inner.lock();
let inner = &mut *inner;
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
let pait = inner
.public_address_inconsistencies_table
.entry(key)
.or_insert_with(|| HashMap::new());
if pait.contains_key(&ipblock) {
return;
}
pacc.insert(ipblock, socket_address, |_k, _v| {
// do nothing on LRU evict
});
// Determine if our external address has likely changed
let mut bad_public_address_detection_punishment: Option<
Box<dyn FnOnce() + Send + 'static>,
> = None;
let public_internet_network_class = routing_table
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
let needs_public_address_detection =
if matches!(public_internet_network_class, NetworkClass::InboundCapable) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&dial_info_filter,
)
.iter()
.map(|did| did.dial_info.socket_address())
.collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
let mut inconsistencies = Vec::new();
// Iteration goes from most recent to least recent node/address pair
for (reporting_ip_block, a) in pacc {
// If this address is not one of our current addresses (inconsistent)
// and we haven't already denylisted the reporting source,
if !current_addresses.contains(a) && !pait.contains_key(reporting_ip_block) {
// Record the origin of the inconsistency
inconsistencies.push(*reporting_ip_block);
}
}
// If we have enough inconsistencies to consider changing our public dial info,
// add them to our denylist (throttling) and go ahead and check for new
// public dialinfo
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT
{
let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
// Run this routine if the inconsistent nodes turn out to be lying
let this = self.clone();
bad_public_address_detection_punishment = Some(Box::new(move || {
let mut inner = this.inner.lock();
let pait = inner
.public_address_inconsistencies_table
.entry(key)
.or_insert_with(|| HashMap::new());
let exp_ts = get_aligned_timestamp()
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies {
pait.insert(i, exp_ts);
}
}));
true
} else {
false
};
// // debug code
// if inconsistent {
// trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
// .public_address_check_cache, current_addresses, inconsistencies);
// }
inconsistent
} else if matches!(public_internet_network_class, NetworkClass::OutboundOnly) {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true;
break;
}
}
} else {
current_address = Some(*a);
}
}
consistent
} else {
// If we are a webapp we never do this.
// If we have invalid network class, then public address detection is already going to happen via the network_class_discovery task
false
};
if needs_public_address_detection {
if detect_address_changes {
// Reset the address check cache now so we can start detecting fresh
info!("Public address has changed, detecting public dial info");
inner.public_address_check_cache.clear();
// Re-detect the public dialinfo
net.set_needs_public_dial_info_check(bad_public_address_detection_punishment);
} else {
warn!("Public address may have changed. Restarting the server may be required.");
warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
warn!(
"public_address_check_cache: {:#?}",
inner.public_address_check_cache
);
}
}
}
}

View File

@ -79,13 +79,13 @@ impl RawUdpProtocolHandler {
};
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", &size);
tracing::Span::current().record("ret.len", &message_len);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
Ok((message_len, descriptor))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor)))]
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.descriptor)))]
pub async fn send_message(
&self,
data: Vec<u8>,
@ -133,8 +133,6 @@ impl RawUdpProtocolHandler {
SocketAddress::from_socket_addr(local_socket_addr),
);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", &len);
#[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
Ok(NetworkResult::value(descriptor))

View File

@ -24,4 +24,250 @@ impl NetworkManager {
}
Ok(())
}
// Determine if a local IP address has changed
// this means we should restart the low level network and and recreate all of our dial info
// Wait until we have received confirmation from N different peers
pub fn report_local_network_socket_address(
&self,
_socket_address: SocketAddress,
_connection_descriptor: ConnectionDescriptor,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
}
// Determine if a global IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub fn report_public_internet_socket_address(
&self,
socket_address: SocketAddress, // the socket address as seen by the remote peer
connection_descriptor: ConnectionDescriptor, // the connection descriptor used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
#[cfg(feature = "network-result-extra")]
debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
// Ignore these reports if we are currently detecting public dial info
let net = self.net();
if net.needs_public_dial_info_check() {
return;
}
// If we are a webapp we should skip this completely
// because we will never get inbound dialinfo directly on our public ip address
// If we have an invalid network class, this is not necessary yet
let routing_table = self.routing_table();
let public_internet_network_class = routing_table
.get_network_class(RoutingDomain::PublicInternet)
.unwrap_or(NetworkClass::Invalid);
if matches!(
public_internet_network_class,
NetworkClass::Invalid | NetworkClass::WebApp
) {
return;
}
let (detect_address_changes, ip6_prefix_size) = self.with_config(|c| {
(
c.network.detect_address_changes,
c.network.max_connections_per_ip6_prefix_size as usize,
)
});
// Get the ip(block) this report is coming from
let ipblock = ip_to_ipblock(
ip6_prefix_size,
connection_descriptor.remote_address().to_ip_addr(),
);
// Reject public address reports from nodes that we know are behind symmetric nat or
// nodes that must be using a relay for everything
let Some(node_info) = reporting_peer.node_info(RoutingDomain::PublicInternet) else {
return;
};
if node_info.network_class() != NetworkClass::InboundCapable {
return;
}
// Check if the public address report is coming from a node/block that gives an 'inconsistent' location
// meaning that the node may be not useful for public address detection
// This is done on a per address/protocol basis
let mut inner = self.inner.lock();
let inner = &mut *inner;
let addr_proto_type_key = PublicAddressCheckCacheKey(
connection_descriptor.protocol_type(),
connection_descriptor.address_type(),
);
if inner
.public_address_inconsistencies_table
.get(&addr_proto_type_key)
.map(|pait| pait.contains_key(&ipblock))
.unwrap_or(false)
{
return;
}
// Insert this new public address into the lru cache for the address check
// if we've seen this address before, it brings it to the front
let pacc = inner
.public_address_check_cache
.entry(addr_proto_type_key)
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
pacc.insert(ipblock, socket_address, |_k, _v| {
// do nothing on LRU evict
});
// Determine if our external address has likely changed
let mut bad_public_address_detection_punishment: Option<
Box<dyn FnOnce() + Send + 'static>,
> = None;
let needs_public_address_detection = if matches!(
public_internet_network_class,
NetworkClass::InboundCapable
) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
RoutingDomain::PublicInternet.into(),
&dial_info_filter,
)
.iter()
.map(|did| {
// Strip port from direct and mapped addresses
// as the incoming dialinfo may not match the outbound
// connections' NAT mapping. In this case we only check for IP address changes.
if did.class == DialInfoClass::Direct || did.class == DialInfoClass::Mapped {
did.dial_info.socket_address().with_port(0)
} else {
did.dial_info.socket_address()
}
})
.collect();
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
// Keep list of the origin ip blocks of inconsistent public address reports
let mut inconsistencies = Vec::new();
// Iteration goes from most recent to least recent node/address pair
for (reporting_ip_block, a) in pacc {
// If this address is not one of our current addresses (inconsistent)
// and we haven't already denylisted the reporting source,
// Also check address with port zero in the even we are only checking changes to ip addresses
if !current_addresses.contains(a)
&& !current_addresses.contains(&a.with_port(0))
&& !inner
.public_address_inconsistencies_table
.get(&addr_proto_type_key)
.map(|pait| pait.contains_key(reporting_ip_block))
.unwrap_or(false)
{
// Record the origin of the inconsistency
#[cfg(feature = "network-result-extra")]
debug!("inconsistency added from {:?}: reported {:?} with current_addresses = {:?}", reporting_ip_block, a, current_addresses);
inconsistencies.push(*reporting_ip_block);
}
}
// If we have enough inconsistencies to consider changing our public dial info,
// add them to our denylist (throttling) and go ahead and check for new
// public dialinfo
let inconsistent = if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
let exp_ts = get_aligned_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
let pait = inner
.public_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_insert_with(|| HashMap::new());
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
// Run this routine if the inconsistent nodes turn out to be lying
let this = self.clone();
bad_public_address_detection_punishment = Some(Box::new(move || {
let mut inner = this.inner.lock();
let pait = inner
.public_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_insert_with(|| HashMap::new());
let exp_ts = get_aligned_timestamp()
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies {
pait.insert(i, exp_ts);
}
}));
true
} else {
false
};
// // debug code
// if inconsistent {
// trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
// .public_address_check_cache, current_addresses, inconsistencies);
// }
inconsistent
} else if matches!(public_internet_network_class, NetworkClass::OutboundOnly) {
// If we are currently outbound only, we don't have any public dial info
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
if consistencies >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
consistent = true;
break;
}
}
} else {
current_address = Some(*a);
}
}
consistent
} else {
// If we are a webapp we never do this.
// If we have invalid network class, then public address detection is already going to happen via the network_class_discovery task
// we should have checked for this condition earlier at the top of this function
unreachable!();
};
if needs_public_address_detection {
if detect_address_changes {
// Reset the address check cache now so we can start detecting fresh
info!("Public address has changed, detecting public dial info");
inner.public_address_check_cache.clear();
// Re-detect the public dialinfo
net.set_needs_public_dial_info_check(bad_public_address_detection_punishment);
} else {
warn!("Public address may have changed. Restarting the server may be required.");
warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer);
warn!(
"public_address_check_cache: {:#?}",
inner.public_address_check_cache
);
}
}
}
}

View File

@ -33,6 +33,11 @@ impl SocketAddress {
pub fn set_port(&mut self, port: u16) {
self.port = port
}
pub fn with_port(&self, port: u16) -> Self {
let mut sa = self.clone();
sa.port = port;
sa
}
pub fn to_canonical(&self) -> SocketAddress {
SocketAddress {
address: self.address.to_canonical(),

View File

@ -527,10 +527,6 @@ impl RoutingTable {
}
/// Look up the best way for two nodes to reach each other over a specific routing domain
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret)
)]
pub fn get_contact_method(
&self,
routing_domain: RoutingDomain,

View File

@ -215,10 +215,6 @@ impl RoutingTableInner {
true
}
#[cfg_attr(
feature = "verbose-tracing",
instrument(level = "trace", skip(self), ret)
)]
pub fn get_contact_method(
&self,
routing_domain: RoutingDomain,

View File

@ -208,7 +208,7 @@
isa = PBXProject;
attributes = {
LastSwiftUpdateCheck = 0920;
LastUpgradeCheck = 1300;
LastUpgradeCheck = 1430;
ORGANIZATIONNAME = "";
TargetAttributes = {
33CC10EC2044A3C60003C045 = {

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1300"
LastUpgradeVersion = "1430"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"

View File

@ -9,20 +9,36 @@ edition = "2021"
crate-type = ["cdylib", "staticlib", "rlib"]
[features]
default = [ "rt-tokio", "veilid-core/default" ]
crypto-test = [ "rt-tokio", "veilid-core/crypto-test"]
rt-async-std = [ "veilid-core/rt-async-std", "async-std", "opentelemetry/rt-async-std", "opentelemetry-otlp/grpc-sys"]
rt-tokio = [ "veilid-core/rt-tokio", "tokio", "tokio-stream", "tokio-util", "opentelemetry/rt-tokio"]
default = ["rt-tokio", "veilid-core/default"]
crypto-test = ["rt-tokio", "veilid-core/crypto-test"]
rt-async-std = [
"veilid-core/rt-async-std",
"async-std",
"opentelemetry/rt-async-std",
"opentelemetry-otlp/grpc-sys",
]
rt-tokio = [
"veilid-core/rt-tokio",
"tokio",
"tokio-stream",
"tokio-util",
"opentelemetry/rt-tokio",
]
[dependencies]
veilid-core = { path="../../veilid-core", default-features = false }
veilid-core = { path = "../../veilid-core", default-features = false, features = [
"verbose-tracing",
"network-result-extra",
] }
tracing = { version = "^0", features = ["log", "attributes"] }
tracing-subscriber = "^0"
parking_lot = "^0"
backtrace = "^0"
serde_json = "^1"
serde = "^1"
futures-util = { version = "^0", default_features = false, features = ["alloc"] }
futures-util = { version = "^0", default_features = false, features = [
"alloc",
] }
cfg-if = "^1"
data-encoding = { version = "^2" }
@ -36,7 +52,7 @@ opentelemetry-semantic-conventions = "0.10"
async-std = { version = "^1", features = ["unstable"], optional = true }
tokio = { version = "^1", features = ["full"], optional = true }
tokio-stream = { version = "^0", features = ["net"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true}
tokio-util = { version = "^0", features = ["compat"], optional = true }
allo-isolate = "^0"
ffi-support = "^0"
lazy_static = "^1"