more parallel

This commit is contained in:
Christien Rioux 2023-09-05 19:27:16 -04:00
parent dfcdcf2364
commit b325c82b9a

View File

@ -378,26 +378,30 @@ impl DiscoveryContext {
async fn protocol_process_no_nat( async fn protocol_process_no_nat(
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>,
) -> DetectedDialInfo { ) {
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
// Do a validate_dial_info on the external address from a redirected node let this = self.clone();
if self let do_no_nat_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move {
.validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true) // Do a validate_dial_info on the external address from a redirected node
.await if this
{ .validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true)
// Add public dial info with Direct dialinfo class .await
DetectedDialInfo::Detected(DialInfoDetail { {
dial_info: external_1.dial_info.clone(), // Add public dial info with Direct dialinfo class
class: DialInfoClass::Direct, Some(DetectedDialInfo::Detected(DialInfoDetail {
}) dial_info: external_1.dial_info.clone(),
} else { class: DialInfoClass::Direct,
// Add public dial info with Blocked dialinfo class }))
DetectedDialInfo::Detected(DialInfoDetail { } else {
dial_info: external_1.dial_info.clone(), // Add public dial info with Blocked dialinfo class
class: DialInfoClass::Blocked, Some(DetectedDialInfo::Detected(DialInfoDetail {
}) dial_info: external_1.dial_info.clone(),
} class: DialInfoClass::Blocked,
}))
}
});
unord.push(do_no_nat_fut);
} }
// If we know we are behind NAT check what kind // If we know we are behind NAT check what kind
@ -405,20 +409,24 @@ impl DiscoveryContext {
async fn protocol_process_nat( async fn protocol_process_nat(
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>,
) -> Option<DetectedDialInfo> { ) {
// Get the external dial info for our use here // Get the external dial info for our use here
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); let (external_1, external_2) = {
let external_2 = self.inner.lock().external_2.as_ref().unwrap().clone(); let inner = self.inner.lock();
(
inner.external_1.as_ref().unwrap().clone(),
inner.external_2.as_ref().unwrap().clone(),
)
};
// If we have two different external addresses, then this is a symmetric NAT // If we have two different external addresses, then this is a symmetric NAT
if external_2.address != external_1.address { if external_2.address != external_1.address {
// No more retries let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectedDialInfo>> =
return Some(DetectedDialInfo::SymmetricNAT); Box::pin(async move { Some(DetectedDialInfo::SymmetricNAT) });
unord.push(do_symmetric_nat_fut);
return;
} }
// Do these detections in parallel, but with ordering preference
let mut ord = FuturesOrdered::new();
// Manual Mapping Detection // Manual Mapping Detection
/////////// ///////////
let this = self.clone(); let this = self.clone();
@ -454,65 +462,111 @@ impl DiscoveryContext {
None None
}); });
ord.push_back(do_manual_map_fut); unord.push(do_manual_map_fut);
} }
} }
// NAT Detection
///////////
// Full Cone NAT Detection // Full Cone NAT Detection
/////////// ///////////
let this = self.clone(); let this = self.clone();
let c_external_1 = external_1.clone(); let do_nat_detect_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move {
let do_full_cone_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move { let mut retry_count = {
// Let's see what kind of NAT we have let c = this.unlocked_inner.net.config.get();
// Does a redirected dial info validation from a different address and a random port find us? c.network.restricted_nat_retries
if this };
.validate_dial_info(
c_external_1.node.clone(),
c_external_1.dial_info.clone(),
true,
)
.await
{
// Yes, another machine can use the dial info directly, so Full Cone
// Add public dial info with full cone NAT network class
return Some(DetectedDialInfo::Detected(DialInfoDetail { // Loop for restricted NAT retries
dial_info: c_external_1.dial_info, loop {
class: DialInfoClass::FullConeNAT, let mut ord = FuturesOrdered::new();
}));
let c_this = this.clone();
let c_external_1 = external_1.clone();
let do_full_cone_fut: SendPinBoxFuture<Option<DetectedDialInfo>> =
Box::pin(async move {
// Let's see what kind of NAT we have
// Does a redirected dial info validation from a different address and a random port find us?
if c_this
.validate_dial_info(
c_external_1.node.clone(),
c_external_1.dial_info.clone(),
true,
)
.await
{
// Yes, another machine can use the dial info directly, so Full Cone
// Add public dial info with full cone NAT network class
return Some(DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info,
class: DialInfoClass::FullConeNAT,
}));
}
None
});
ord.push_back(do_full_cone_fut);
let c_this = this.clone();
let c_external_1 = external_1.clone();
let c_external_2 = external_2.clone();
let do_restricted_cone_fut: SendPinBoxFuture<Option<DetectedDialInfo>> =
Box::pin(async move {
// We are restricted, determine what kind of restriction
// If we're going to end up as a restricted NAT of some sort
// Address is the same, so it's address or port restricted
// Do a validate_dial_info on the external address from a random port
if c_this
.validate_dial_info(
c_external_2.node.clone(),
c_external_1.dial_info.clone(),
false,
)
.await
{
// Got a reply from a non-default port, which means we're only address restricted
return Some(DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info.clone(),
class: DialInfoClass::AddressRestrictedNAT,
}));
}
// Didn't get a reply from a non-default port, which means we are also port restricted
Some(DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info.clone(),
class: DialInfoClass::PortRestrictedNAT,
}))
});
ord.push_back(do_restricted_cone_fut);
// Return the first result we get
let mut some_ddi = None;
while let Some(res) = ord.next().await {
if let Some(ddi) = res {
some_ddi = Some(ddi);
break;
}
}
if let Some(ddi) = some_ddi {
if let DetectedDialInfo::Detected(did) = &ddi {
// If we got something better than restricted NAT or we're done retrying
if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 {
return Some(ddi);
}
}
}
if retry_count == 0 {
break;
}
retry_count -= 1;
} }
None None
}); });
ord.push_back(do_full_cone_fut); unord.push(do_nat_detect_fut);
// Run detections in parallel and take the first one, ordered by preference, that returns a result
while let Some(res) = ord.next().await {
if let Some(ddi) = res {
return Some(ddi);
}
}
// We are restricted, determine what kind of restriction
// If we're going to end up as a restricted NAT of some sort
// Address is the same, so it's address or port restricted
// Do a validate_dial_info on the external address from a random port
if self
.validate_dial_info(external_2.node.clone(), external_1.dial_info.clone(), false)
.await
{
// Got a reply from a non-default port, which means we're only address restricted
return Some(DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1.dial_info.clone(),
class: DialInfoClass::AddressRestrictedNAT,
}));
}
// Didn't get a reply from a non-default port, which means we are also port restricted
Some(DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1.dial_info.clone(),
class: DialInfoClass::PortRestrictedNAT,
}))
} }
/// Add discovery futures to an unordered set that may detect dialinfo when they complete /// Add discovery futures to an unordered set that may detect dialinfo when they complete
@ -520,9 +574,9 @@ impl DiscoveryContext {
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>,
) { ) {
let (mut retry_count, enable_upnp) = { let enable_upnp = {
let c = self.unlocked_inner.net.config.get(); let c = self.unlocked_inner.net.config.get();
(c.network.restricted_nat_retries, c.network.upnp) c.network.upnp
}; };
// Do this right away because it's fast and every detection is going to need it // Do this right away because it's fast and every detection is going to need it
@ -562,28 +616,6 @@ impl DiscoveryContext {
self.protocol_process_no_nat(unord).await; self.protocol_process_no_nat(unord).await;
} else { } else {
self.protocol_process_nat(unord).await; self.protocol_process_nat(unord).await;
// // Loop for restricted NAT retries
// let this = self.clone();
// let do_nat_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move {
// loop {
// // There is -some NAT-
// if let Some(ddi) = this.protocol_process_nat().await {
// if let DetectedDialInfo::Detected(did) = &ddi {
// // If we got something better than restricted NAT or we're done retrying
// if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 {
// return Some(ddi);
// }
// }
// }
// if retry_count == 0 {
// break;
// }
// retry_count -= 1;
// }
// None
// });
// unord.push(do_nat_fut);
} }
} }
} }