diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 4e72ea51..01ab901e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -878,6 +878,7 @@ impl NetworkManager { data: &mut [u8], connection_descriptor: ConnectionDescriptor, ) -> EyreResult { + #[cfg(feature="verbose-tracing")] let root = span!( parent: None, Level::TRACE, @@ -885,6 +886,7 @@ impl NetworkManager { "data.len" = data.len(), "descriptor" = ?connection_descriptor ); + #[cfg(feature="verbose-tracing")] let _root_enter = root.enter(); log_net!( @@ -1017,6 +1019,17 @@ impl NetworkManager { }; if let Some(relay_nr) = some_relay_nr { + // Force sequencing if this came in sequenced. + // The sender did the prefer/ensure calculation when it did get_contact_method, + // so we don't need to do it here. + let relay_nr = if connection_descriptor.remote().protocol_type().is_ordered() { + let mut relay_nr = relay_nr.clone(); + relay_nr.set_sequencing(Sequencing::EnsureOrdered); + relay_nr + } else { + relay_nr + }; + // Relay the packet to the desired destination log_net!("relaying {} bytes to {}", data.len(), relay_nr); diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 246098e7..063d4f19 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -342,21 +342,24 @@ impl NetworkManager { ContactMethod::Existing => NodeContactMethod::Existing, ContactMethod::Direct(di) => NodeContactMethod::Direct(di), ContactMethod::SignalReverse(relay_key, target_key) => { - let relay_nr = routing_table + let mut relay_nr = routing_table .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; if !target_node_ref.node_ids().contains(&target_key) { bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); } + relay_nr.set_sequencing(sequencing); NodeContactMethod::SignalReverse(relay_nr, target_node_ref) } ContactMethod::SignalHolePunch(relay_key, target_key) => { - let relay_nr = routing_table + let mut relay_nr = routing_table .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; if !target_node_ref.node_ids().contains(&target_key) { bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); } + relay_nr.set_sequencing(sequencing); + // if any other protocol were possible here we could update this and do_hole_punch // but tcp hole punch is very very unreliable it seems let udp_target_node_ref = target_node_ref @@ -365,15 +368,17 @@ impl NetworkManager { NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref) } ContactMethod::InboundRelay(relay_key) => { - let relay_nr = routing_table + let mut relay_nr = routing_table .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; + relay_nr.set_sequencing(sequencing); NodeContactMethod::InboundRelay(relay_nr) } ContactMethod::OutboundRelay(relay_key) => { - let relay_nr = routing_table + let mut relay_nr = routing_table .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .ok_or_else(|| eyre!("couldn't look up relay"))?; + relay_nr.set_sequencing(sequencing); NodeContactMethod::OutboundRelay(relay_nr) } }; diff --git a/veilid-core/src/network_manager/types/connection_descriptor.rs b/veilid-core/src/network_manager/types/connection_descriptor.rs index 1046838f..54481c39 100644 --- a/veilid-core/src/network_manager/types/connection_descriptor.rs +++ b/veilid-core/src/network_manager/types/connection_descriptor.rs @@ -30,9 +30,7 @@ pub struct ConnectionDescriptor { impl ConnectionDescriptor { pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { - assert!( - !remote.protocol_type().is_connection_oriented() || !local.address().is_unspecified() - ); + assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified()); Self { remote, diff --git a/veilid-core/src/network_manager/types/protocol_type.rs b/veilid-core/src/network_manager/types/protocol_type.rs index 627a5ccc..41167580 100644 --- a/veilid-core/src/network_manager/types/protocol_type.rs +++ b/veilid-core/src/network_manager/types/protocol_type.rs @@ -25,7 +25,7 @@ pub enum ProtocolType { } impl ProtocolType { - pub fn is_connection_oriented(&self) -> bool { + pub fn is_ordered(&self) -> bool { matches!( self, ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 909f9394..be41c1b9 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -441,7 +441,7 @@ impl BucketEntryInner { // Check if the connection is still considered live let alive = // Should we check the connection table? - if v.0.protocol_type().is_connection_oriented() { + if v.0.protocol_type().is_ordered() { // Look the connection up in the connection manager and see if it's still there connection_manager.get_connection(v.0).is_some() } else { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index fcaf8c99..81a2dbd7 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -1144,100 +1144,6 @@ impl RoutingTable { } } } - - pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { - // Get all our outbound protocol/address types - let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); - let mapped_port_info = self.get_low_level_port_info(); - - move |e: &BucketEntryInner| { - // Ensure this node is not on the local network - if e.has_node_info(RoutingDomain::LocalNetwork.into()) { - return false; - } - - // Disqualify nodes that don't cover all our inbound ports for tcp and udp - // as we need to be able to use the relay for keepalives for all nat mappings - let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); - - let can_serve_as_relay = e - .node_info(RoutingDomain::PublicInternet) - .map(|n| { - let dids = n.all_filtered_dial_info_details( - Some(DialInfoDetail::ordered_sequencing_sort), // By default, choose connection-oriented protocol for relay - |did| did.matches_filter(&outbound_dif), - ); - for did in &dids { - let pt = did.dial_info.protocol_type(); - let at = did.dial_info.address_type(); - if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) - { - low_level_protocol_ports.remove(&(*llpt, at, *port)); - } - } - low_level_protocol_ports.is_empty() - }) - .unwrap_or(false); - if !can_serve_as_relay { - return false; - } - - true - } - } - - #[instrument(level = "trace", skip(self), ret)] - pub fn find_inbound_relay( - &self, - routing_domain: RoutingDomain, - cur_ts: Timestamp, - ) -> Option { - // Get relay filter function - let relay_node_filter = match routing_domain { - RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(), - RoutingDomain::LocalNetwork => { - unimplemented!(); - } - }; - - // Go through all entries and find fastest entry that matches filter function - let inner = self.inner.read(); - let inner = &*inner; - let mut best_inbound_relay: Option> = None; - - // Iterate all known nodes for candidates - inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { - let entry2 = entry.clone(); - entry.with(rti, |rti, e| { - // Ensure we have the node's status - if let Some(node_status) = e.node_status(routing_domain) { - // Ensure the node will relay - if node_status.will_relay() { - // Compare against previous candidate - if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { - // Less is faster - let better = best_inbound_relay.with(rti, |_rti, best| { - // choose low latency stability for relays - BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) - == std::cmp::Ordering::Less - }); - // Now apply filter function and see if this node should be included - if better && relay_node_filter(e) { - *best_inbound_relay = entry2; - } - } else if relay_node_filter(e) { - // Always store the first candidate - best_inbound_relay = Some(entry2); - } - } - } - }); - // Don't end early, iterate through all entries - Option::<()>::None - }); - // Return the best inbound relay noderef - best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None)) - } } impl core::ops::Deref for RoutingTable { diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 8d980c04..37d5ec5f 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -81,4 +81,97 @@ impl RoutingTable { Ok(()) } + + pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool { + // Get all our outbound protocol/address types + let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet); + let mapped_port_info = self.get_low_level_port_info(); + + move |e: &BucketEntryInner| { + // Ensure this node is not on the local network + if e.has_node_info(RoutingDomain::LocalNetwork.into()) { + return false; + } + + // Disqualify nodes that don't cover all our inbound ports for tcp and udp + // as we need to be able to use the relay for keepalives for all nat mappings + let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone(); + + let can_serve_as_relay = e + .node_info(RoutingDomain::PublicInternet) + .map(|n| { + let dids = n.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| { + did.matches_filter(&outbound_dif) + }); + for did in &dids { + let pt = did.dial_info.protocol_type(); + let at = did.dial_info.address_type(); + if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at)) + { + low_level_protocol_ports.remove(&(*llpt, at, *port)); + } + } + low_level_protocol_ports.is_empty() + }) + .unwrap_or(false); + if !can_serve_as_relay { + return false; + } + + true + } + } + + #[instrument(level = "trace", skip(self), ret)] + pub fn find_inbound_relay( + &self, + routing_domain: RoutingDomain, + cur_ts: Timestamp, + ) -> Option { + // Get relay filter function + let relay_node_filter = match routing_domain { + RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(), + RoutingDomain::LocalNetwork => { + unimplemented!(); + } + }; + + // Go through all entries and find fastest entry that matches filter function + let inner = self.inner.read(); + let inner = &*inner; + let mut best_inbound_relay: Option> = None; + + // Iterate all known nodes for candidates + inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { + let entry2 = entry.clone(); + entry.with(rti, |rti, e| { + // Ensure we have the node's status + if let Some(node_status) = e.node_status(routing_domain) { + // Ensure the node will relay + if node_status.will_relay() { + // Compare against previous candidate + if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { + // Less is faster + let better = best_inbound_relay.with(rti, |_rti, best| { + // choose low latency stability for relays + BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) + == std::cmp::Ordering::Less + }); + // Now apply filter function and see if this node should be included + if better && relay_node_filter(e) { + *best_inbound_relay = entry2; + } + } else if relay_node_filter(e) { + // Always store the first candidate + best_inbound_relay = Some(entry2); + } + } + } + }); + // Don't end early, iterate through all entries + Option::<()>::None + }); + // Return the best inbound relay noderef + best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None)) + } } diff --git a/veilid-core/src/routing_table/types/dial_info_detail.rs b/veilid-core/src/routing_table/types/dial_info_detail.rs index 22adf233..52e51492 100644 --- a/veilid-core/src/routing_table/types/dial_info_detail.rs +++ b/veilid-core/src/routing_table/types/dial_info_detail.rs @@ -29,13 +29,11 @@ impl MatchesDialInfoFilter for DialInfoDetail { impl DialInfoDetail { pub fn ordered_sequencing_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering { - if a.class < b.class { - return core::cmp::Ordering::Less; + let c = DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info); + if c != core::cmp::Ordering::Equal { + return c; } - if a.class > b.class { - return core::cmp::Ordering::Greater; - } - DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info) + a.class.cmp(&b.class) } pub const NO_SORT: std::option::Option< for<'r, 's> fn(&'r DialInfoDetail, &'s DialInfoDetail) -> std::cmp::Ordering, diff --git a/veilid-core/src/routing_table/types/signed_node_info.rs b/veilid-core/src/routing_table/types/signed_node_info.rs index 60e99c89..fe2ecbc1 100644 --- a/veilid-core/src/routing_table/types/signed_node_info.rs +++ b/veilid-core/src/routing_table/types/signed_node_info.rs @@ -71,7 +71,7 @@ impl SignedNodeInfo { match sequencing { Sequencing::NoPreference | Sequencing::PreferOrdered => return true, Sequencing::EnsureOrdered => { - if did.dial_info.protocol_type().is_connection_oriented() { + if did.dial_info.protocol_type().is_ordered() { return true; } } @@ -85,7 +85,7 @@ impl SignedNodeInfo { match sequencing { Sequencing::NoPreference | Sequencing::PreferOrdered => return true, Sequencing::EnsureOrdered => { - if did.dial_info.protocol_type().is_connection_oriented() { + if did.dial_info.protocol_type().is_ordered() { return true; } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 59a72c32..cf71b849 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -728,6 +728,10 @@ impl RPCProcessor { if sequencing > node_ref.sequencing() { node_ref.set_sequencing(sequencing) } + let mut destination_node_ref = destination_node_ref.clone(); + if sequencing > destination_node_ref.sequencing() { + destination_node_ref.set_sequencing(sequencing) + } // Reply private route should be None here, even for questions assert!(reply_private_route.is_none()); diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 588e1936..00f5e714 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -117,8 +117,13 @@ async def test_routing_context_app_message_loopback_big_packets(): app_message_queue: asyncio.Queue = asyncio.Queue() + global got_message + got_message = 0 async def app_message_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: + global got_message + got_message += 1 + print("got {}".format(got_message)) await app_message_queue.put(update) sent_messages: set[bytes] = set() @@ -142,7 +147,7 @@ async def test_routing_context_app_message_loopback_big_packets(): prr = await api.import_remote_private_route(blob) # do this test 100 times - for _ in range(1000): + for _ in range(100): # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) @@ -151,8 +156,9 @@ async def test_routing_context_app_message_loopback_big_packets(): sent_messages.add(message) # we should get the same messages back - for _ in range(len(sent_messages)): - + print(len(sent_messages)) + for n in range(len(sent_messages)): + print(n) update: veilid.VeilidUpdate = await asyncio.wait_for( app_message_queue.get(), timeout=10 )