reliability work

This commit is contained in:
John Smith 2023-06-24 11:16:34 -04:00
parent acebcb7947
commit 197b7fef6e
14 changed files with 191 additions and 84 deletions

View File

@ -1122,6 +1122,7 @@ impl NetworkManager {
// or other firewalling issues and may perform better with TCP. // or other firewalling issues and may perform better with TCP.
let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2; let unreliable = target_node_ref.peer_stats().rpc_stats.failed_to_send > 2 || target_node_ref.peer_stats().rpc_stats.recent_lost_answers > 2;
if unreliable && sequencing < Sequencing::PreferOrdered { if unreliable && sequencing < Sequencing::PreferOrdered {
log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan());
sequencing = Sequencing::PreferOrdered; sequencing = Sequencing::PreferOrdered;
} }

View File

@ -491,13 +491,9 @@ impl DialInfo {
} }
pub fn ordered_sequencing_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering { pub fn ordered_sequencing_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering {
let ca = a.protocol_type().sort_order(Sequencing::EnsureOrdered); let s = ProtocolType::ordered_sequencing_sort(a.protocol_type(), b.protocol_type());
let cb = b.protocol_type().sort_order(Sequencing::EnsureOrdered); if s != core::cmp::Ordering::Equal {
if ca < cb { return s;
return core::cmp::Ordering::Less;
}
if ca > cb {
return core::cmp::Ordering::Greater;
} }
match (a, b) { match (a, b) {
(DialInfo::UDP(a), DialInfo::UDP(b)) => a.cmp(b), (DialInfo::UDP(a), DialInfo::UDP(b)) => a.cmp(b),

View File

@ -61,6 +61,20 @@ impl DialInfoFilter {
pub fn is_dead(&self) -> bool { pub fn is_dead(&self) -> bool {
self.protocol_type_set.is_empty() || self.address_type_set.is_empty() self.protocol_type_set.is_empty() || self.address_type_set.is_empty()
} }
/// Applies a sequencing requirement to this filter.
///
/// Returns `(ordered, filter)`:
/// * `ordered` is `false` for `Sequencing::NoPreference` and `true` for
///   `Sequencing::PreferOrdered` / `Sequencing::EnsureOrdered`; callers use it
///   to decide whether to apply an ordered-protocol sort.
/// * `filter` is `self` unchanged, except for `Sequencing::EnsureOrdered`,
///   where it is additionally passed through `filtered()` with a filter whose
///   protocol set is `ProtocolType::all_ordered_set()`.
///
/// NOTE(review): after the `EnsureOrdered` restriction the result may be dead
/// (presumably `filtered()` intersects the sets); callers should check
/// `is_dead()` before using it.
pub fn with_sequencing(self, sequencing: Sequencing) -> (bool, DialInfoFilter) {
    match sequencing {
        // No ordering requested: no sort, filter untouched
        Sequencing::NoPreference => (false, self),
        // Ordering preferred: sort ordered protocols first, but keep all protocols
        Sequencing::PreferOrdered => (true, self),
        // Ordering required: sort and also restrict to the ordered protocols
        Sequencing::EnsureOrdered => {
            let ordered_only =
                DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set());
            (true, self.filtered(&ordered_only))
        }
    }
}
} }
impl fmt::Debug for DialInfoFilter { impl fmt::Debug for DialInfoFilter {
@ -80,7 +94,24 @@ impl fmt::Debug for DialInfoFilter {
} }
} }
/// Builds a filter from a single protocol type: the protocol set is converted
/// from `protocol_type`, and the address set is `AddressTypeSet::all()`.
impl From<ProtocolType> for DialInfoFilter {
    fn from(protocol_type: ProtocolType) -> Self {
        let protocol_type_set = ProtocolTypeSet::from(protocol_type);
        let address_type_set = AddressTypeSet::all();
        Self {
            protocol_type_set,
            address_type_set,
        }
    }
}
/// Builds a filter from a single address type: the protocol set is
/// `ProtocolTypeSet::all()`, and the address set is converted from `address_type`.
impl From<AddressType> for DialInfoFilter {
    fn from(address_type: AddressType) -> Self {
        let protocol_type_set = ProtocolTypeSet::all();
        let address_type_set = AddressTypeSet::from(address_type);
        Self {
            protocol_type_set,
            address_type_set,
        }
    }
}
pub trait MatchesDialInfoFilter { pub trait MatchesDialInfoFilter {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool; fn matches_filter(&self, filter: &DialInfoFilter) -> bool;
} }

View File

@ -72,6 +72,18 @@ impl ProtocolType {
pub fn all_ordered_set() -> ProtocolTypeSet { pub fn all_ordered_set() -> ProtocolTypeSet {
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS
} }
/// Compares two protocol types by their `sort_order` under
/// `Sequencing::EnsureOrdered`.
///
/// Returns `Less` when `a` ranks before `b`, `Greater` when it ranks after,
/// and `Equal` otherwise.
pub fn ordered_sequencing_sort(a: Self, b: Self) -> core::cmp::Ordering {
    let rank_a = a.sort_order(Sequencing::EnsureOrdered);
    let rank_b = b.sort_order(Sequencing::EnsureOrdered);
    if rank_a < rank_b {
        core::cmp::Ordering::Less
    } else if rank_a > rank_b {
        core::cmp::Ordering::Greater
    } else {
        core::cmp::Ordering::Equal
    }
}
} }
impl fmt::Display for ProtocolType { impl fmt::Display for ProtocolType {

View File

@ -316,7 +316,8 @@ impl BucketEntryInner {
let last_connections = self.last_connections( let last_connections = self.last_connections(
rti, rti,
true, true,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)), NodeRefFilter::from(routing_domain),
false,
); );
!last_connections.is_empty() !last_connections.is_empty()
} }
@ -370,7 +371,8 @@ impl BucketEntryInner {
let last_connections = self.last_connections( let last_connections = self.last_connections(
rti, rti,
true, true,
Some(NodeRefFilter::new().with_routing_domain_set(routing_domain_set)), NodeRefFilter::from(routing_domain_set),
false
); );
for lc in last_connections { for lc in last_connections {
if let Some(rd) = if let Some(rd) =
@ -412,7 +414,8 @@ impl BucketEntryInner {
&self, &self,
rti: &RoutingTableInner, rti: &RoutingTableInner,
only_live: bool, only_live: bool,
filter: Option<NodeRefFilter>, filter: NodeRefFilter,
ordered: bool,
) -> Vec<(ConnectionDescriptor, Timestamp)> { ) -> Vec<(ConnectionDescriptor, Timestamp)> {
let connection_manager = let connection_manager =
rti.unlocked_inner.network_manager.connection_manager(); rti.unlocked_inner.network_manager.connection_manager();
@ -421,26 +424,13 @@ impl BucketEntryInner {
.last_connections .last_connections
.iter() .iter()
.filter_map(|(k, v)| { .filter_map(|(k, v)| {
let include = if let Some(filter) = &filter { let include = {
let remote_address = v.0.remote_address().address(); let remote_address = v.0.remote_address().address();
if let Some(routing_domain) = rti.routing_domain_for_address(remote_address) { rti.routing_domain_for_address(remote_address).map(|rd| {
if filter.routing_domain_set.contains(routing_domain) filter.routing_domain_set.contains(rd)
&& filter.dial_info_filter.protocol_type_set.contains(k.0) && filter.dial_info_filter.protocol_type_set.contains(k.0)
&& filter.dial_info_filter.address_type_set.contains(k.1) && filter.dial_info_filter.address_type_set.contains(k.1)
{ }).unwrap_or(false)
// matches filter
true
} else {
// does not match filter
false
}
} else {
// no valid routing domain
false
}
} else {
// no filter
true
}; };
if !include { if !include {
@ -471,8 +461,16 @@ impl BucketEntryInner {
} }
}) })
.collect(); .collect();
// Sort with newest timestamps first // Sort with ordering preference first and then sort with newest timestamps
out.sort_by(|a, b| b.1.cmp(&a.1)); out.sort_by(|a, b| {
if ordered {
let s = ProtocolType::ordered_sequencing_sort(a.0.protocol_type(), b.0.protocol_type());
if s != core::cmp::Ordering::Equal {
return s;
}
}
b.1.cmp(&a.1)
});
out out
} }

View File

@ -217,25 +217,24 @@ pub trait NodeRefBase: Sized {
fn first_filtered_dial_info_detail(&self) -> Option<DialInfoDetail> { fn first_filtered_dial_info_detail(&self) -> Option<DialInfoDetail> {
let routing_domain_set = self.routing_domain_set(); let routing_domain_set = self.routing_domain_set();
let dial_info_filter = self.dial_info_filter(); let dial_info_filter = self.dial_info_filter();
let sequencing = self.common().sequencing;
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
let (sort, dial_info_filter) = match self.common().sequencing { let sort = if ordered {
Sequencing::NoPreference => (None, dial_info_filter), Some(DialInfoDetail::ordered_sequencing_sort)
Sequencing::PreferOrdered => ( } else {
Some(DialInfoDetail::ordered_sequencing_sort), None
dial_info_filter,
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
}; };
if dial_info_filter.is_dead() {
return None;
}
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
self.operate(|_rt, e| { self.operate(|_rt, e| {
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
if let Some(ni) = e.node_info(routing_domain) { if let Some(ni) = e.node_info(routing_domain) {
let filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) { if let Some(did) = ni.first_filtered_dial_info_detail(sort, filter) {
return Some(did); return Some(did);
} }
@ -280,9 +279,13 @@ pub trait NodeRefBase: Sized {
fn last_connection(&self) -> Option<ConnectionDescriptor> { fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connections and the last time we saw anything with this connection // Get the last connections and the last time we saw anything with this connection
// Filtered first and then sorted by most recent // Filtered first and then sorted by sequencing and then by most recent
self.operate(|rti, e| { self.operate(|rti, e| {
let last_connections = e.last_connections(rti, true, self.common().filter.clone()); // apply sequencing to filter and get sort
let sequencing = self.common().sequencing;
let filter = self.common().filter.clone().unwrap_or_default();
let (ordered, filter) = filter.with_sequencing(sequencing);
let last_connections = e.last_connections(rti, true, filter, ordered);
last_connections.first().map(|x| x.0) last_connections.first().map(|x| x.0)
}) })
} }

View File

@ -19,7 +19,6 @@ impl NodeRefFilter {
dial_info_filter: DialInfoFilter::all(), dial_info_filter: DialInfoFilter::all(),
} }
} }
pub fn with_routing_domain(mut self, routing_domain: RoutingDomain) -> Self { pub fn with_routing_domain(mut self, routing_domain: RoutingDomain) -> Self {
self.routing_domain_set = routing_domain.into(); self.routing_domain_set = routing_domain.into();
self self
@ -58,4 +57,54 @@ impl NodeRefFilter {
pub fn is_dead(&self) -> bool { pub fn is_dead(&self) -> bool {
self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty() self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty()
} }
pub fn with_sequencing(mut self, sequencing: Sequencing) -> (bool, Self) {
let (ordered, dif) = self.dial_info_filter.with_sequencing(sequencing);
self.dial_info_filter = dif;
(ordered, self)
}
}
/// Builds a node ref filter from a single routing domain: the routing domain
/// set is converted from `routing_domain`, and the dial info filter is
/// `DialInfoFilter::all()`.
impl From<RoutingDomain> for NodeRefFilter {
    fn from(routing_domain: RoutingDomain) -> Self {
        let routing_domain_set = routing_domain.into();
        let dial_info_filter = DialInfoFilter::all();
        Self {
            routing_domain_set,
            dial_info_filter,
        }
    }
}
/// Builds a node ref filter from a routing domain set, used as-is, with the
/// dial info filter set to `DialInfoFilter::all()`.
impl From<RoutingDomainSet> for NodeRefFilter {
    fn from(routing_domain_set: RoutingDomainSet) -> Self {
        let dial_info_filter = DialInfoFilter::all();
        Self {
            routing_domain_set,
            dial_info_filter,
        }
    }
}
/// Builds a node ref filter from a dial info filter, used as-is, with the
/// routing domain set to `RoutingDomainSet::all()`.
impl From<DialInfoFilter> for NodeRefFilter {
    fn from(dial_info_filter: DialInfoFilter) -> Self {
        let routing_domain_set = RoutingDomainSet::all();
        Self {
            routing_domain_set,
            dial_info_filter,
        }
    }
}
impl From<ProtocolType> for NodeRefFilter {
fn from(other: ProtocolType) -> Self {
Self {
routing_domain_set: RoutingDomainSet::all(),
dial_info_filter: DialInfoFilter::from(other),
}
}
}
impl From<AddressType> for NodeRefFilter {
fn from(other: AddressType) -> Self {
Self {
routing_domain_set: RoutingDomainSet::all(),
dial_info_filter: DialInfoFilter::from(other),
}
}
} }

View File

@ -616,8 +616,8 @@ impl RouteSpecStore {
let private_route = self.assemble_private_route(&key, None)?; let private_route = self.assemble_private_route(&key, None)?;
// Always test routes with safety routes that are more likely to succeed // Always test routes with safety routes that are more likely to succeed
let stability = Stability::Reliable; let stability = Stability::Reliable;
// Routes can test with whatever sequencing they were allocated with // Routes should test with the most likely to succeed sequencing they are capable of
let sequencing = Sequencing::NoPreference; let sequencing = Sequencing::PreferOrdered;
let safety_spec = SafetySpec { let safety_spec = SafetySpec {
preferred_route: Some(private_route_id), preferred_route: Some(private_route_id),
@ -657,12 +657,17 @@ impl RouteSpecStore {
bail!("no best key to test remote route"); bail!("no best key to test remote route");
}; };
// Always test routes with safety routes that are more likely to succeed
let stability = Stability::Reliable;
// Routes should test with the most likely to succeed sequencing they are capable of
let sequencing = Sequencing::PreferOrdered;
// Get a safety route that is good enough // Get a safety route that is good enough
let safety_spec = SafetySpec { let safety_spec = SafetySpec {
preferred_route: None, preferred_route: None,
hop_count: self.unlocked_inner.default_route_hop_count, hop_count: self.unlocked_inner.default_route_hop_count,
stability: Stability::default(), stability,
sequencing: Sequencing::default(), sequencing,
}; };
let safety_selection = SafetySelection::Safe(safety_spec); let safety_selection = SafetySelection::Safe(safety_spec);

View File

@ -61,6 +61,9 @@ impl RouteStats {
pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) { pub fn record_sent(&mut self, cur_ts: Timestamp, bytes: ByteCount) {
self.last_sent_ts = Some(cur_ts); self.last_sent_ts = Some(cur_ts);
self.transfer_stats_accounting.add_up(bytes); self.transfer_stats_accounting.add_up(bytes);
// If we sent successfully, then reset 'failed_to_send'
self.failed_to_send = 0;
} }
/// Mark a route as having been sent to /// Mark a route as having been sent to
@ -101,6 +104,8 @@ impl RouteStats {
self.last_tested_ts = None; self.last_tested_ts = None;
self.last_sent_ts = None; self.last_sent_ts = None;
self.last_received_ts = None; self.last_received_ts = None;
self.failed_to_send = 0;
self.questions_lost = 0;
} }
/// Check if a route needs testing /// Check if a route needs testing

View File

@ -223,7 +223,7 @@ impl Default for PublicInternetRoutingDomainDetail {
} }
} }
fn first_filtered_dial_info_detail( fn first_filtered_dial_info_detail_between_nodes(
from_node: &NodeInfo, from_node: &NodeInfo,
to_node: &NodeInfo, to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter, dial_info_filter: &DialInfoFilter,
@ -235,28 +235,21 @@ fn first_filtered_dial_info_detail(
.with_protocol_type_set(from_node.outbound_protocols()), .with_protocol_type_set(from_node.outbound_protocols()),
); );
// Get first filtered dialinfo // Apply sequencing and get sort
let (sort, dial_info_filter) = match sequencing { let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
Sequencing::NoPreference => (None, dial_info_filter), let sort = if ordered {
Sequencing::PreferOrdered => ( Some(DialInfoDetail::ordered_sequencing_sort)
Some(DialInfoDetail::ordered_sequencing_sort), } else {
dial_info_filter, None
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
}; };
// If the filter is dead then we won't be able to connect // If the filter is dead then we won't be able to connect
if dial_info_filter.is_dead() { if dial_info_filter.is_dead() {
return None; return None;
} }
let direct_filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
// Get the best match dial info for node B if we have it // Get the best match dial info for node B if we have it
let direct_filter = |did: &DialInfoDetail| did.matches_filter(&dial_info_filter);
to_node.first_filtered_dial_info_detail(sort, direct_filter) to_node.first_filtered_dial_info_detail(sort, direct_filter)
} }
@ -294,7 +287,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// Get the best match dial info for node B if we have it // Get the best match dial info for node B if we have it
if let Some(target_did) = if let Some(target_did) =
first_filtered_dial_info_detail(node_a, node_b, &dial_info_filter, sequencing) first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing)
{ {
// Do we need to signal before going inbound? // Do we need to signal before going inbound?
if !target_did.class.requires_signal() { if !target_did.class.requires_signal() {
@ -319,7 +312,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}; };
// Can node A reach the inbound relay directly? // Can node A reach the inbound relay directly?
if first_filtered_dial_info_detail( if first_filtered_dial_info_detail_between_nodes(
node_a, node_a,
node_b_relay, node_b_relay,
&dial_info_filter, &dial_info_filter,
@ -332,7 +325,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
///////// Reverse connection ///////// Reverse connection
// Get the best match dial info for an reverse inbound connection from node B to node A // Get the best match dial info for an reverse inbound connection from node B to node A
if let Some(reverse_did) = first_filtered_dial_info_detail( if let Some(reverse_did) = first_filtered_dial_info_detail_between_nodes(
node_b, node_b,
node_a, node_a,
&dial_info_filter, &dial_info_filter,
@ -358,14 +351,14 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
let udp_dial_info_filter = dial_info_filter let udp_dial_info_filter = dial_info_filter
.clone() .clone()
.filtered(&DialInfoFilter::all().with_protocol_type(ProtocolType::UDP)); .filtered(&DialInfoFilter::all().with_protocol_type(ProtocolType::UDP));
if let Some(target_udp_did) = first_filtered_dial_info_detail( if let Some(target_udp_did) = first_filtered_dial_info_detail_between_nodes(
node_a, node_a,
node_b, node_b,
&udp_dial_info_filter, &udp_dial_info_filter,
sequencing, sequencing,
) { ) {
// Does node A have a direct udp dialinfo that node B can reach? // Does node A have a direct udp dialinfo that node B can reach?
if let Some(reverse_udp_did) = first_filtered_dial_info_detail( if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
node_b, node_b,
node_a, node_a,
&udp_dial_info_filter, &udp_dial_info_filter,
@ -407,7 +400,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}; };
// Can we reach the full relay? // Can we reach the full relay?
if first_filtered_dial_info_detail( if first_filtered_dial_info_detail_between_nodes(
node_a, node_a,
&node_b_relay, &node_b_relay,
&dial_info_filter, &dial_info_filter,

View File

@ -111,6 +111,9 @@ impl RoutingTableInner {
routing_domain_set: RoutingDomainSet, routing_domain_set: RoutingDomainSet,
filter: &DialInfoFilter, filter: &DialInfoFilter,
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
if filter.is_dead() || routing_domain_set.is_empty() {
return None;
}
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
let did = self.with_routing_domain(routing_domain, |rd| { let did = self.with_routing_domain(routing_domain, |rd| {
for did in rd.common().dial_info_details() { for did in rd.common().dial_info_details() {
@ -133,6 +136,9 @@ impl RoutingTableInner {
filter: &DialInfoFilter, filter: &DialInfoFilter,
) -> Vec<DialInfoDetail> { ) -> Vec<DialInfoDetail> {
let mut ret = Vec::new(); let mut ret = Vec::new();
if filter.is_dead() || routing_domain_set.is_empty() {
return ret;
}
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
self.with_routing_domain(routing_domain, |rd| { self.with_routing_domain(routing_domain, |rd| {
for did in rd.common().dial_info_details() { for did in rd.common().dial_info_details() {

View File

@ -377,11 +377,8 @@ impl RoutingTable {
// Ensure we got the signed peer info // Ensure we got the signed peer info
if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { if !nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) {
log_rtab!(warn log_rtab!(warn "bootstrap server is not responding");
"bootstrap at {:?} did not return valid signed node info", log_rtab!(debug "bootstrap server is not responding: {}", nr);
nr
);
// If this node info is invalid, it will time out after being unpingable
} else { } else {
// otherwise this bootstrap is valid, lets ask it to find ourselves now // otherwise this bootstrap is valid, lets ask it to find ourselves now
routing_table.reverse_find_node(crypto_kind, nr, true).await routing_table.reverse_find_node(crypto_kind, nr, true).await

View File

@ -121,6 +121,8 @@ async def test_routing_context_app_message_loopback_big_packets():
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(update) await app_message_queue.put(update)
sent_messages: set[bytes] = set()
hostname, port = server_info() hostname, port = server_info()
api = await veilid.json_api_connect( api = await veilid.json_api_connect(
hostname, port, app_message_queue_update_callback hostname, port, app_message_queue_update_callback
@ -130,8 +132,7 @@ async def test_routing_context_app_message_loopback_big_packets():
await api.debug("purge routes") await api.debug("purge routes")
# make a routing context that uses a safety route # make a routing context that uses a safety route
#rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
rc = await (await api.new_routing_context()).with_privacy()
async with rc: async with rc:
# make a new local private route # make a new local private route
@ -140,17 +141,21 @@ async def test_routing_context_app_message_loopback_big_packets():
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
# do this test 10 times # do this test 1000 times
for _ in range(10): for _ in range(1000):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
await rc.app_message(prr, message) await rc.app_message(prr, message)
# we should get the same message back sent_messages.add(message)
# we should get the same messages back
for _ in range(len(sent_messages)):
update: veilid.VeilidUpdate = await asyncio.wait_for( update: veilid.VeilidUpdate = await asyncio.wait_for(
app_message_queue.get(), timeout=10 app_message_queue.get(), timeout=10
) )
assert isinstance(update.detail, veilid.VeilidAppMessage) assert isinstance(update.detail, veilid.VeilidAppMessage)
assert update.detail.message == message
assert update.detail.message in sent_messages

View File

@ -3516,6 +3516,12 @@
"format": "uint32", "format": "uint32",
"minimum": 0.0 "minimum": 0.0
}, },
"network_key_password": {
"type": [
"string",
"null"
]
},
"protocol": { "protocol": {
"$ref": "#/definitions/VeilidConfigProtocol" "$ref": "#/definitions/VeilidConfigProtocol"
}, },