From 197b7fef6e3d14eab559cb897bd59d5451b32ed7 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 24 Jun 2023 11:16:34 -0400 Subject: [PATCH] reliability work --- veilid-core/src/network_manager/mod.rs | 1 + .../network_manager/types/dial_info/mod.rs | 10 ++-- .../network_manager/types/dial_info_filter.rs | 33 +++++++++++- .../network_manager/types/protocol_type.rs | 12 +++++ veilid-core/src/routing_table/bucket_entry.rs | 42 ++++++++------- veilid-core/src/routing_table/node_ref.rs | 33 ++++++------ .../src/routing_table/node_ref_filter.rs | 51 ++++++++++++++++++- .../route_spec_store/route_spec_store.rs | 13 +++-- .../route_spec_store/route_stats.rs | 5 ++ .../src/routing_table/routing_domains.rs | 37 ++++++-------- .../src/routing_table/routing_table_inner.rs | 6 +++ .../src/routing_table/tasks/bootstrap.rs | 7 +-- veilid-python/tests/test_routing_context.py | 19 ++++--- veilid-python/veilid/schema/RecvMessage.json | 6 +++ 14 files changed, 191 insertions(+), 84 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index f410ca31..9c64cf2e 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1122,6 +1122,7 @@ impl NetworkManager { // or other firewalling issues and may perform better with TCP. let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2; if unreliable && sequencing < Sequencing::PreferOrdered { + log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); sequencing = Sequencing::PreferOrdered; } diff --git a/veilid-core/src/network_manager/types/dial_info/mod.rs b/veilid-core/src/network_manager/types/dial_info/mod.rs index a6a12c69..5eec84f5 100644 --- a/veilid-core/src/network_manager/types/dial_info/mod.rs +++ b/veilid-core/src/network_manager/types/dial_info/mod.rs @@ -491,13 +491,9 @@ impl DialInfo { } pub fn ordered_sequencing_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering { - let ca = a.protocol_type().sort_order(Sequencing::EnsureOrdered); - let cb = b.protocol_type().sort_order(Sequencing::EnsureOrdered); - if ca < cb { - return core::cmp::Ordering::Less; - } - if ca > cb { - return core::cmp::Ordering::Greater; + let s = ProtocolType::ordered_sequencing_sort(a.protocol_type(), b.protocol_type()); + if s != core::cmp::Ordering::Equal { + return s; } match (a, b) { (DialInfo::UDP(a), DialInfo::UDP(b)) => a.cmp(b), diff --git a/veilid-core/src/network_manager/types/dial_info_filter.rs b/veilid-core/src/network_manager/types/dial_info_filter.rs index c3635957..a6867789 100644 --- a/veilid-core/src/network_manager/types/dial_info_filter.rs +++ b/veilid-core/src/network_manager/types/dial_info_filter.rs @@ -61,6 +61,20 @@ impl DialInfoFilter { pub fn is_dead(&self) -> bool { self.protocol_type_set.is_empty() || self.address_type_set.is_empty() } + pub fn with_sequencing(mut self, sequencing: Sequencing) -> (bool, DialInfoFilter) { + // Get first filtered dialinfo + match sequencing { + Sequencing::NoPreference => (false, self), + Sequencing::PreferOrdered => (true, self), + Sequencing::EnsureOrdered => ( + true, + self.filtered( + &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), + ), + ), + } + // return ordered sort and filter with ensure applied + } } impl fmt::Debug for DialInfoFilter { @@ -80,7 +94,24 @@ impl fmt::Debug for DialInfoFilter { } } +impl From for DialInfoFilter { + fn from(other: ProtocolType) -> Self { + Self { + protocol_type_set: ProtocolTypeSet::from(other), + address_type_set: AddressTypeSet::all(), + } + } +} + +impl From for DialInfoFilter { + fn from(other: AddressType) -> Self { + Self { + protocol_type_set: ProtocolTypeSet::all(), + address_type_set: AddressTypeSet::from(other), + } + } +} + pub trait MatchesDialInfoFilter { fn matches_filter(&self, filter: &DialInfoFilter) -> bool; } - diff --git a/veilid-core/src/network_manager/types/protocol_type.rs b/veilid-core/src/network_manager/types/protocol_type.rs index 4ba47000..627a5ccc 100644 --- a/veilid-core/src/network_manager/types/protocol_type.rs +++ b/veilid-core/src/network_manager/types/protocol_type.rs @@ -72,6 +72,18 @@ impl ProtocolType { pub fn all_ordered_set() -> ProtocolTypeSet { ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS } + + pub fn ordered_sequencing_sort(a: Self, b: Self) -> core::cmp::Ordering { + let ca = a.sort_order(Sequencing::EnsureOrdered); + let cb = b.sort_order(Sequencing::EnsureOrdered); + if ca < cb { + return core::cmp::Ordering::Less; + } + if ca > cb { + return core::cmp::Ordering::Greater; + } + core::cmp::Ordering::Equal + } } impl fmt::Display for ProtocolType { diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index f133f4e5..4315a176 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -316,7 +316,8 @@ impl BucketEntryInner { let last_connections = self.last_connections( rti, true, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + NodeRefFilter::from(routing_domain), + false, ); !last_connections.is_empty() } @@ -370,7 +371,8 @@ impl BucketEntryInner { let last_connections = self.last_connections( rti, true, - Some(NodeRefFilter::new().with_routing_domain_set(routing_domain_set)), + NodeRefFilter::from(routing_domain_set), + false ); for lc in last_connections { if let Some(rd) = @@ -412,7 +414,8 @@ impl BucketEntryInner { &self, rti: &RoutingTableInner, only_live: bool, - filter: Option, + filter: NodeRefFilter, + ordered: bool, ) -> Vec<(ConnectionDescriptor, Timestamp)> { let connection_manager = rti.unlocked_inner.network_manager.connection_manager(); @@ -421,26 +424,13 @@ impl BucketEntryInner { .last_connections .iter() .filter_map(|(k, v)| { - let include = if let Some(filter) = &filter { + let include = { let remote_address = v.0.remote_address().address(); - if let Some(routing_domain) = rti.routing_domain_for_address(remote_address) { - if filter.routing_domain_set.contains(routing_domain) + rti.routing_domain_for_address(remote_address).map(|rd| { + filter.routing_domain_set.contains(rd) && filter.dial_info_filter.protocol_type_set.contains(k.0) && filter.dial_info_filter.address_type_set.contains(k.1) - { - // matches filter - true - } else { - // does not match filter - false - } - } else { - // no valid routing domain - false - } - } else { - // no filter - true + }).unwrap_or(false) }; if !include { @@ -471,8 +461,16 @@ impl BucketEntryInner { } }) .collect(); - // Sort with newest timestamps first - out.sort_by(|a, b| b.1.cmp(&a.1)); + // Sort with ordering preference first and then sort with newest timestamps + out.sort_by(|a, b| { + if ordered { + let s = ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type()); + if s != core::cmp::Ordering::Equal { + return s; + } + } + b.1.cmp(&a.1) + }); out } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 5cb53c64..2044ee0a 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -217,25 +217,24 @@ pub trait NodeRefBase: Sized { fn first_filtered_dial_info_detail(&self) -> Option { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); + let sequencing = self.common().sequencing; + let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); - let (sort, dial_info_filter) = match self.common().sequencing { - Sequencing::NoPreference => (None, dial_info_filter), - Sequencing::PreferOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter, - ), - Sequencing::EnsureOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter.filtered( - &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), - ), - ), + let sort = if ordered { + Some(DialInfoDetail::ordered_sequencing_sort) + } else { + None }; + if dial_info_filter.is_dead() { + return None; + } + + let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); + self.operate(|_rt, e| { for routing_domain in routing_domain_set { if let Some(ni) = e.node_info(routing_domain) { - let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) { return Some(did); } @@ -280,9 +279,13 @@ pub trait NodeRefBase: Sized { fn last_connection(&self) -> Option { // Get the last connections and the last time we saw anything with this connection - // Filtered first and then sorted by most recent + // Filtered first and then sorted by sequencing and then by most recent self.operate(|rti, e| { - let last_connections = e.last_connections(rti, true, self.common().filter.clone()); + // apply sequencing to filter and get sort + let sequencing = self.common().sequencing; + let filter = self.common().filter.clone().unwrap_or_default(); + let (ordered, filter) = filter.with_sequencing(sequencing); + let last_connections = e.last_connections(rti, true, filter, ordered); last_connections.first().map(|x| x.0) }) } diff --git a/veilid-core/src/routing_table/node_ref_filter.rs b/veilid-core/src/routing_table/node_ref_filter.rs index 934d93a1..9c0936c5 100644 --- a/veilid-core/src/routing_table/node_ref_filter.rs +++ b/veilid-core/src/routing_table/node_ref_filter.rs @@ -19,7 +19,6 @@ impl NodeRefFilter { dial_info_filter: DialInfoFilter::all(), } } - pub fn with_routing_domain(mut self, routing_domain: RoutingDomain) -> Self { self.routing_domain_set = routing_domain.into(); self @@ -58,4 +57,54 @@ impl NodeRefFilter { pub fn is_dead(&self) -> bool { self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty() } + pub fn with_sequencing(mut self, sequencing: Sequencing) -> (bool, Self) { + let (ordered, dif) = self.dial_info_filter.with_sequencing(sequencing); + self.dial_info_filter = dif; + (ordered, self) + } +} + +impl From for NodeRefFilter { + fn from(other: RoutingDomain) -> Self { + Self { + routing_domain_set: other.into(), + dial_info_filter: DialInfoFilter::all(), + } + } +} + +impl From for NodeRefFilter { + fn from(other: RoutingDomainSet) -> Self { + Self { + routing_domain_set: other, + dial_info_filter: DialInfoFilter::all(), + } + } +} + +impl From for NodeRefFilter { + fn from(other: DialInfoFilter) -> Self { + Self { + routing_domain_set: RoutingDomainSet::all(), + dial_info_filter: other, + } + } +} + +impl From for NodeRefFilter { + fn from(other: ProtocolType) -> Self { + Self { + routing_domain_set: RoutingDomainSet::all(), + dial_info_filter: DialInfoFilter::from(other), + } + } +} + +impl From for NodeRefFilter { + fn from(other: AddressType) -> Self { + Self { + routing_domain_set: RoutingDomainSet::all(), + dial_info_filter: DialInfoFilter::from(other), + } + } } diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs index db490b08..9462c7cc 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store.rs @@ -616,8 +616,8 @@ impl RouteSpecStore { let private_route = self.assemble_private_route(&key, None)?; // Always test routes with safety routes that are more likely to succeed let stability = Stability::Reliable; - // Routes can test with whatever sequencing they were allocated with - let sequencing = Sequencing::NoPreference; + // Routes should test with the most likely to succeed sequencing they are capable of + let sequencing = Sequencing::PreferOrdered; let safety_spec = SafetySpec { preferred_route: Some(private_route_id), @@ -657,12 +657,17 @@ impl RouteSpecStore { bail!("no best key to test remote route"); }; + // Always test routes with safety routes that are more likely to succeed + let stability = Stability::Reliable; + // Routes should test with the most likely to succeed sequencing they are capable of + let sequencing = Sequencing::PreferOrdered; + // Get a safety route that is good enough let safety_spec = SafetySpec { preferred_route: None, hop_count: self.unlocked_inner.default_route_hop_count, - stability: Stability::default(), - sequencing: Sequencing::default(), + stability, + sequencing, }; let safety_selection = SafetySelection::Safe(safety_spec); diff --git a/veilid-core/src/routing_table/route_spec_store/route_stats.rs b/veilid-core/src/routing_table/route_spec_store/route_stats.rs index 93b4f304..1327e87f 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_stats.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_stats.rs @@ -61,6 +61,9 @@ impl RouteStats { pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) { self.last_sent_ts = Some(cur_ts); self.transfer_stats_accounting.add_up(bytes); + + // If we sent successfully, then reset 'failed_to_send' + self.failed_to_send = 0; } /// Mark a route as having been sent to @@ -101,6 +104,8 @@ impl RouteStats { self.last_tested_ts = None; self.last_sent_ts = None; self.last_received_ts = None; + self.failed_to_send = 0; + self.questions_lost = 0; } /// Check if a route needs testing diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index 096d777d..e9f96a1b 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -223,7 +223,7 @@ impl Default for PublicInternetRoutingDomainDetail { } } -fn first_filtered_dial_info_detail( +fn first_filtered_dial_info_detail_between_nodes( from_node: &NodeInfo, to_node: &NodeInfo, dial_info_filter: &DialInfoFilter, @@ -235,28 +235,21 @@ fn first_filtered_dial_info_detail( .with_protocol_type_set(from_node.outbound_protocols()), ); - // Get first filtered dialinfo - let (sort, dial_info_filter) = match sequencing { - Sequencing::NoPreference => (None, dial_info_filter), - Sequencing::PreferOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter, - ), - Sequencing::EnsureOrdered => ( - Some(DialInfoDetail::ordered_sequencing_sort), - dial_info_filter.filtered( - &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), - ), - ), + // Apply sequencing and get sort + let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); + let sort = if ordered { + Some(DialInfoDetail::ordered_sequencing_sort) + } else { + None }; + // If the filter is dead then we won't be able to connect if dial_info_filter.is_dead() { return None; } - let direct_filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); - // Get the best match dial info for node B if we have it + let direct_filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter); to_node.first_filtered_dial_info_detail(sort, direct_filter) } @@ -294,7 +287,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { // Get the best match dial info for node B if we have it if let Some(target_did) = - first_filtered_dial_info_detail(node_a, node_b, &dial_info_filter, sequencing) + first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing) { // Do we need to signal before going inbound? if !target_did.class.requires_signal() { @@ -319,7 +312,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { }; // Can node A reach the inbound relay directly? - if first_filtered_dial_info_detail( + if first_filtered_dial_info_detail_between_nodes( node_a, node_b_relay, &dial_info_filter, @@ -332,7 +325,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { ///////// Reverse connection // Get the best match dial info for an reverse inbound connection from node B to node A - if let Some(reverse_did) = first_filtered_dial_info_detail( + if let Some(reverse_did) = first_filtered_dial_info_detail_between_nodes( node_b, node_a, &dial_info_filter, @@ -358,14 +351,14 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { let udp_dial_info_filter = dial_info_filter .clone() .filtered(&DialInfoFilter::all().with_protocol_type(ProtocolType::UDP)); - if let Some(target_udp_did) = first_filtered_dial_info_detail( + if let Some(target_udp_did) = first_filtered_dial_info_detail_between_nodes( node_a, node_b, &udp_dial_info_filter, sequencing, ) { // Does node A have a direct udp dialinfo that node B can reach? - if let Some(reverse_udp_did) = first_filtered_dial_info_detail( + if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes( node_b, node_a, &udp_dial_info_filter, @@ -407,7 +400,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { }; // Can we reach the full relay? - if first_filtered_dial_info_detail( + if first_filtered_dial_info_detail_between_nodes( node_a, &node_b_relay, &dial_info_filter, diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 4f5af94c..1fe369f3 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -111,6 +111,9 @@ impl RoutingTableInner { routing_domain_set: RoutingDomainSet, filter: &DialInfoFilter, ) -> Option { + if filter.is_dead() || routing_domain_set.is_empty() { + return None; + } for routing_domain in routing_domain_set { let did = self.with_routing_domain(routing_domain, |rd| { for did in rd.common().dial_info_details() { @@ -133,6 +136,9 @@ impl RoutingTableInner { filter: &DialInfoFilter, ) -> Vec { let mut ret = Vec::new(); + if filter.is_dead() || routing_domain_set.is_empty() { + return ret; + } for routing_domain in routing_domain_set { self.with_routing_domain(routing_domain, |rd| { for did in rd.common().dial_info_details() { diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index f811e4aa..a23b5688 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -377,11 +377,8 @@ impl RoutingTable { // Ensure we got the signed peer info if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { - log_rtab!(warn - "bootstrap at {:?} did not return valid signed node info", - nr - ); - // If this node info is invalid, it will time out after being unpingable + log_rtab!(warn "bootstrap server is not responding"); + log_rtab!(debug "bootstrap server is not responding: {}", nr); } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now routing_table.reverse_find_node(crypto_kind, nr, true).await diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 0628ed2c..588e1936 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -121,6 +121,8 @@ async def test_routing_context_app_message_loopback_big_packets(): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: await app_message_queue.put(update) + sent_messages: set[bytes] = set() + hostname, port = server_info() api = await veilid.json_api_connect( hostname, port, app_message_queue_update_callback @@ -130,8 +132,7 @@ async def test_routing_context_app_message_loopback_big_packets(): await api.debug("purge routes") # make a routing context that uses a safety route - #rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) - rc = await (await api.new_routing_context()).with_privacy() + rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) async with rc: # make a new local private route @@ -140,17 +141,21 @@ async def test_routing_context_app_message_loopback_big_packets(): # import it as a remote route as well so we can send to it prr = await api.import_remote_private_route(blob) - # do this test 10 times - for _ in range(10): + # do this test 100 times + for _ in range(1000): # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) await rc.app_message(prr, message) - # we should get the same message back + sent_messages.add(message) + + # we should get the same messages back + for _ in range(len(sent_messages)): + update: veilid.VeilidUpdate = await asyncio.wait_for( app_message_queue.get(), timeout=10 ) - assert isinstance(update.detail, veilid.VeilidAppMessage) - assert update.detail.message == message + + assert update.detail.message in sent_messages diff --git a/veilid-python/veilid/schema/RecvMessage.json b/veilid-python/veilid/schema/RecvMessage.json index 073e0bf1..142ebd10 100644 --- a/veilid-python/veilid/schema/RecvMessage.json +++ b/veilid-python/veilid/schema/RecvMessage.json @@ -3516,6 +3516,12 @@ "format": "uint32", "minimum": 0.0 }, + "network_key_password": { + "type": [ + "string", + "null" + ] + }, "protocol": { "$ref": "#/definitions/VeilidConfigProtocol" },