more refactor

This commit is contained in:
John Smith 2021-12-23 20:34:52 -05:00
parent 5826551763
commit 922470365a
9 changed files with 354 additions and 281 deletions
veilid-core/src

View File

@ -394,13 +394,14 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
// TCP listener that multiplexes ports so multiple protocols can exist on a single port
async fn start_tcp_listener( async fn start_tcp_listener(
&self, &self,
address: String, address: String,
is_tls: bool, is_tls: bool,
new_tcp_protocol_handler: Box<NewTcpProtocolHandler>, new_tcp_protocol_handler: Box<NewTcpProtocolHandler>,
) -> Result<Vec<(Address, u16)>, String> { ) -> Result<Vec<SocketAddress>, String> {
let mut out = Vec::<(Address, u16)>::new(); let mut out = Vec::<SocketAddress>::new();
// convert to socketaddrs // convert to socketaddrs
let mut sockaddrs = address let mut sockaddrs = address
.to_socket_addrs() .to_socket_addrs()
@ -442,7 +443,7 @@ impl Network {
// Return local dial infos we listen on // Return local dial infos we listen on
for ldi_addr in ldi_addrs { for ldi_addr in ldi_addrs {
out.push((Address::from_socket_addr(ldi_addr), ldi_addr.port())); out.push(SocketAddress::from_socket_addr(ldi_addr));
} }
} }
@ -613,27 +614,6 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
fn match_socket_addr(
inner: &NetworkInner,
listen_socket_addr: &SocketAddr,
peer_socket_addr: &SocketAddr,
) -> bool {
let ldi_addrs = Self::translate_unspecified_address(inner, listen_socket_addr);
// xxx POSSIBLE CONCERN (verify this?)
// xxx will need to be reworked to search routing table information if we
// xxx allow systems to be dual homed with multiple interfaces eventually
// xxx to ensure the socket on the appropriate interface is chosen
// xxx this may not be necessary if the kernel automatically picks the right interface
// xxx it may do that. need to verify that though
for local_addr in &ldi_addrs {
if mem::discriminant(local_addr) == mem::discriminant(peer_socket_addr) {
return true;
}
}
false
}
fn find_best_udp_protocol_handler( fn find_best_udp_protocol_handler(
&self, &self,
peer_socket_addr: &SocketAddr, peer_socket_addr: &SocketAddr,
@ -785,7 +765,7 @@ impl Network {
} }
pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> { pub async fn send_data(&self, node_ref: NodeRef, data: Vec<u8>) -> Result<(), String> {
let dial_info = node_ref.dial_info(); let dial_info = node_ref.best_dial_info();
let descriptor = node_ref.last_connection(); let descriptor = node_ref.last_connection();
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
@ -880,7 +860,7 @@ impl Network {
) )
}; };
trace!("WS: starting listener at {:?}", listen_address); trace!("WS: starting listener at {:?}", listen_address);
let addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
listen_address.clone(), listen_address.clone(),
false, false,
@ -890,34 +870,45 @@ impl Network {
trace!("WS: listener started"); trace!("WS: listener started");
let mut dial_infos: Vec<DialInfo> = Vec::new(); let mut dial_infos: Vec<DialInfo> = Vec::new();
for (a, p) in addresses { for socket_address in socket_addresses {
// Pick out WS port for outbound connections (they will all be the same) // Pick out WS port for outbound connections (they will all be the same)
self.inner.lock().ws_port = p; self.inner.lock().ws_port = socket_address.port();
xxx continue here // Build local dial info request url
let di = DialInfo::try_ws(a.address_string(), p, path.clone()); let local_url = format!("ws://{}/{}", socket_address, path);
// Create local dial info
let di = DialInfo::try_ws(socket_address, local_url)
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
dial_infos.push(di.clone()); dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static); routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
} }
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() { if let Some(url) = url.as_ref() {
let split_url = SplitUrl::from_str(url)?; let mut split_url = SplitUrl::from_str(url)?;
if split_url.scheme.to_ascii_lowercase() != "ws" { if split_url.scheme.to_ascii_lowercase() != "ws" {
return Err("WS URL must use 'ws://' scheme".to_owned()); return Err("WS URL must use 'ws://' scheme".to_owned());
} }
split_url.scheme = "ws".to_owned();
// Resolve static public hostnames
let global_socket_addrs = split_url
.host
.to_socket_addrs()
.await
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
for gsa in global_socket_addrs {
routing_table.register_global_dial_info( routing_table.register_global_dial_info(
DialInfo::ws( DialInfo::try_ws(SocketAddress::from_socket_addr(gsa), url.clone())
split_url.host, .map_err(map_to_string)
split_url.port.unwrap_or(80), .map_err(logthru_net!(error))?,
split_url
.path
.map(|p| p.to_string())
.unwrap_or_else(|| "/".to_string()),
),
Some(NetworkClass::Server), Some(NetworkClass::Server),
DialInfoOrigin::Static, DialInfoOrigin::Static,
); );
} }
}
Ok(()) Ok(())
} }
@ -932,7 +923,7 @@ xxx continue here
) )
}; };
trace!("WSS: starting listener at {}", listen_address); trace!("WSS: starting listener at {}", listen_address);
let addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
listen_address.clone(), listen_address.clone(),
true, true,
@ -947,33 +938,40 @@ xxx continue here
// This is not the case with unencrypted websockets, which can be specified solely by an IP address // This is not the case with unencrypted websockets, which can be specified solely by an IP address
// //
// let mut dial_infos: Vec<DialInfo> = Vec::new(); // let mut dial_infos: Vec<DialInfo> = Vec::new();
for (_, p) in addresses { for socket_address in socket_addresses {
// Pick out WS port for outbound connections (they will all be the same) // Pick out WSS port for outbound connections (they will all be the same)
self.inner.lock().wss_port = p; self.inner.lock().wss_port = socket_address.port();
// let di = DialInfo::wss(a.address_string(), p, path.clone()); // Don't register local dial info because TLS won't allow that anyway without a local CA
// dial_infos.push(di.clone()); // and we aren't doing that yet at all today.
// routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
} }
// Add static public dialinfo if it's configured // Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() { if let Some(url) = url.as_ref() {
let split_url = SplitUrl::from_str(url)?; // Add static public dialinfo if it's configured
let mut split_url = SplitUrl::from_str(url)?;
if split_url.scheme.to_ascii_lowercase() != "wss" { if split_url.scheme.to_ascii_lowercase() != "wss" {
return Err("WSS URL must use 'wss://' scheme".to_owned()); return Err("WSS URL must use 'wss://' scheme".to_owned());
} }
split_url.scheme = "wss".to_owned();
// Resolve static public hostnames
let global_socket_addrs = split_url
.host
.to_socket_addrs()
.await
.map_err(map_to_string)
.map_err(logthru_net!(error))?;
for gsa in global_socket_addrs {
routing_table.register_global_dial_info( routing_table.register_global_dial_info(
DialInfo::wss( DialInfo::try_wss(SocketAddress::from_socket_addr(gsa), url.clone())
split_url.host, .map_err(map_to_string)
split_url.port.unwrap_or(443), .map_err(logthru_net!(error))?,
split_url
.path
.map(|p| p.to_string())
.unwrap_or_else(|| "/".to_string()),
),
Some(NetworkClass::Server), Some(NetworkClass::Server),
DialInfoOrigin::Static, DialInfoOrigin::Static,
); );
}
} else { } else {
return Err("WSS URL must be specified due to TLS requirements".to_owned()); return Err("WSS URL must be specified due to TLS requirements".to_owned());
} }
@ -991,7 +989,7 @@ xxx continue here
) )
}; };
trace!("TCP: starting listener at {}", &listen_address); trace!("TCP: starting listener at {}", &listen_address);
let addresses = self let socket_addresses = self
.start_tcp_listener( .start_tcp_listener(
listen_address.clone(), listen_address.clone(),
false, false,
@ -1001,11 +999,11 @@ xxx continue here
trace!("TCP: listener started"); trace!("TCP: listener started");
let mut dial_infos: Vec<DialInfo> = Vec::new(); let mut dial_infos: Vec<DialInfo> = Vec::new();
for (a, p) in addresses { for socket_address in socket_addresses {
// Pick out TCP port for outbound connections (they will all be the same) // Pick out TCP port for outbound connections (they will all be the same)
self.inner.lock().tcp_port = p; self.inner.lock().tcp_port = socket_address.port();
let di = DialInfo::tcp(a.to_canonical(), p); let di = DialInfo::tcp(socket_address);
dial_infos.push(di.clone()); dial_infos.push(di.clone());
routing_table.register_local_dial_info(di, DialInfoOrigin::Static); routing_table.register_local_dial_info(di, DialInfoOrigin::Static);
} }
@ -1128,10 +1126,10 @@ xxx continue here
return inner.network_class; return inner.network_class;
} }
// Go through our public dialinfo and see what our best network class is // Go through our global dialinfo and see what our best network class is
let mut network_class = NetworkClass::Invalid; let mut network_class = NetworkClass::Invalid;
for x in routing_table.global_dial_info() { for did in routing_table.global_dial_info_details() {
if let Some(nc) = x.network_class { if let Some(nc) = did.network_class {
if nc < network_class { if nc < network_class {
network_class = nc; network_class = nc;
} }
@ -1172,9 +1170,13 @@ xxx continue here
&& !udp_static_public_dialinfo && !udp_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid) && (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{ {
let filter = DialInfoFilter::with_protocol_type_and_address_type(
ProtocolType::UDP,
AddressType::IPV4,
);
let need_udpv4_dialinfo = routing_table let need_udpv4_dialinfo = routing_table
.global_dial_info_for_protocol_address_type(ProtocolAddressType::UDPv4) .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter))
.is_empty(); .is_none();
if need_udpv4_dialinfo { if need_udpv4_dialinfo {
// If we have no public UDPv4 dialinfo, then we need to run a NAT check // If we have no public UDPv4 dialinfo, then we need to run a NAT check
// ensure the singlefuture is running for this // ensure the singlefuture is running for this
@ -1186,13 +1188,17 @@ xxx continue here
} }
// Same but for TCPv4 // Same but for TCPv4
if protocol_config.tcp_enabled if protocol_config.tcp_listen
&& !tcp_static_public_dialinfo && !tcp_static_public_dialinfo
&& (network_class.inbound_capable() || network_class == NetworkClass::Invalid) && (network_class.inbound_capable() || network_class == NetworkClass::Invalid)
{ {
let filter = DialInfoFilter::with_protocol_type_and_address_type(
ProtocolType::TCP,
AddressType::IPV4,
);
let need_tcpv4_dialinfo = routing_table let need_tcpv4_dialinfo = routing_table
.global_dial_info_for_protocol_address_type(ProtocolAddressType::TCPv4) .first_filtered_global_dial_info_details(|d| d.dial_info.matches_filter(&filter))
.is_empty(); .is_none();
if need_tcpv4_dialinfo { if need_tcpv4_dialinfo {
// If we have no public TCPv4 dialinfo, then we need to run a NAT check // If we have no public TCPv4 dialinfo, then we need to run a NAT check
// ensure the singlefuture is running for this // ensure the singlefuture is running for this

View File

@ -9,7 +9,7 @@ use async_std::net::*;
impl Network { impl Network {
// Ask for a public address check from a particular noderef // Ask for a public address check from a particular noderef
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddr> { async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let rpc = routing_table.rpc_processor(); let rpc = routing_table.rpc_processor();
rpc.rpc_call_info(node_ref.clone()) rpc.rpc_call_info(node_ref.clone())
@ -22,6 +22,7 @@ impl Network {
.unwrap_or(None) .unwrap_or(None)
} }
xxx convert to filter
// find fast peers with a particular address type, and ask them to tell us what our external address is // find fast peers with a particular address type, and ask them to tell us what our external address is
async fn discover_external_address( async fn discover_external_address(
&self, &self,

View File

@ -53,6 +53,25 @@ pub struct ProtocolConfig {
pub wss_listen: bool, pub wss_listen: bool,
} }
impl ProtocolConfig {
pub fn is_protocol_type_connect_enabled(&self, protocol_type: ProtocolType) -> bool {
match protocol_type {
ProtocolType::UDP => self.udp_enabled,
ProtocolType::TCP => self.tcp_connect,
ProtocolType::WS => self.ws_connect,
ProtocolType::WSS => self.wss_connect,
}
}
pub fn is_protocol_type_listen_enabled(&self, protocol_type: ProtocolType) -> bool {
match protocol_type {
ProtocolType::UDP => self.udp_enabled,
ProtocolType::TCP => self.tcp_listen,
ProtocolType::WS => self.ws_listen,
ProtocolType::WSS => self.wss_listen,
}
}
}
// Things we get when we start up and go away when we shut down // Things we get when we start up and go away when we shut down
// Routing table is not in here because we want it to survive a network shutdown/startup restart // Routing table is not in here because we want it to survive a network shutdown/startup restart
#[derive(Clone)] #[derive(Clone)]

View File

@ -13,15 +13,15 @@ impl RoutingTable {
out out
} }
pub fn debug_info_dialinfo(&self) -> String { pub fn debug_info_dialinfo(&self) -> String {
let ldis = self.local_dial_info(); let ldis = self.local_dial_info_details();
let gdis = self.global_dial_info(); let gdis = self.global_dial_info_details();
let mut out = String::new(); let mut out = String::new();
out += "Local Dial Info:\n"; out += "Local Dial Info Details:\n";
for (n, ldi) in ldis.iter().enumerate() { for (n, ldi) in ldis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, ldi); out += &format!(" {:>2}: {:?}\n", n, ldi);
} }
out += "Global Dial Info:\n"; out += "Global Dial Info Details:\n";
for (n, gdi) in gdis.iter().enumerate() { for (n, gdi) in gdis.iter().enumerate() {
out += &format!(" {:>2}: {:?}\n", n, gdi); out += &format!(" {:>2}: {:?}\n", n, gdi);
} }

View File

@ -10,42 +10,18 @@ pub type FilterType = Box<dyn Fn(&(&DHTKey, Option<&mut BucketEntry>)) -> bool>;
impl RoutingTable { impl RoutingTable {
// Retrieve the fastest nodes in the routing table with a particular kind of protocol address type // Retrieve the fastest nodes in the routing table with a particular kind of protocol address type
// Returns noderefs are are scoped to that address type only // Returns noderefs are are scoped to that address type only
pub fn get_fast_nodes_of_type( pub fn get_fast_nodes_filtered(&self, dial_info_filter: &DialInfoFilter) -> Vec<NodeRef> {
&self, let dial_info_filter = dial_info_filter.clone();
protocol_address_type: ProtocolAddressType,
) -> Vec<NodeRef> {
self.find_fastest_nodes( self.find_fastest_nodes(
// filter // filter
Some(Box::new( Some(Box::new(
move |params: &(&DHTKey, Option<&mut BucketEntry>)| { move |params: &(&DHTKey, Option<&mut BucketEntry>)| {
// Only interested in nodes with node info params
if let Some(node_info) = &params.1.as_ref().unwrap().peer_stats().node_info {
// Will the node validate dial info?
// and does it have a UDPv4, public scope, dial info?
if node_info.will_validate_dial_info
&& params
.1 .1
.as_ref() .as_ref()
.unwrap() .unwrap()
.dial_info_entries_as_ref() .first_filtered_dial_info(|di| di.matches_filter(&dial_info_filter))
.iter()
.find_map(|die| {
if die.matches_peer_scope(PeerScope::Global)
&& die.dial_info().protocol_address_type()
== protocol_address_type
{
Some(())
} else {
None
}
})
.is_some() .is_some()
{
// If so return true and include this node
return true;
}
}
false
}, },
)), )),
// transform // transform
@ -54,7 +30,7 @@ impl RoutingTable {
self.clone(), self.clone(),
*e.0, *e.0,
e.1.as_mut().unwrap(), e.1.as_mut().unwrap(),
protocol_address_type, dial_info_filter.clone(),
) )
}, },
) )
@ -63,13 +39,13 @@ impl RoutingTable {
pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo { pub fn get_own_peer_info(&self, scope: PeerScope) -> PeerInfo {
let dial_infos = match scope { let dial_infos = match scope {
PeerScope::All => { PeerScope::All => {
let mut divec = self.global_dial_info(); let mut divec = self.global_dial_info_details();
divec.append(&mut self.local_dial_info()); divec.append(&mut self.local_dial_info_details());
divec.dedup(); divec.dedup();
divec divec
} }
PeerScope::Global => self.global_dial_info(), PeerScope::Global => self.global_dial_info_details(),
PeerScope::Local => self.local_dial_info(), PeerScope::Local => self.local_dial_info_details(),
}; };
PeerInfo { PeerInfo {

View File

@ -1,7 +1,6 @@
mod bucket; mod bucket;
mod bucket_entry; mod bucket_entry;
mod debug; mod debug;
mod dial_info_entry;
mod find_nodes; mod find_nodes;
mod node_ref; mod node_ref;
mod stats_accounting; mod stats_accounting;
@ -12,12 +11,10 @@ use crate::network_manager::*;
use crate::rpc_processor::*; use crate::rpc_processor::*;
use crate::xx::*; use crate::xx::*;
use crate::*; use crate::*;
use alloc::collections::VecDeque;
use alloc::str::FromStr; use alloc::str::FromStr;
use bucket::*; use bucket::*;
pub use bucket_entry::*; pub use bucket_entry::*;
pub use debug::*; pub use debug::*;
pub use dial_info_entry::*;
pub use find_nodes::*; pub use find_nodes::*;
use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::stream::{FuturesUnordered, StreamExt};
pub use node_ref::*; pub use node_ref::*;
@ -157,42 +154,35 @@ impl RoutingTable {
!inner.local_dial_info.is_empty() !inner.local_dial_info.is_empty()
} }
pub fn local_dial_info(&self) -> Vec<DialInfoDetail> { pub fn local_dial_info_details(&self) -> Vec<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.local_dial_info.clone() inner.local_dial_info.clone()
} }
pub fn local_dial_info_for_protocol(&self, protocol_type: ProtocolType) -> Vec<DialInfoDetail> { pub fn first_filtered_local_dial_info_details<F>(&self, filter: F) -> Option<DialInfoDetail>
where
F: Fn(&DialInfoDetail) -> bool,
{
let inner = self.inner.lock(); let inner = self.inner.lock();
inner for did in &inner.local_dial_info {
.local_dial_info if filter(did) {
.iter() return Some(did.clone());
.filter_map(|di| { }
if di.dial_info.protocol_type() != protocol_type { }
None None
} else {
Some(di.clone())
} }
}) pub fn all_filtered_local_dial_info_details<F>(&self, filter: F) -> Vec<DialInfoDetail>
.collect() where
} F: Fn(&DialInfoDetail) -> bool,
{
pub fn local_dial_info_for_protocol_address_type(
&self,
protocol_address_type: ProtocolAddressType,
) -> Vec<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner let ret = Vec::new();
.local_dial_info for did in &inner.local_dial_info {
.iter() if filter(did) {
.filter_map(|di| { ret.push(did.clone());
if di.dial_info.protocol_address_type() != protocol_address_type {
None
} else {
Some(di.clone())
} }
}) }
.collect() ret
} }
pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) { pub fn register_local_dial_info(&self, dial_info: DialInfo, origin: DialInfoOrigin) {
@ -230,44 +220,35 @@ impl RoutingTable {
!inner.global_dial_info.is_empty() !inner.global_dial_info.is_empty()
} }
pub fn global_dial_info(&self) -> Vec<DialInfoDetail> { pub fn global_dial_info_details(&self) -> Vec<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.global_dial_info.clone() inner.global_dial_info.clone()
} }
pub fn global_dial_info_for_protocol( pub fn first_filtered_global_dial_info_details<F>(&self, filter: F) -> Option<DialInfoDetail>
&self, where
protocol_type: ProtocolType, F: Fn(&DialInfoDetail) -> bool,
) -> Vec<DialInfoDetail> { {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner for did in &inner.global_dial_info {
.global_dial_info if filter(did) {
.iter() return Some(did.clone());
.filter_map(|di| { }
if di.dial_info.protocol_type() != protocol_type { }
None None
} else {
Some(di.clone())
} }
}) pub fn all_filtered_global_dial_info_details<F>(&self, filter: F) -> Vec<DialInfoDetail>
.collect() where
} F: Fn(&DialInfoDetail) -> bool,
pub fn global_dial_info_for_protocol_address_type( {
&self,
protocol_address_type: ProtocolAddressType,
) -> Vec<DialInfoDetail> {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner let ret = Vec::new();
.global_dial_info for did in &inner.global_dial_info {
.iter() if filter(did) {
.filter_map(|di| { ret.push(did.clone());
if di.dial_info.protocol_address_type() != protocol_address_type {
None
} else {
Some(di.clone())
} }
}) }
.collect() ret
} }
pub fn register_global_dial_info( pub fn register_global_dial_info(
@ -287,7 +268,7 @@ impl RoutingTable {
}); });
info!( info!(
"Public Dial Info: {}", "Global Dial Info: {}",
NodeDialInfoSingle { NodeDialInfoSingle {
node_id: NodeId::new(inner.node_id), node_id: NodeId::new(inner.node_id),
dial_info dial_info
@ -356,34 +337,6 @@ impl RoutingTable {
*self.inner.lock() = Self::new_inner(self.network_manager()); *self.inner.lock() = Self::new_inner(self.network_manager());
} }
// Just match address and port to help sort dialinfoentries for buckets
// because inbound connections will not have dialinfo associated with them
// but should have ip addresses if they have changed
fn dial_info_peer_address_match(dial_info: &DialInfo, peer_addr: &PeerAddress) -> bool {
match dial_info {
DialInfo::UDP(_) => {
peer_addr.protocol_type == ProtocolType::UDP
&& peer_addr.port == dial_info.port()
&& peer_addr.address.address_string() == dial_info.address_string()
}
DialInfo::TCP(_) => {
peer_addr.protocol_type == ProtocolType::TCP
&& peer_addr.port == dial_info.port()
&& peer_addr.address.address_string() == dial_info.address_string()
}
DialInfo::WS(_) => {
peer_addr.protocol_type == ProtocolType::WS
&& peer_addr.port == dial_info.port()
&& peer_addr.address.address_string() == dial_info.address_string()
}
DialInfo::WSS(_) => {
peer_addr.protocol_type == ProtocolType::WSS
&& peer_addr.port == dial_info.port()
&& peer_addr.address.address_string() == dial_info.address_string()
}
}
}
// Attempt to settle buckets and remove entries down to the desired number // Attempt to settle buckets and remove entries down to the desired number
// which may not be possible due extant NodeRefs // which may not be possible due extant NodeRefs
fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) { fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) {
@ -482,7 +435,7 @@ impl RoutingTable {
) -> Result<NodeRef, String> { ) -> Result<NodeRef, String> {
let nr = self.create_node_ref(node_id)?; let nr = self.create_node_ref(node_id)?;
nr.operate(move |e| -> Result<(), String> { nr.operate(move |e| -> Result<(), String> {
e.update_dial_info(dial_infos); e.update_dial_infos(dial_infos);
Ok(()) Ok(())
})?; })?;
@ -604,6 +557,7 @@ impl RoutingTable {
let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new(); let mut bsmap: BTreeMap<DHTKey, Vec<DialInfo>> = BTreeMap::new();
for b in bootstrap { for b in bootstrap {
let ndis = NodeDialInfoSingle::from_str(b.as_str()) let ndis = NodeDialInfoSingle::from_str(b.as_str())
.map_err(map_to_string)
.map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?; .map_err(logthru_rtab!("Invalid dial info in bootstrap entry: {}", b))?;
let node_id = ndis.node_id.key; let node_id = ndis.node_id.key;
bsmap bsmap

View File

@ -5,9 +5,7 @@ use alloc::fmt;
pub struct NodeRef { pub struct NodeRef {
routing_table: RoutingTable, routing_table: RoutingTable,
node_id: DHTKey, node_id: DHTKey,
// Filters dial_info_filter: DialInfoFilter,
protocol_type: Option<ProtocolType>,
address_type: Option<AddressType>,
} }
impl NodeRef { impl NodeRef {
@ -16,23 +14,20 @@ impl NodeRef {
Self { Self {
routing_table, routing_table,
node_id: key, node_id: key,
protocol_type: None, dial_info_filter: DialInfoFilter::default(),
address_type: None,
} }
} }
pub fn new_filtered( pub fn new_filtered(
routing_table: RoutingTable, routing_table: RoutingTable,
key: DHTKey, key: DHTKey,
entry: &mut BucketEntry, entry: &mut BucketEntry,
protocol_type: Option<ProtocolType>, dial_info_filter: DialInfoFilter,
address_type: Option<AddressType>,
) -> Self { ) -> Self {
entry.ref_count += 1; entry.ref_count += 1;
Self { Self {
routing_table, routing_table,
node_id: key, node_id: key,
protocol_type, dial_info_filter,
address_type,
} }
} }
@ -40,20 +35,8 @@ impl NodeRef {
self.node_id self.node_id
} }
pub fn protocol_type(&self) -> Option<ProtocolType> { pub fn dial_info_filter(&self) -> DialInfoFilter {
self.protocol_type self.dial_info_filter.clone()
}
pub fn set_protocol_type(&mut self, protocol_type: Option<ProtocolType>) {
self.protocol_type = protocol_type;
}
pub fn address_type(&self) -> Option<AddressType> {
self.address_type
}
pub fn set_address_type(&mut self, address_type: Option<AddressType>) {
self.address_type = address_type;
} }
pub fn operate<T, F>(&self, f: F) -> T pub fn operate<T, F>(&self, f: F) -> T
@ -63,32 +46,43 @@ impl NodeRef {
self.routing_table.operate_on_bucket_entry(self.node_id, f) self.routing_table.operate_on_bucket_entry(self.node_id, f)
} }
xxx fix the notion of 'best dial info' to sort by capability and udp/tcp/ws/wss preference order // Returns the best dial info to attempt a connection to this node
pub fn dial_info(&self) -> Option<DialInfo> { pub fn best_dial_info(&self) -> Option<DialInfo> {
if self.protocol_type || self. { let nm = self.routing_table.network_manager();
None => self.operate(|e| e.best_dial_info()), let protocol_config = nm.get_protocol_config();
Some(pat) => self.operate(|e| { if protocol_config.is_none() {
e.filtered_dial_info(|die| die.dial_info().protocol_address_type() == pat) return None;
}),
} }
let protocol_config = protocol_config.unwrap();
self.operate(|e| {
e.first_filtered_dial_info(|di| {
// Does it match the dial info filter
if !di.matches_filter(&self.dial_info_filter) {
return false;
}
// Filter out dial infos that don't match our protocol config
// for outbound connections. This routine filters on 'connect' settings
// to ensure we connect using only the protocols we have enabled.
protocol_config.is_protocol_type_connect_enabled(di.protocol_type())
})
})
} }
pub fn last_connection(&self) -> Option<ConnectionDescriptor> { pub fn last_connection(&self) -> Option<ConnectionDescriptor> {
match self.operate(|e| e.last_connection()) { match self.operate(|e| e.last_connection()) {
None => None, None => None,
Some(c) => { Some(c) => {
if let Some(protocol_address_type) = self.protocol_address_type { if !c.matches_filter(&self.dial_info_filter) {
if c.remote.protocol_address_type() == protocol_address_type { return None;
Some(c)
} else {
None
} }
} else { // We don't filter this out by protocol config because if a connection
// succeeded, it's allowed to persist and be used for communication
// regardless of any other configuration
Some(c) Some(c)
} }
} }
} }
} }
}
impl Clone for NodeRef { impl Clone for NodeRef {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -98,17 +92,18 @@ impl Clone for NodeRef {
Self { Self {
routing_table: self.routing_table.clone(), routing_table: self.routing_table.clone(),
node_id: self.node_id, node_id: self.node_id,
protocol_address_type: self.protocol_address_type, dial_info_filter: self.dial_info_filter.clone(),
} }
} }
} }
impl fmt::Debug for NodeRef { impl fmt::Debug for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.protocol_address_type { let out = format!("{}", self.node_id.encode());
None => write!(f, "{}", self.node_id.encode()), if !self.dial_info_filter.is_empty() {
Some(pat) => write!(f, "{}#{:?}", self.node_id.encode(), pat), out += &format!("{:?}", self.dial_info_filter);
} }
write!(f, "{}", out)
} }
} }

View File

@ -219,7 +219,7 @@ impl RPCProcessor {
if let Some(nr) = routing_table.lookup_node_ref(node_id) { if let Some(nr) = routing_table.lookup_node_ref(node_id) {
// ensure we have dial_info for the entry already, // ensure we have dial_info for the entry already,
// if not, we should do the find_node anyway // if not, we should do the find_node anyway
if nr.operate(|e| e.best_dial_info().is_some()) { if !nr.operate(|e| e.dial_infos().is_empty()) {
return Ok(nr); return Ok(nr);
} }
} }
@ -712,7 +712,8 @@ impl RPCProcessor {
fn can_validate_dial_info(&self) -> bool { fn can_validate_dial_info(&self) -> bool {
let nman = self.network_manager(); let nman = self.network_manager();
match nman.get_network_class() { if let Some(nc) = nman.get_network_class() {
match nc {
NetworkClass::Server => true, NetworkClass::Server => true,
NetworkClass::Mapped => true, NetworkClass::Mapped => true,
NetworkClass::FullNAT => true, NetworkClass::FullNAT => true,
@ -723,6 +724,9 @@ impl RPCProcessor {
NetworkClass::TorWebApp => false, NetworkClass::TorWebApp => false,
NetworkClass::Invalid => false, NetworkClass::Invalid => false,
} }
} else {
false
}
} }
fn will_validate_dial_info(&self) -> bool { fn will_validate_dial_info(&self) -> bool {
@ -779,7 +783,7 @@ impl RPCProcessor {
.peer_noderef .peer_noderef
.operate(|entry| match entry.last_connection() { .operate(|entry| match entry.last_connection() {
None => None, None => None,
Some(c) => Some(c.remote.to_socket_addr()), Some(c) => Some(c.remote.socket_address),
}); });
SenderInfo { socket_address } SenderInfo { socket_address }
} }
@ -862,16 +866,26 @@ impl RPCProcessor {
// Redirect this request if we are asked to // Redirect this request if we are asked to
if redirect { if redirect {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let protocol_address_type = dial_info.protocol_address_type(); let filter = dial_info.make_filter(true);
let peers = routing_table.get_fast_nodes_of_type(protocol_address_type); let peers = routing_table.get_fast_nodes_filtered(&filter);
if peers.is_empty() { if peers.is_empty() {
return Err(rpc_error_internal(format!( return Err(rpc_error_internal(format!(
"no peers of type '{:?}'", "no peers matching filter '{:?}'",
protocol_address_type filter
))); )));
} }
for peer in peers { for peer in peers {
// See if this peer will validate dial info
if !peer.operate(|e| {
if let Some(ni) = &e.peer_stats().node_info {
ni.will_validate_dial_info
} else {
true
}
}) {
continue;
}
// Make a copy of the request, without the redirect flag // Make a copy of the request, without the redirect flag
let vdi_msg_reader = { let vdi_msg_reader = {
let mut vdi_msg = ::capnp::message::Builder::new_default(); let mut vdi_msg = ::capnp::message::Builder::new_default();

View File

@ -311,24 +311,71 @@ impl SocketAddress {
impl fmt::Display for SocketAddress { impl fmt::Display for SocketAddress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "{}:{}", self.to_ip_addr(), self.port) write!(f, "{}", self.to_socket_addr())
} }
} }
impl FromStr for SocketAddress { impl FromStr for SocketAddress {
type Err = VeilidAPIError; type Err = VeilidAPIError;
fn from_str(s: &str) -> Result<SocketAddress, VeilidAPIError> { fn from_str(s: &str) -> Result<SocketAddress, VeilidAPIError> {
let split = s.rsplit_once(':').ok_or_else(|| { let sa = SocketAddr::from_str(s)
parse_error!("SocketAddress::from_str missing colon port separator", s) .map_err(|e| parse_error!("Failed to parse SocketAddress", e))?;
})?; Ok(SocketAddress::from_socket_addr(sa))
let address = Address::from_str(split.0)?; }
let port = u16::from_str(split.1).map_err(|e| { }
parse_error!(
format!("SocketAddress::from_str failed parting port: {}", e), //////////////////////////////////////////////////////////////////
s
) #[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
})?; pub struct DialInfoFilter {
Ok(SocketAddress { address, port }) pub peer_scope: PeerScope,
pub protocol_type: Option<ProtocolType>,
pub address_type: Option<AddressType>,
}
impl DialInfoFilter {
pub fn new_empty() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: None,
address_type: None,
}
}
pub fn with_protocol_type(protocol_type: ProtocolType) -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: Some(protocol_type),
address_type: None,
}
}
pub fn with_protocol_type_and_address_type(
protocol_type: ProtocolType,
address_type: AddressType,
) -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: Some(protocol_type),
address_type: Some(address_type),
}
}
pub fn is_empty(&self) -> bool {
self.peer_scope == PeerScope::All
&& self.protocol_type.is_none()
&& self.address_type.is_none()
}
}
impl fmt::Debug for DialInfoFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let mut out = String::new();
out += &format!("{:?}", self.peer_scope);
if let Some(pt) = self.protocol_type {
out += &format!("+{:?}", pt);
}
if let Some(at) = self.address_type {
out += &format!("+{:?}", at);
}
write!(f, "[{}]", out)
} }
} }
@ -552,16 +599,54 @@ impl DialInfo {
PeerScope::Local => self.is_local(), PeerScope::Local => self.is_local(),
} }
} }
pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
}
if let Some(at) = filter.address_type {
if self.address_type() != at {
return false;
}
}
true
}
pub fn make_filter(&self, scoped: bool) -> DialInfoFilter {
DialInfoFilter {
peer_scope: if scoped {
if self.is_global() {
PeerScope::Global
} else if self.is_local() {
PeerScope::Local
} else {
PeerScope::All
}
} else {
PeerScope::All
},
protocol_type: Some(self.protocol_type()),
address_type: Some(self.address_type()),
}
}
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum PeerScope { pub enum PeerScope {
All, All,
Global, Global,
Local, Local,
} }
impl Default for PeerScope {
fn default() -> Self {
PeerScope::All
}
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct PeerInfo { pub struct PeerInfo {
@ -617,6 +702,29 @@ impl ConnectionDescriptor {
pub fn address_type(&self) -> AddressType { pub fn address_type(&self) -> AddressType {
self.remote.address_type() self.remote.address_type()
} }
pub fn matches_peer_scope(&self, scope: PeerScope) -> bool {
match scope {
PeerScope::All => true,
PeerScope::Global => self.remote.socket_address.address().is_global(),
PeerScope::Local => self.remote.socket_address.address().is_local(),
}
}
pub fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
}
if let Some(at) = filter.address_type {
if self.address_type() != at {
return false;
}
}
true
}
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////