refactor checkpoint

This commit is contained in:
John Smith
2022-09-04 14:17:28 -04:00
parent e0a5b1bd69
commit 79cda4a712
25 changed files with 531 additions and 398 deletions

View File

@@ -23,7 +23,7 @@ use connection_handle::*;
use connection_limits::*;
use connection_manager::*;
use dht::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::stream::{FuturesOrdered, FuturesUnordered, StreamExt};
use hashlink::LruCache;
use intf::*;
#[cfg(not(target_arch = "wasm32"))]
@@ -388,22 +388,22 @@ impl NetworkManager {
.unwrap();
inner.public_inbound_dial_info_filter = Some(
DialInfoFilter::global()
DialInfoFilter::all()
.with_protocol_type_set(pc.inbound)
.with_address_type_set(pc.family_global),
);
inner.local_inbound_dial_info_filter = Some(
DialInfoFilter::local()
DialInfoFilter::all()
.with_protocol_type_set(pc.inbound)
.with_address_type_set(pc.family_local),
);
inner.public_outbound_dial_info_filter = Some(
DialInfoFilter::global()
DialInfoFilter::all()
.with_protocol_type_set(pc.outbound)
.with_address_type_set(pc.family_global),
);
inner.local_outbound_dial_info_filter = Some(
DialInfoFilter::local()
DialInfoFilter::all()
.with_protocol_type_set(pc.outbound)
.with_address_type_set(pc.family_local),
);
@@ -557,7 +557,7 @@ impl NetworkManager {
// See how many live PublicInternet entries we have
let live_public_internet_entry_count = routing_table.get_entry_count(
RoutingTableSet::only(RoutingTable::PublicInternet),
RoutingDomain::PublicInternet.into(),
BucketEntryState::Unreliable,
);
let min_peer_count = {
@@ -816,7 +816,7 @@ impl NetworkManager {
};
// Make a reverse connection to the peer and send the receipt to it
rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt)
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
.await
.wrap_err("rpc failure")
}
@@ -841,12 +841,12 @@ impl NetworkManager {
};
// Get the udp direct dialinfo for the hole punch
let outbound_dif = self
.get_outbound_dial_info_filter(RoutingDomain::PublicInternet)
let outbound_nrf = self
.get_outbound_node_ref_filter(RoutingDomain::PublicInternet)
.with_protocol_type(ProtocolType::UDP);
peer_nr.set_filter(Some(outbound_dif));
peer_nr.set_filter(Some(outbound_nrf));
let hole_punch_dial_info_detail = peer_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.first_filtered_dial_info_detail()
.ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?;
// Now that we picked a specific dialinfo, further restrict the noderef to the specific address type
@@ -872,7 +872,7 @@ impl NetworkManager {
peer_nr.set_last_connection(connection_descriptor, intf::get_timestamp());
// Return the receipt using the same dial info send the receipt to it
rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt)
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
.await
.wrap_err("rpc failure")
}
@@ -1014,7 +1014,7 @@ impl NetworkManager {
.node_info_outbound_filter(RoutingDomain::PublicInternet),
);
if let Some(reverse_did) = routing_table.first_filtered_dial_info_detail(
RoutingDomainSet::only(RoutingDomain::PublicInternet),
RoutingDomain::PublicInternet.into(),
&reverse_dif,
) {
// Ensure we aren't on the same public IP address (no hairpin nat)
@@ -1048,11 +1048,11 @@ impl NetworkManager {
.node_info_outbound_filter(RoutingDomain::PublicInternet),
)
.filtered(
DialInfoFilter::global().with_protocol_type(ProtocolType::UDP),
&DialInfoFilter::all().with_protocol_type(ProtocolType::UDP),
);
if let Some(self_udp_dialinfo_detail) = routing_table
.first_filtered_dial_info_detail(
RoutingDomainSet::only(RoutingDomain::PublicInternet),
RoutingDomain::PublicInternet.into(),
&inbound_udp_dif,
)
{
@@ -1102,8 +1102,8 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), ret)]
fn get_contact_method_local(&self, target_node_ref: NodeRef) -> ContactMethod {
// Scope noderef down to protocols we can do outbound
let local_outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::LocalNetwork);
let target_node_ref = target_node_ref.filtered_clone(NodeRefFilter::local_outbound_dif);
let local_outbound_nrf = self.get_outbound_node_ref_filter(RoutingDomain::LocalNetwork);
let target_node_ref = target_node_ref.filtered_clone(local_outbound_nrf);
// Get the best matching local direct dial info if we have it
if target_node_ref.is_filter_dead() {
@@ -1157,8 +1157,7 @@ impl NetworkManager {
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
None,
Destination::relay(relay_nr, target_nr.node_id()),
SignalInfo::ReverseConnect { receipt, peer_info },
)
.await
@@ -1214,9 +1213,16 @@ impl NetworkManager {
data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
// and only in the PublicInternet routing domain
assert!(target_nr
.filter_ref()
.map(|dif| dif.protocol_type_set == ProtocolTypeSet::only(ProtocolType::UDP))
.map(|nrf| nrf.dial_info_filter.protocol_type_set
== ProtocolTypeSet::only(ProtocolType::UDP))
.unwrap_or_default());
assert!(target_nr
.filter_ref()
.map(|nrf| nrf.routing_domain_set
== RoutingDomainSet::only(RoutingDomain::PublicInternet))
.unwrap_or_default());
// Build a return receipt for the signal
@@ -1229,7 +1235,7 @@ impl NetworkManager {
// Get the udp direct dialinfo for the hole punch
let hole_punch_did = target_nr
.first_filtered_dial_info_detail(Some(RoutingDomain::PublicInternet))
.first_filtered_dial_info_detail()
.ok_or_else(|| eyre!("No hole punch capable dialinfo found for node"))?;
// Do our half of the hole punch by sending an empty packet
@@ -1246,8 +1252,7 @@ impl NetworkManager {
let rpc = self.rpc_processor();
network_result_try!(rpc
.rpc_call_signal(
Destination::Relay(relay_nr.clone(), target_nr.node_id()),
None,
Destination::relay(relay_nr, target_nr.node_id()),
SignalInfo::HolePunch { receipt, peer_info },
)
.await
@@ -1394,7 +1399,7 @@ impl NetworkManager {
// Serialize out peer info
let bootstrap_peerinfo: Vec<PeerInfo> = bootstrap_nodes
.iter()
.filter_map(|b| b.peer_info(RoutingDomain::PublicInternet))
.filter_map(|nr| nr.make_peer_info(RoutingDomain::PublicInternet))
.collect();
let json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec();
@@ -1484,7 +1489,7 @@ impl NetworkManager {
// Get the routing domain for this data
let routing_domain = match self
.routing_table()
.routing_domain_for_address(connection_descriptor.remote().address())
.routing_domain_for_address(connection_descriptor.remote_address().address())
{
Some(rd) => rd,
None => {
@@ -1778,7 +1783,7 @@ impl NetworkManager {
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet),
RoutingDomain::PublicInternet.into(),
&dial_info_filter,
)
.iter()
@@ -1896,12 +1901,8 @@ impl NetworkManager {
.unlocked_inner
.node_info_update_single_future
.single_spawn(async move {
// Only update if we actually have a valid network class
if matches!(
this.get_network_class(routing_domain)
.unwrap_or(NetworkClass::Invalid),
NetworkClass::Invalid
) {
// Only update if we actually have valid signed node info for this routing domain
if !this.routing_table().has_valid_own_node_info(routing_domain) {
trace!(
"not sending node info update because our network class is not yet valid"
);
@@ -1922,7 +1923,7 @@ impl NetworkManager {
unord.push(async move {
// Update the node
if let Err(e) = rpc
.rpc_call_node_info_update(nr.clone, routing_domain)
.rpc_call_node_info_update(nr.clone(), routing_domain)
.await
{
// Not fatal, but we should be able to see if this is happening

View File

@@ -614,7 +614,7 @@ impl Network {
return;
}
// Add network to local networks table
for addr in intf.addrs {
for addr in &intf.addrs {
let netmask = addr.if_addr().netmask();
let network_ip = ipaddr_apply_netmask(addr.if_addr().ip(), netmask);
local_networks.insert((network_ip, netmask));

View File

@@ -132,8 +132,10 @@ impl DiscoveryContext {
false
}
};
let filter =
RoutingTable::combine_filters(inbound_dial_info_entry_filter, disallow_relays_filter);
let filter = RoutingTable::combine_entry_filters(
inbound_dial_info_entry_filter,
disallow_relays_filter,
);
// Find public nodes matching this filter
let peers = self
@@ -155,7 +157,11 @@ impl DiscoveryContext {
continue;
}
}
peer.set_filter(Some(dial_info_filter.clone()));
peer.set_filter(Some(
NodeRefFilter::new()
.with_routing_domain(RoutingDomain::PublicInternet)
.with_dial_info_filter(dial_info_filter.clone()),
));
if let Some(sa) = self.request_public_address(peer.clone()).await {
return Some((sa, peer));
}
@@ -764,7 +770,7 @@ impl Network {
// Get existing public dial info
let existing_public_dial_info: HashSet<DialInfoDetail> = routing_table
.all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet),
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
)
.into_iter()

View File

@@ -287,14 +287,11 @@ impl Network {
// Register local dial info
for di in &local_dial_info_list {
xxx write routing table sieve for routing domain from dialinfo and local network detection and registration
// If the local interface address is global, or we are enabling local peer scope
// register global dial info if no public address is specified
// If the local interface address is global, then register global dial info
// if no other public address is specified
if !detect_address_changes
&& public_address.is_none()
&& (di.is_global() || enable_local_peer_scope)
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
@@ -455,7 +452,7 @@ xxx write routing table sieve for routing domain from dialinfo and local network
if !detect_address_changes
&& url.is_none()
&& (socket_address.address().is_global() || enable_local_peer_scope)
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &local_di)
{
// Register public dial info
routing_table.register_dial_info(
@@ -625,7 +622,7 @@ xxx write routing table sieve for routing domain from dialinfo and local network
// Register global dial info if no public address is specified
if !detect_address_changes
&& public_address.is_none()
&& (di.is_global() || enable_local_peer_scope)
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
routing_table.register_dial_info(
RoutingDomain::PublicInternet,

View File

@@ -197,11 +197,7 @@ impl NetworkManager {
let routing_table = routing_table.clone();
unord.push(
// lets ask bootstrap to find ourselves now
async move {
routing_table
.reverse_find_node(RoutingDomain::PublicInternet, nr, true)
.await
},
async move { routing_table.reverse_find_node(nr, true).await },
);
}
}
@@ -303,12 +299,10 @@ impl NetworkManager {
unord.push(async move {
// Need VALID signed peer info, so ask bootstrap to find_node of itself
// which will ensure it has the bootstrap's signed peer info as part of the response
let _ = routing_table
.find_target(RoutingDomain::PublicInternet, nr.clone())
.await;
let _ = routing_table.find_target(nr.clone()).await;
// Ensure we got the signed peer info
if !nr.has_valid_signed_node_info(Some(RoutingDomain::PublicInternet)) {
if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) {
log_net!(warn
"bootstrap at {:?} did not return valid signed node info",
nr
@@ -316,9 +310,7 @@ impl NetworkManager {
// If this node info is invalid, it will time out after being unpingable
} else {
// otherwise this bootstrap is valid, lets ask it to find ourselves now
routing_table
.reverse_find_node(RoutingDomain::PublicInternet, nr, true)
.await
routing_table.reverse_find_node(nr, true).await
}
});
}
@@ -335,7 +327,9 @@ impl NetworkManager {
fn ping_validator_public_internet(
&self,
cur_ts: u64,
unord: &mut FuturesUnordered,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<SenderInfo>>, RPCError>>,
>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
let routing_table = self.routing_table();
@@ -352,7 +346,7 @@ impl NetworkManager {
// Get our publicinternet dial info
let dids = routing_table.all_filtered_dial_info_details(
RoutingDomainSet::only(RoutingDomain::PublicInternet),
RoutingDomain::PublicInternet.into(),
&DialInfoFilter::all(),
);
@@ -381,14 +375,10 @@ impl NetworkManager {
if needs_ping {
let rpc = rpc.clone();
let dif = did.dial_info.make_filter();
let nr_filtered = nr.filtered_clone(dif);
let nr_filtered =
nr.filtered_clone(NodeRefFilter::new().with_dial_info_filter(dif));
log_net!("--> Keepalive ping to {:?}", nr_filtered);
unord.push(
async move {
rpc.rpc_call_status(Some(routing_domain), nr_filtered).await
}
.boxed(),
);
unord.push(async move { rpc.rpc_call_status(nr_filtered).await }.boxed());
did_pings = true;
}
}
@@ -398,9 +388,7 @@ impl NetworkManager {
// any mapped ports to preserve
if !did_pings {
let rpc = rpc.clone();
unord.push(
async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed(),
);
unord.push(async move { rpc.rpc_call_status(nr).await }.boxed());
}
}
@@ -413,7 +401,9 @@ impl NetworkManager {
fn ping_validator_local_network(
&self,
cur_ts: u64,
unord: &mut FuturesUnordered,
unord: &mut FuturesUnordered<
SendPinBoxFuture<Result<NetworkResult<Answer<SenderInfo>>, RPCError>>,
>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
let routing_table = self.routing_table();
@@ -423,7 +413,7 @@ impl NetworkManager {
// Get our LocalNetwork dial info
let dids = routing_table.all_filtered_dial_info_details(
RoutingDomainSet::only(RoutingDomain::LocalNetwork),
RoutingDomain::LocalNetwork.into(),
&DialInfoFilter::all(),
);
@@ -432,7 +422,7 @@ impl NetworkManager {
let rpc = rpc.clone();
// Just do a single ping with the best protocol for all the nodes
unord.push(async move { rpc.rpc_call_status(Some(routing_domain), nr).await }.boxed());
unord.push(async move { rpc.rpc_call_status(nr).await }.boxed());
}
Ok(())
@@ -476,7 +466,7 @@ impl NetworkManager {
stop_token: StopToken,
) -> EyreResult<()> {
let routing_table = self.routing_table();
let mut unord = FuturesOrdered::new();
let mut ord = FuturesOrdered::new();
let min_peer_count = {
let c = self.config.get();
c.network.dht.min_peer_count as usize
@@ -486,18 +476,18 @@ impl NetworkManager {
// even the unreliable ones, and ask them to find nodes close to our node too
let noderefs = routing_table.find_fastest_nodes(
min_peer_count,
None,
|_k, _v| true,
|k: DHTKey, v: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None)
},
);
for nr in noderefs {
let routing_table = routing_table.clone();
unord.push(async move { routing_table.reverse_find_node(nr, false).await });
ord.push_back(async move { routing_table.reverse_find_node(nr, false).await });
}
// do peer minimum search in order from fastest to slowest
while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {}
while let Ok(Some(_)) = ord.next().timeout_at(stop_token.clone()).await {}
Ok(())
}