This commit is contained in:
John Smith
2022-08-22 13:27:26 -04:00
parent 53ae04aff9
commit 997eca05b6
14 changed files with 920 additions and 154 deletions

View File

@@ -0,0 +1,380 @@
use super::*;
use crate::xx::*;
use igd::*;
use std::net::UdpSocket;
const UPNP_GATEWAY_DETECT_TIMEOUT_MS: u32 = 5_000;
const UPNP_MAPPING_LIFETIME_MS: u32 = 120_000;
const UPNP_MAPPING_ATTEMPTS: u32 = 3;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct PortMapKey {
llpt: LowLevelProtocolType,
at: AddressType,
local_port: u16,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct PortMapValue {
ext_ip: IpAddr,
mapped_port: u16,
timestamp: u64,
renewal_lifetime: u64,
renewal_attempts: u32,
}
struct IGDManagerInner {
local_ip_addrs: BTreeMap<AddressType, IpAddr>,
gateways: BTreeMap<AddressType, Arc<Gateway>>,
port_maps: BTreeMap<PortMapKey, PortMapValue>,
}
#[derive(Clone)]
pub struct IGDManager {
config: VeilidConfig,
inner: Arc<Mutex<IGDManagerInner>>,
}
fn convert_llpt(llpt: LowLevelProtocolType) -> PortMappingProtocol {
match llpt {
LowLevelProtocolType::UDP => PortMappingProtocol::UDP,
LowLevelProtocolType::TCP => PortMappingProtocol::TCP,
}
}
impl IGDManager {
//
pub fn new(config: VeilidConfig) -> Self {
Self {
config,
inner: Arc::new(Mutex::new(IGDManagerInner {
local_ip_addrs: BTreeMap::new(),
gateways: BTreeMap::new(),
port_maps: BTreeMap::new(),
})),
}
}
fn get_routed_local_ip_address(address_type: AddressType) -> Option<IpAddr> {
let socket = match UdpSocket::bind(match address_type {
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
AddressType::IPV6 => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
}) {
Ok(s) => s,
Err(_) => return None,
};
// can be any routable ip address,
// this is just to make the system routing table calculate the appropriate local ip address
// using google's dns, but it wont actually send any packets to it
socket
.connect(match address_type {
AddressType::IPV4 => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 0),
AddressType::IPV6 => SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
0,
),
})
.ok()?;
Some(socket.local_addr().ok()?.ip())
}
fn find_local_ip(inner: &mut IGDManagerInner,
address_type: AddressType,
) -> Option<IpAddr> {
if let Some(ip) = inner.local_ip_addrs.get(&address_type) {
return Some(*ip);
}
let ip = match Self::get_routed_local_ip_address(address_type) {
Some(x) => x,
None => {
log_net!("failed to get local ip address");
return None;
}
};
inner.local_ip_addrs.insert(address_type, ip);
Some(ip)
}
fn get_local_ip(
inner: &mut IGDManagerInner,
address_type: AddressType,
) -> Option<IpAddr> {
if let Some(ip) = inner.local_ip_addrs.get(&address_type) {
return Some(*ip);
}
None
}
fn find_gateway(
inner: &mut IGDManagerInner,
address_type: AddressType,
) -> Option<Arc<Gateway>> {
if let Some(gw) = inner.gateways.get(&address_type) {
return Some(gw.clone());
}
let gateway = match address_type {
AddressType::IPV4 => {
match igd::search_gateway(SearchOptions::new_v4(
UPNP_GATEWAY_DETECT_TIMEOUT_MS as u64,
)) {
Ok(v) => v,
Err(e) => {
log_net!("couldn't find ipv4 igd: {}", e);
return None;
}
}
}
AddressType::IPV6 => {
match igd::search_gateway(SearchOptions::new_v6(
Ipv6SearchScope::LinkLocal,
UPNP_GATEWAY_DETECT_TIMEOUT_MS as u64,
)) {
Ok(v) => v,
Err(e) => {
log_net!("couldn't find ipv6 igd: {}", e);
return None;
}
}
}
};
let gw = Arc::new(gateway);
inner.gateways.insert(address_type, gw.clone());
Some(gw)
}
fn get_gateway(
inner: &mut IGDManagerInner,
address_type: AddressType,
) -> Option<Arc<Gateway>> {
if let Some(gw) = inner.gateways.get(&address_type) {
return Some(gw.clone());
}
None
}
fn get_description(&self, llpt: LowLevelProtocolType, local_port:u16) -> String {
format!("{} map {} for port {}", self.config.get().program_name, convert_llpt(llpt), local_port )
}
pub async fn map_any_port(
&self,
llpt: LowLevelProtocolType,
at: AddressType,
local_port: u16,
expected_external_address: Option<IpAddr>,
) -> Option<SocketAddr> {
let this = self.clone();
intf::blocking_wrapper(move || {
let mut inner = this.inner.lock();
// If we already have this port mapped, just return the existing portmap
let pmkey = PortMapKey {
llpt,
at,
local_port,
};
if let Some(pmval) = inner.port_maps.get(&pmkey) {
return Some(SocketAddr::new(pmval.ext_ip, pmval.mapped_port));
}
// Get local ip address
let local_ip = Self::find_local_ip(&mut *inner, at)?;
// Find gateway
let gw = Self::find_gateway(&mut *inner, at)?;
// Get external address
let ext_ip = match gw.get_external_ip() {
Ok(ip) => ip,
Err(e) => {
log_net!(debug "couldn't get external ip from igd: {}", e);
return None;
}
};
// Ensure external IP matches address type
if ext_ip.is_ipv4() {
if at != AddressType::IPV4 {
log_net!(debug "mismatched ip address type from igd, wanted v4, got v6");
return None;
}
} else if ext_ip.is_ipv6() {
if at != AddressType::IPV6 {
log_net!(debug "mismatched ip address type from igd, wanted v6, got v4");
return None;
}
}
if let Some(expected_external_address) = expected_external_address {
if ext_ip != expected_external_address {
log_net!(debug "gateway external address does not match calculated external address: expected={} vs gateway={}", expected_external_address, ext_ip);
return None;
}
}
// Map any port
let desc = this.get_description(llpt, local_port);
let mapped_port = match gw.add_any_port(convert_llpt(llpt), SocketAddr::new(local_ip, local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) {
Ok(mapped_port) => mapped_port,
Err(e) => {
// Failed to map external port
log_net!(debug "upnp failed to map external port: {}", e);
return None;
}
};
// Add to mapping list to keep alive
let timestamp = intf::get_timestamp();
inner.port_maps.insert(PortMapKey {
llpt,
at,
local_port,
}, PortMapValue {
ext_ip,
mapped_port,
timestamp,
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
// Succeeded, return the externally mapped port
Some(SocketAddr::new(ext_ip, mapped_port))
}, None)
.await
}
pub async fn tick(&self) -> EyreResult<bool> {
// Refresh mappings if we have them
// If an error is received, then return false to restart the local network
let mut full_renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
let mut renews: Vec<(PortMapKey, PortMapValue)> = Vec::new();
let now = intf::get_timestamp();
const UPNP_MAPPING_LIFETIME_US:u64 = (UPNP_MAPPING_LIFETIME_MS as u64) * 1000u64;
{
let inner = self.inner.lock();
for (k, v) in &inner.port_maps {
if (now - v.timestamp) >= UPNP_MAPPING_LIFETIME_US || v.renewal_attempts >= UPNP_MAPPING_ATTEMPTS {
// Past expiration time or tried N times, do a full renew and fail out if we can't
full_renews.push((*k, *v));
}
else if (now - v.timestamp) >= v.renewal_lifetime {
// Attempt a normal renewal
renews.push((*k, *v));
}
}
// See if we need to do some blocking operations
if full_renews.is_empty() && renews.is_empty() {
// Just return now since there's nothing to renew
return Ok(true);
}
}
let this = self.clone();
intf::blocking_wrapper(move || {
let mut inner = this.inner.lock();
// Process full renewals
for (k, v) in full_renews {
// Get gateway for address type
let gw = match Self::get_gateway(&mut inner, k.at) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for address type"));
}
};
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.at) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
// Delete the mapping if it exists, ignore any errors here
let _ = gw.remove_port(convert_llpt(k.llpt), v.mapped_port);
inner.port_maps.remove(&k);
let desc = this.get_description(k.llpt, k.local_port);
match gw.add_any_port(convert_llpt(k.llpt), SocketAddr::new(local_ip, k.local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) {
Ok(mapped_port) => {
log_net!(debug "full-renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip,
mapped_port,
timestamp: intf::get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
},
Err(e) => {
info!("failed to full-renew mapped port {:?} -> {:?}: {}", v, k, e);
// Must restart network now :(
return Ok(false);
}
};
}
// Process normal renewals
for (k, mut v) in renews {
// Get gateway for address type
let gw = match Self::get_gateway(&mut inner, k.at) {
Some(gw) => gw,
None => {
return Err(eyre!("gateway missing for address type"));
}
};
// Get local ip for address type
let local_ip = match Self::get_local_ip(&mut inner, k.at) {
Some(ip) => ip,
None => {
return Err(eyre!("local ip missing for address type"));
}
};
let desc = this.get_description(k.llpt, k.local_port);
match gw.add_port(convert_llpt(k.llpt), v.mapped_port, SocketAddr::new(local_ip, k.local_port), (UPNP_MAPPING_LIFETIME_MS + 999) / 1000, &desc) {
Ok(()) => {
log_net!(debug "renewed mapped port {:?} -> {:?}", v, k);
inner.port_maps.insert(k, PortMapValue {
ext_ip: v.ext_ip,
mapped_port: v.mapped_port,
timestamp: intf::get_timestamp(),
renewal_lifetime: (UPNP_MAPPING_LIFETIME_MS / 2) as u64 * 1000u64,
renewal_attempts: 0,
});
},
Err(e) => {
log_net!(debug "failed to renew mapped port {:?} -> {:?}: {}", v, k, e);
// Get closer to the maximum renewal timeline by a factor of two each time
v.renewal_lifetime = (v.renewal_lifetime + UPNP_MAPPING_LIFETIME_US) / 2;
v.renewal_attempts += 1;
// Store new value to try again
inner.port_maps.insert(k, v);
}
};
}
// Normal exit, no restart
Ok(true)
}, Err(eyre!("failed to process blocking task"))).await
}
}

View File

@@ -1,3 +1,5 @@
mod igd_manager;
mod natpmp_manager;
mod network_class_discovery;
mod network_tcp;
mod network_udp;
@@ -67,6 +69,12 @@ struct NetworkUnlockedInner {
// Background processes
update_network_class_task: TickTask<EyreReport>,
network_interfaces_task: TickTask<EyreReport>,
upnp_task: TickTask<EyreReport>,
natpmp_task: TickTask<EyreReport>,
// Managers
igd_manager: igd_manager::IGDManager,
natpmp_manager: natpmp_manager::NATPMPManager,
}
#[derive(Clone)]
@@ -108,6 +116,7 @@ impl Network {
routing_table: RoutingTable,
connection_manager: ConnectionManager,
) -> NetworkUnlockedInner {
let config = network_manager.config();
NetworkUnlockedInner {
network_manager,
routing_table,
@@ -115,6 +124,10 @@ impl Network {
interfaces: NetworkInterfaces::new(),
update_network_class_task: TickTask::new(1),
network_interfaces_task: TickTask::new(5),
upnp_task: TickTask::new(1),
natpmp_task: TickTask::new(1),
igd_manager: igd_manager::IGDManager::new(config.clone()),
natpmp_manager: natpmp_manager::NATPMPManager::new(config),
}
}
@@ -151,6 +164,20 @@ impl Network {
Box::pin(this2.clone().network_interfaces_task_routine(s, l, t))
});
}
// Set upnp tick task
{
let this2 = this.clone();
this.unlocked_inner
.upnp_task
.set_routine(move |s, l, t| Box::pin(this2.clone().upnp_task_routine(s, l, t)));
}
// Set natpmp tick task
{
let this2 = this.clone();
this.unlocked_inner
.natpmp_task
.set_routine(move |s, l, t| Box::pin(this2.clone().natpmp_task_routine(s, l, t)));
}
this
}
@@ -257,6 +284,17 @@ impl Network {
}
}
pub fn get_local_port(&self, protocol_type: ProtocolType) -> u16 {
let inner = self.inner.lock();
let local_port = match protocol_type {
ProtocolType::UDP => inner.udp_port,
ProtocolType::TCP => inner.tcp_port,
ProtocolType::WS => inner.ws_port,
ProtocolType::WSS => inner.wss_port,
};
local_port
}
fn get_preferred_local_address(&self, dial_info: &DialInfo) -> SocketAddr {
let inner = self.inner.lock();
@@ -749,11 +787,47 @@ impl Network {
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
pub async fn upnp_task_routine(
self,
stop_token: StopToken,
_l: u64,
_t: u64,
) -> EyreResult<()> {
if !self.unlocked_inner.igd_manager.tick().await? {
info!("upnp failed, restarting local network");
let mut inner = self.inner.lock();
inner.network_needs_restart = true;
}
Ok(())
}
#[instrument(level = "trace", skip(self), err)]
pub async fn natpmp_task_routine(
self,
stop_token: StopToken,
_l: u64,
_t: u64,
) -> EyreResult<()> {
if !self.unlocked_inner.natpmp_manager.tick().await? {
info!("natpmp failed, restarting local network");
let mut inner = self.inner.lock();
inner.network_needs_restart = true;
}
Ok(())
}
pub async fn tick(&self) -> EyreResult<()> {
let detect_address_changes = {
let (detect_address_changes, upnp, natpmp) = {
let config = self.network_manager().config();
let c = config.get();
c.network.detect_address_changes
(
c.network.detect_address_changes,
c.network.upnp,
c.network.natpmp,
)
};
// If we need to figure out our network class, tick the task for it
@@ -776,6 +850,16 @@ impl Network {
}
}
// If we need to tick upnp, do it
if upnp && !self.needs_restart() {
self.unlocked_inner.upnp_task.tick().await?;
}
// If we need to tick natpmp, do it
if natpmp && !self.needs_restart() {
self.unlocked_inner.natpmp_task.tick().await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,18 @@
use super::*;
pub struct NATPMPManager {
config: VeilidConfig,
}
impl NATPMPManager {
//
pub fn new(config: VeilidConfig) -> Self {
Self { config }
}
pub async fn tick(&self) -> EyreResult<bool> {
// xxx
Ok(true)
}
}

View File

@@ -3,6 +3,7 @@ use super::*;
use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt;
use stop_token::future::FutureExt as StopTokenFutureExt;
use tokio::task::spawn_blocking;
struct DetectedPublicDialInfo {
dial_info: DialInfo,
@@ -213,7 +214,39 @@ impl DiscoveryContext {
#[instrument(level = "trace", skip(self), ret)]
async fn try_port_mapping(&self) -> Option<DialInfo> {
//xxx
let (enable_upnp, enable_natpmp) = {
let c = self.net.config.get();
(c.network.upnp, c.network.natpmp)
};
if enable_upnp {
let (pt, llpt, at, external_address_1, local_port) = {
let inner = self.inner.lock();
let pt = inner.protocol_type.unwrap();
let llpt = pt.low_level_protocol_type();
let at = inner.address_type.unwrap();
let external_address_1 = inner.external_1_address.unwrap();
let local_port = self.net.get_local_port(pt);
(pt, llpt, at, external_address_1, local_port)
};
if let Some(mapped_external_address) = self
.net
.unlocked_inner
.igd_manager
.map_any_port(llpt, at, local_port, Some(external_address_1.to_ip_addr()))
.await
{
// make dial info from the port
return Some(
self.make_dial_info(
SocketAddress::from_socket_addr(mapped_external_address),
pt,
),
);
}
}
None
}