Merge branch 'veilidchat-work' into 'main'

Veilidchat Work

See merge request veilid/veilid!211
This commit is contained in:
Christien Rioux 2023-09-29 18:04:44 +00:00
commit 7bb4d50972
17 changed files with 288 additions and 147 deletions

View File

@ -132,7 +132,7 @@ impl ConnectionTable {
false false
} }
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret)]
pub fn add_connection( pub fn add_connection(
&self, &self,
network_connection: NetworkConnection, network_connection: NetworkConnection,

View File

@ -65,12 +65,14 @@ pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60; pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
#[derive(Copy, Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct ProtocolConfig { pub struct ProtocolConfig {
pub outbound: ProtocolTypeSet, pub outbound: ProtocolTypeSet,
pub inbound: ProtocolTypeSet, pub inbound: ProtocolTypeSet,
pub family_global: AddressTypeSet, pub family_global: AddressTypeSet,
pub family_local: AddressTypeSet, pub family_local: AddressTypeSet,
pub public_internet_capabilities: Vec<FourCC>,
pub local_network_capabilities: Vec<FourCC>,
} }
// Things we get when we start up and go away when we shut down // Things we get when we start up and go away when we shut down

View File

@ -14,6 +14,13 @@ pub enum DetectedDialInfo {
Detected(DialInfoDetail), Detected(DialInfoDetail),
} }
// Detection result of external address
#[derive(Clone, Debug)]
pub struct DetectionResult {
pub ddi: DetectedDialInfo,
pub external_address_types: AddressTypeSet,
}
// Result of checking external address // Result of checking external address
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ExternalInfo { struct ExternalInfo {
@ -244,7 +251,9 @@ impl DiscoveryContext {
} }
} }
if external_address_infos.len() < 2 { if external_address_infos.len() < 2 {
log_net!(debug "not enough peers responded with an external address"); log_net!(debug "not enough peers responded with an external address for type {:?}:{:?}",
protocol_type,
address_type);
return false; return false;
} }
@ -378,28 +387,34 @@ impl DiscoveryContext {
#[instrument(level = "trace", skip(self), ret)] #[instrument(level = "trace", skip(self), ret)]
async fn protocol_process_no_nat( async fn protocol_process_no_nat(
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
) { ) {
let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone(); let external_1 = self.inner.lock().external_1.as_ref().unwrap().clone();
let this = self.clone(); let this = self.clone();
let do_no_nat_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move { let do_no_nat_fut: SendPinBoxFuture<Option<DetectionResult>> = Box::pin(async move {
// Do a validate_dial_info on the external address from a redirected node // Do a validate_dial_info on the external address from a redirected node
if this if this
.validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true) .validate_dial_info(external_1.node.clone(), external_1.dial_info.clone(), true)
.await .await
{ {
// Add public dial info with Direct dialinfo class // Add public dial info with Direct dialinfo class
Some(DetectedDialInfo::Detected(DialInfoDetail { Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1.dial_info.clone(), dial_info: external_1.dial_info.clone(),
class: DialInfoClass::Direct, class: DialInfoClass::Direct,
})) }),
external_address_types: AddressTypeSet::only(external_1.address.address_type()),
})
} else { } else {
// Add public dial info with Blocked dialinfo class // Add public dial info with Blocked dialinfo class
Some(DetectedDialInfo::Detected(DialInfoDetail { Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1.dial_info.clone(), dial_info: external_1.dial_info.clone(),
class: DialInfoClass::Blocked, class: DialInfoClass::Blocked,
})) }),
external_address_types: AddressTypeSet::only(external_1.address.address_type()),
})
} }
}); });
unord.push(do_no_nat_fut); unord.push(do_no_nat_fut);
@ -409,7 +424,7 @@ impl DiscoveryContext {
#[instrument(level = "trace", skip(self), ret)] #[instrument(level = "trace", skip(self), ret)]
async fn protocol_process_nat( async fn protocol_process_nat(
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
) { ) {
// Get the external dial info for our use here // Get the external dial info for our use here
let (external_1, external_2) = { let (external_1, external_2) = {
@ -422,8 +437,17 @@ impl DiscoveryContext {
// If we have two different external addresses, then this is a symmetric NAT // If we have two different external addresses, then this is a symmetric NAT
if external_2.address.address() != external_1.address.address() { if external_2.address.address() != external_1.address.address() {
let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = let do_symmetric_nat_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move { Some(DetectedDialInfo::SymmetricNAT) }); Box::pin(async move {
Some(DetectionResult {
ddi: DetectedDialInfo::SymmetricNAT,
external_address_types: AddressTypeSet::only(
external_1.address.address_type(),
) | AddressTypeSet::only(
external_2.address.address_type(),
),
})
});
unord.push(do_symmetric_nat_fut); unord.push(do_symmetric_nat_fut);
return; return;
} }
@ -438,7 +462,7 @@ impl DiscoveryContext {
{ {
if external_1.dial_info.port() != local_port { if external_1.dial_info.port() != local_port {
let c_external_1 = external_1.clone(); let c_external_1 = external_1.clone();
let do_manual_map_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = let do_manual_map_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move { Box::pin(async move {
// Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node // Do a validate_dial_info on the external address, but with the same port as the local port of local interface, from a redirected node
// This test is to see if a node had manual port forwarding done with the same port number as the local listener // This test is to see if a node had manual port forwarding done with the same port number as the local listener
@ -455,10 +479,15 @@ impl DiscoveryContext {
.await .await
{ {
// Add public dial info with Direct dialinfo class // Add public dial info with Direct dialinfo class
return Some(DetectedDialInfo::Detected(DialInfoDetail { return Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_1_dial_info_with_local_port, dial_info: external_1_dial_info_with_local_port,
class: DialInfoClass::Direct, class: DialInfoClass::Direct,
})); }),
external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(),
),
});
} }
None None
@ -473,7 +502,7 @@ impl DiscoveryContext {
// Full Cone NAT Detection // Full Cone NAT Detection
/////////// ///////////
let this = self.clone(); let this = self.clone();
let do_nat_detect_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move { let do_nat_detect_fut: SendPinBoxFuture<Option<DetectionResult>> = Box::pin(async move {
let mut retry_count = { let mut retry_count = {
let c = this.unlocked_inner.net.config.get(); let c = this.unlocked_inner.net.config.get();
c.network.restricted_nat_retries c.network.restricted_nat_retries
@ -485,7 +514,7 @@ impl DiscoveryContext {
let c_this = this.clone(); let c_this = this.clone();
let c_external_1 = external_1.clone(); let c_external_1 = external_1.clone();
let do_full_cone_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = let do_full_cone_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move { Box::pin(async move {
// Let's see what kind of NAT we have // Let's see what kind of NAT we have
// Does a redirected dial info validation from a different address and a random port find us? // Does a redirected dial info validation from a different address and a random port find us?
@ -500,10 +529,15 @@ impl DiscoveryContext {
// Yes, another machine can use the dial info directly, so Full Cone // Yes, another machine can use the dial info directly, so Full Cone
// Add public dial info with full cone NAT network class // Add public dial info with full cone NAT network class
return Some(DetectedDialInfo::Detected(DialInfoDetail { return Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info, dial_info: c_external_1.dial_info,
class: DialInfoClass::FullConeNAT, class: DialInfoClass::FullConeNAT,
})); }),
external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(),
),
});
} }
None None
}); });
@ -512,7 +546,7 @@ impl DiscoveryContext {
let c_this = this.clone(); let c_this = this.clone();
let c_external_1 = external_1.clone(); let c_external_1 = external_1.clone();
let c_external_2 = external_2.clone(); let c_external_2 = external_2.clone();
let do_restricted_cone_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = let do_restricted_cone_fut: SendPinBoxFuture<Option<DetectionResult>> =
Box::pin(async move { Box::pin(async move {
// We are restricted, determine what kind of restriction // We are restricted, determine what kind of restriction
@ -529,33 +563,43 @@ impl DiscoveryContext {
.await .await
{ {
// Got a reply from a non-default port, which means we're only address restricted // Got a reply from a non-default port, which means we're only address restricted
return Some(DetectedDialInfo::Detected(DialInfoDetail { return Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info.clone(), dial_info: c_external_1.dial_info.clone(),
class: DialInfoClass::AddressRestrictedNAT, class: DialInfoClass::AddressRestrictedNAT,
})); }),
external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(),
),
});
} }
// Didn't get a reply from a non-default port, which means we are also port restricted // Didn't get a reply from a non-default port, which means we are also port restricted
Some(DetectedDialInfo::Detected(DialInfoDetail { Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: c_external_1.dial_info.clone(), dial_info: c_external_1.dial_info.clone(),
class: DialInfoClass::PortRestrictedNAT, class: DialInfoClass::PortRestrictedNAT,
})) }),
external_address_types: AddressTypeSet::only(
c_external_1.address.address_type(),
),
})
}); });
ord.push_back(do_restricted_cone_fut); ord.push_back(do_restricted_cone_fut);
// Return the first result we get // Return the first result we get
let mut some_ddi = None; let mut some_dr = None;
while let Some(res) = ord.next().await { while let Some(res) = ord.next().await {
if let Some(ddi) = res { if let Some(dr) = res {
some_ddi = Some(ddi); some_dr = Some(dr);
break; break;
} }
} }
if let Some(ddi) = some_ddi { if let Some(dr) = some_dr {
if let DetectedDialInfo::Detected(did) = &ddi { if let DetectedDialInfo::Detected(did) = &dr.ddi {
// If we got something better than restricted NAT or we're done retrying // If we got something better than restricted NAT or we're done retrying
if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 { if did.class < DialInfoClass::AddressRestrictedNAT || retry_count == 0 {
return Some(ddi); return Some(dr);
} }
} }
} }
@ -573,7 +617,7 @@ impl DiscoveryContext {
/// Add discovery futures to an unordered set that may detect dialinfo when they complete /// Add discovery futures to an unordered set that may detect dialinfo when they complete
pub async fn discover( pub async fn discover(
&self, &self,
unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectedDialInfo>>>, unord: &mut FuturesUnordered<SendPinBoxFuture<Option<DetectionResult>>>,
) { ) {
let enable_upnp = { let enable_upnp = {
let c = self.unlocked_inner.net.config.get(); let c = self.unlocked_inner.net.config.get();
@ -591,16 +635,21 @@ impl DiscoveryContext {
/////////// ///////////
if enable_upnp { if enable_upnp {
let this = self.clone(); let this = self.clone();
let do_mapped_fut: SendPinBoxFuture<Option<DetectedDialInfo>> = Box::pin(async move { let do_mapped_fut: SendPinBoxFuture<Option<DetectionResult>> = Box::pin(async move {
// Attempt a port mapping via all available and enabled mechanisms // Attempt a port mapping via all available and enabled mechanisms
// Try this before the direct mapping in the event that we are restarting // Try this before the direct mapping in the event that we are restarting
// and may not have recorded a mapping created the last time // and may not have recorded a mapping created the last time
if let Some(external_mapped_dial_info) = this.try_upnp_port_mapping().await { if let Some(external_mapped_dial_info) = this.try_upnp_port_mapping().await {
// Got a port mapping, let's use it // Got a port mapping, let's use it
return Some(DetectedDialInfo::Detected(DialInfoDetail { return Some(DetectionResult {
ddi: DetectedDialInfo::Detected(DialInfoDetail {
dial_info: external_mapped_dial_info.clone(), dial_info: external_mapped_dial_info.clone(),
class: DialInfoClass::Mapped, class: DialInfoClass::Mapped,
})); }),
external_address_types: AddressTypeSet::only(
external_mapped_dial_info.address_type(),
),
});
} }
None None
}); });

View File

@ -684,7 +684,7 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
pub fn get_protocol_config(&self) -> ProtocolConfig { pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config self.inner.lock().protocol_config.clone()
} }
#[instrument(level = "debug", err, skip_all)] #[instrument(level = "debug", err, skip_all)]
@ -790,14 +790,33 @@ impl Network {
family_local.insert(AddressType::IPV6); family_local.insert(AddressType::IPV6);
} }
// set up the routing table's network config
// if we have static public dialinfo, upgrade our network class
let public_internet_capabilities = {
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
let local_network_capabilities = {
LOCAL_NETWORK_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
ProtocolConfig { ProtocolConfig {
outbound, outbound,
inbound, inbound,
family_global, family_global,
family_local, family_local,
public_internet_capabilities,
local_network_capabilities,
} }
}; };
inner.protocol_config = protocol_config; inner.protocol_config = protocol_config.clone();
protocol_config protocol_config
}; };
@ -835,36 +854,17 @@ impl Network {
// that we have ports available to us // that we have ports available to us
self.free_bound_first_ports(); self.free_bound_first_ports();
// set up the routing table's network config
// if we have static public dialinfo, upgrade our network class
let public_internet_capabilities = {
let c = self.config.get();
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
let local_network_capabilities = {
let c = self.config.get();
LOCAL_NETWORK_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
editor_public_internet.setup_network( editor_public_internet.setup_network(
protocol_config.outbound, protocol_config.outbound,
protocol_config.inbound, protocol_config.inbound,
protocol_config.family_global, protocol_config.family_global,
public_internet_capabilities, protocol_config.public_internet_capabilities,
); );
editor_local_network.setup_network( editor_local_network.setup_network(
protocol_config.outbound, protocol_config.outbound,
protocol_config.inbound, protocol_config.inbound,
protocol_config.family_local, protocol_config.family_local,
local_network_capabilities, protocol_config.local_network_capabilities,
); );
let detect_address_changes = { let detect_address_changes = {
let c = self.config.get(); let c = self.config.get();

View File

@ -22,6 +22,7 @@ impl Network {
editor.clear_dial_info_details(None, None); editor.clear_dial_info_details(None, None);
editor.set_network_class(Some(NetworkClass::OutboundOnly)); editor.set_network_class(Some(NetworkClass::OutboundOnly));
editor.clear_relay_node();
editor.commit(true).await; editor.commit(true).await;
} }
} }
@ -103,7 +104,7 @@ impl Network {
// Figure out if we can optimize TCP/WS checking since they are often on the same port // Figure out if we can optimize TCP/WS checking since they are often on the same port
let (protocol_config, tcp_same_port) = { let (protocol_config, tcp_same_port) = {
let inner = self.inner.lock(); let inner = self.inner.lock();
let protocol_config = inner.protocol_config; let protocol_config = inner.protocol_config.clone();
let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP) let tcp_same_port = if protocol_config.inbound.contains(ProtocolType::TCP)
&& protocol_config.inbound.contains(ProtocolType::WS) && protocol_config.inbound.contains(ProtocolType::WS)
{ {
@ -125,9 +126,16 @@ impl Network {
.collect(); .collect();
// Clear public dialinfo and network class in prep for discovery // Clear public dialinfo and network class in prep for discovery
let mut editor = self let mut editor = self
.routing_table() .routing_table()
.edit_routing_domain(RoutingDomain::PublicInternet); .edit_routing_domain(RoutingDomain::PublicInternet);
editor.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_global,
protocol_config.public_internet_capabilities.clone(),
);
editor.clear_dial_info_details(None, None); editor.clear_dial_info_details(None, None);
editor.set_network_class(None); editor.set_network_class(None);
editor.clear_relay_node(); editor.clear_relay_node();
@ -226,14 +234,18 @@ impl Network {
} }
// Wait for all discovery futures to complete and apply discoverycontexts // Wait for all discovery futures to complete and apply discoverycontexts
let mut all_address_types = AddressTypeSet::new();
loop { loop {
match unord.next().timeout_at(stop_token.clone()).await { match unord.next().timeout_at(stop_token.clone()).await {
Ok(Some(Some(ddi))) => { Ok(Some(Some(dr))) => {
// Found some new dial info for this protocol/address combination // Found some new dial info for this protocol/address combination
self.update_with_detected_dial_info(ddi.clone()).await?; self.update_with_detected_dial_info(dr.ddi.clone()).await?;
// Add the external address kinds to the set we've seen
all_address_types |= dr.external_address_types;
// Add WS dialinfo as well if it is on the same port as TCP // Add WS dialinfo as well if it is on the same port as TCP
if let DetectedDialInfo::Detected(did) = &ddi { if let DetectedDialInfo::Detected(did) = &dr.ddi {
if did.dial_info.protocol_type() == ProtocolType::TCP && tcp_same_port { if did.dial_info.protocol_type() == ProtocolType::TCP && tcp_same_port {
// Make WS dialinfo as well with same socket address as TCP // Make WS dialinfo as well with same socket address as TCP
let ws_ddi = DetectedDialInfo::Detected(DialInfoDetail { let ws_ddi = DetectedDialInfo::Detected(DialInfoDetail {
@ -262,7 +274,18 @@ impl Network {
} }
} }
// All done, see if things changed // All done
// Set the address types we've seen
editor.setup_network(
protocol_config.outbound,
protocol_config.inbound,
all_address_types,
protocol_config.public_internet_capabilities,
);
editor.commit(true).await;
// See if the dial info changed
let new_public_dial_info: HashSet<DialInfoDetail> = self let new_public_dial_info: HashSet<DialInfoDetail> = self
.routing_table() .routing_table()
.all_filtered_dial_info_details( .all_filtered_dial_info_details(

View File

@ -375,10 +375,14 @@ impl NetworkManager {
} }
}; };
// Dial info filter comes from the target node ref // Dial info filter comes from the target node ref but must be filtered by this node's outbound capabilities
let dial_info_filter = target_node_ref.dial_info_filter(); let dial_info_filter = target_node_ref.dial_info_filter().filtered(
&DialInfoFilter::all()
.with_address_type_set(peer_a.signed_node_info().node_info().address_types())
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()));
let sequencing = target_node_ref.sequencing(); let sequencing = target_node_ref.sequencing();
// If the node has had lost questions or failures to send, prefer sequencing // If the node has had lost questions or failures to send, prefer sequencing
// to improve reliability. The node may be experiencing UDP fragmentation drops // to improve reliability. The node may be experiencing UDP fragmentation drops
// or other firewalling issues and may perform better with TCP. // or other firewalling issues and may perform better with TCP.

View File

@ -349,14 +349,24 @@ impl Network {
let family_global = AddressTypeSet::from(AddressType::IPV4); let family_global = AddressTypeSet::from(AddressType::IPV4);
let family_local = AddressTypeSet::from(AddressType::IPV4); let family_local = AddressTypeSet::from(AddressType::IPV4);
let public_internet_capabilities = {
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
ProtocolConfig { ProtocolConfig {
outbound, outbound,
inbound, inbound,
family_global, family_global,
family_local, family_local,
local_network_capabilities: vec![],
public_internet_capabilities,
} }
}; };
self.inner.lock().protocol_config = protocol_config; self.inner.lock().protocol_config = protocol_config.clone();
// Start editing routing table // Start editing routing table
let mut editor_public_internet = self let mut editor_public_internet = self
@ -367,20 +377,11 @@ impl Network {
// set up the routing table's network config // set up the routing table's network config
// if we have static public dialinfo, upgrade our network class // if we have static public dialinfo, upgrade our network class
let public_internet_capabilities = {
let c = self.config.get();
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
editor_public_internet.setup_network( editor_public_internet.setup_network(
protocol_config.outbound, protocol_config.outbound,
protocol_config.inbound, protocol_config.inbound,
protocol_config.family_global, protocol_config.family_global,
public_internet_capabilities, protocol_config.public_internet_capabilities.clone(),
); );
editor_public_internet.set_network_class(Some(NetworkClass::WebApp)); editor_public_internet.set_network_class(Some(NetworkClass::WebApp));
@ -454,7 +455,7 @@ impl Network {
} }
pub fn get_protocol_config(&self) -> ProtocolConfig { pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config self.inner.lock().protocol_config.clone()
} }
////////////////////////////////////////// //////////////////////////////////////////

View File

@ -1285,7 +1285,11 @@ impl RouteSpecStore {
// Ensure our network class is valid before attempting to assemble any routes // Ensure our network class is valid before attempting to assemble any routes
if !rti.has_valid_network_class(RoutingDomain::PublicInternet) { if !rti.has_valid_network_class(RoutingDomain::PublicInternet) {
bail!("can't make private routes until our node info is valid"); let peer_info = rti.get_own_peer_info(RoutingDomain::PublicInternet);
bail!(
"can't make private routes until our node info is valid: {:?}",
peer_info
);
} }
// Make innermost route hop to our own node // Make innermost route hop to our own node

View File

@ -1,5 +1,6 @@
use super::*; use super::*;
#[derive(Debug)]
enum RoutingDomainChange { enum RoutingDomainChange {
ClearDialInfoDetails { ClearDialInfoDetails {
address_type: Option<AddressType>, address_type: Option<AddressType>,
@ -135,6 +136,9 @@ impl RoutingDomainEditor {
None None
}; };
// Debug print
log_rtab!(debug "[{:?}] COMMIT: {:?}", self.routing_domain, self.changes);
// Apply changes // Apply changes
let mut changed = false; let mut changed = false;
{ {
@ -247,10 +251,6 @@ impl RoutingDomainEditor {
} }
} }
} }
if changed {
// Clear our 'peer info' cache, the peerinfo for this routing domain will get regenerated next time it is asked for
detail.common_mut().clear_cache()
}
}); });
if changed { if changed {
// Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed // Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed

View File

@ -278,6 +278,8 @@ fn first_filtered_dial_info_detail_between_nodes(
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<DialInfoDetailSort>> dif_sort: Option<Arc<DialInfoDetailSort>>
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
// Consider outbound capabilities
let dial_info_filter = (*dial_info_filter).filtered( let dial_info_filter = (*dial_info_filter).filtered(
&DialInfoFilter::all() &DialInfoFilter::all()
.with_address_type_set(from_node.address_types()) .with_address_type_set(from_node.address_types())
@ -569,45 +571,19 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<DialInfoDetailSort>>, dif_sort: Option<Arc<DialInfoDetailSort>>,
) -> ContactMethod { ) -> ContactMethod {
// Scope the filter down to protocols node A can do outbound
let dial_info_filter = dial_info_filter.filtered(
&DialInfoFilter::all()
.with_address_type_set(peer_a.signed_node_info().node_info().address_types())
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()),
);
// Apply sequencing and get sort // Get the nodeinfos for convenience
// Include sorting by external dial info sort for rotating through dialinfo let node_a = peer_a.signed_node_info().node_info();
// based on an external preference table, for example the one kept by let node_b = peer_b.signed_node_info().node_info();
// AddressFilter to deprioritize dialinfo that have recently failed to connect
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); // Get the node ids that would be used between these peers
let sort: Option<Box<DialInfoDetailSort>> = if ordered { let cck = common_crypto_kinds(&peer_a.node_ids().kinds(), &peer_b.node_ids().kinds());
if let Some(dif_sort) = dif_sort { let Some(_best_ck) = cck.first().copied() else {
Some(Box::new(move |a, b| { // No common crypto kinds between these nodes, can't contact
let mut ord = dif_sort(a,b); return ContactMethod::Unreachable;
if ord == core::cmp::Ordering::Equal {
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
}
ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
}
} else if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
} else {
None
}; };
// If the filter is dead then we won't be able to connect if let Some(target_did) = first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort) {
if dial_info_filter.is_dead() {
return ContactMethod::Unreachable;
}
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
let opt_target_did = peer_b.signed_node_info().node_info().first_filtered_dial_info_detail(sort, filter);
if let Some(target_did) = opt_target_did {
return ContactMethod::Direct(target_did.dial_info); return ContactMethod::Direct(target_did.dial_info);
} }

View File

@ -113,21 +113,21 @@ where
}); });
} }
async fn fanout_processor(self: Arc<Self>) { async fn fanout_processor(self: Arc<Self>) -> bool {
// Loop until we have a result or are done // Loop until we have a result or are done
loop { loop {
// Get the closest node we haven't processed yet if we're not done yet // Get the closest node we haven't processed yet if we're not done yet
let next_node = { let next_node = {
let mut ctx = self.context.lock(); let mut ctx = self.context.lock();
if self.clone().evaluate_done(&mut ctx) { if self.clone().evaluate_done(&mut ctx) {
break; break true;
} }
ctx.fanout_queue.next() ctx.fanout_queue.next()
}; };
// If we don't have a node to process, stop fanning out // If we don't have a node to process, stop fanning out
let Some(next_node) = next_node else { let Some(next_node) = next_node else {
break; break false;
}; };
// Do the call for this node // Do the call for this node
@ -161,7 +161,7 @@ where
Err(e) => { Err(e) => {
// Error happened, abort everything and return the error // Error happened, abort everything and return the error
self.context.lock().result = Some(Err(e)); self.context.lock().result = Some(Err(e));
return; break true;
} }
}; };
} }
@ -248,7 +248,13 @@ where
} }
} }
// Wait for them to complete // Wait for them to complete
timeout(timeout_ms, async { while unord.next().await.is_some() {} }) timeout(timeout_ms, async {
while let Some(is_done) = unord.next().await {
if is_done {
break;
}
}
})
.await .await
.into_timeout_or() .into_timeout_or()
.map(|_| { .map(|_| {

View File

@ -166,9 +166,13 @@ impl StorageManager {
match fanout_call.run().await { match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
log_stor!(debug "GetValue Fanout Timeout");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.value_count >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_count);
}
Ok(SubkeyResult { Ok(SubkeyResult {
value: ctx.value.clone(), value: ctx.value.clone(),
descriptor: ctx.descriptor.clone(), descriptor: ctx.descriptor.clone(),

View File

@ -162,9 +162,14 @@ impl StorageManager {
match fanout_call.run().await { match fanout_call.run().await {
// If we don't finish in the timeout (too much time passed checking for consensus) // If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => { TimeoutOr::Timeout => {
log_stor!(debug "SetValue Fanout Timeout");
// Return the best answer we've got // Return the best answer we've got
let ctx = context.lock(); let ctx = context.lock();
if ctx.set_count >= consensus_count {
log_stor!(debug "SetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.set_count);
}
Ok(ctx.value.clone()) Ok(ctx.value.clone())
} }
// If we finished with or without consensus (enough nodes returning the same value) // If we finished with or without consensus (enough nodes returning the same value)

View File

@ -0,0 +1,14 @@
gradle-wrapper.jar
/.gradle
/.idea
/captures/
/gradlew
/gradlew.bat
/local.properties
GeneratedPluginRegistrant.java
# Remember to never publicly share your keystore.
# See https://flutter.dev/docs/deployment/android#reference-the-keystore-from-the-app
key.properties
**/*.keystore
**/*.jks

View File

@ -0,0 +1 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.

View File

@ -0,0 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -186,6 +186,52 @@ abstract class VeilidCryptoSystem {
Future<Uint8List> cryptNoAuth( Future<Uint8List> cryptNoAuth(
Uint8List body, Nonce nonce, SharedSecret sharedSecret); Uint8List body, Nonce nonce, SharedSecret sharedSecret);
Future<Uint8List> encryptAeadWithNonce(
Uint8List body, SharedSecret secret) async {
// generate nonce
final nonce = await randomNonce();
// crypt and append nonce
final b = BytesBuilder()
..add(await encryptAead(body, nonce, secret, null))
..add(nonce.decode());
return b.toBytes();
}
Future<Uint8List> decryptAeadWithNonce(
Uint8List body, SharedSecret secret) async {
if (body.length < Nonce.decodedLength()) {
throw const FormatException('not enough data to decrypt');
}
final nonce =
Nonce.fromBytes(body.sublist(body.length - Nonce.decodedLength()));
final encryptedData = body.sublist(0, body.length - Nonce.decodedLength());
// decrypt
return decryptAead(encryptedData, nonce, secret, null);
}
Future<Uint8List> encryptAeadWithPassword(
Uint8List body, String password) async {
final ekbytes = Uint8List.fromList(utf8.encode(password));
final nonce = await randomNonce();
final saltBytes = nonce.decode();
final sharedSecret = await deriveSharedSecret(ekbytes, saltBytes);
return Uint8List.fromList(
(await encryptAead(body, nonce, sharedSecret, null)) + saltBytes);
}
Future<Uint8List> decryptAeadWithPassword(
Uint8List body, String password) async {
if (body.length < Nonce.decodedLength()) {
throw const FormatException('not enough data to decrypt');
}
final ekbytes = Uint8List.fromList(utf8.encode(password));
final bodyBytes = body.sublist(0, body.length - Nonce.decodedLength());
final saltBytes = body.sublist(body.length - Nonce.decodedLength());
final nonce = Nonce.fromBytes(saltBytes);
final sharedSecret = await deriveSharedSecret(ekbytes, saltBytes);
return decryptAead(bodyBytes, nonce, sharedSecret, null);
}
Future<Uint8List> encryptNoAuthWithNonce( Future<Uint8List> encryptNoAuthWithNonce(
Uint8List body, SharedSecret secret) async { Uint8List body, SharedSecret secret) async {
// generate nonce // generate nonce
@ -215,7 +261,8 @@ abstract class VeilidCryptoSystem {
final nonce = await randomNonce(); final nonce = await randomNonce();
final saltBytes = nonce.decode(); final saltBytes = nonce.decode();
final sharedSecret = await deriveSharedSecret(ekbytes, saltBytes); final sharedSecret = await deriveSharedSecret(ekbytes, saltBytes);
return (await cryptNoAuth(body, nonce, sharedSecret))..addAll(saltBytes); return Uint8List.fromList(
(await cryptNoAuth(body, nonce, sharedSecret)) + saltBytes);
} }
Future<Uint8List> decryptNoAuthWithPassword( Future<Uint8List> decryptNoAuthWithPassword(