more sequencing

This commit is contained in:
John Smith 2023-06-25 01:23:24 -04:00
parent 234f048241
commit 3e23f808d0
11 changed files with 137 additions and 114 deletions

View File

@ -878,6 +878,7 @@ impl NetworkManager {
data: &mut [u8], data: &mut [u8],
connection_descriptor: ConnectionDescriptor, connection_descriptor: ConnectionDescriptor,
) -> EyreResult<bool> { ) -> EyreResult<bool> {
#[cfg(feature="verbose-tracing")]
let root = span!( let root = span!(
parent: None, parent: None,
Level::TRACE, Level::TRACE,
@ -885,6 +886,7 @@ impl NetworkManager {
"data.len" = data.len(), "data.len" = data.len(),
"descriptor" = ?connection_descriptor "descriptor" = ?connection_descriptor
); );
#[cfg(feature="verbose-tracing")]
let _root_enter = root.enter(); let _root_enter = root.enter();
log_net!( log_net!(
@ -1017,6 +1019,17 @@ impl NetworkManager {
}; };
if let Some(relay_nr) = some_relay_nr { if let Some(relay_nr) = some_relay_nr {
// Force sequencing if this came in sequenced.
// The sender did the prefer/ensure calculation when it did get_contact_method,
// so we don't need to do it here.
let relay_nr = if connection_descriptor.remote().protocol_type().is_ordered() {
let mut relay_nr = relay_nr.clone();
relay_nr.set_sequencing(Sequencing::EnsureOrdered);
relay_nr
} else {
relay_nr
};
// Relay the packet to the desired destination // Relay the packet to the desired destination
log_net!("relaying {} bytes to {}", data.len(), relay_nr); log_net!("relaying {} bytes to {}", data.len(), relay_nr);

View File

@ -342,21 +342,24 @@ impl NetworkManager {
ContactMethod::Existing => NodeContactMethod::Existing, ContactMethod::Existing => NodeContactMethod::Existing,
ContactMethod::Direct(di) => NodeContactMethod::Direct(di), ContactMethod::Direct(di) => NodeContactMethod::Direct(di),
ContactMethod::SignalReverse(relay_key, target_key) => { ContactMethod::SignalReverse(relay_key, target_key) => {
let relay_nr = routing_table let mut relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?; .ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) { if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); bail!("signalreverse target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
} }
relay_nr.set_sequencing(sequencing);
NodeContactMethod::SignalReverse(relay_nr, target_node_ref) NodeContactMethod::SignalReverse(relay_nr, target_node_ref)
} }
ContactMethod::SignalHolePunch(relay_key, target_key) => { ContactMethod::SignalHolePunch(relay_key, target_key) => {
let relay_nr = routing_table let mut relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?; .ok_or_else(|| eyre!("couldn't look up relay"))?;
if !target_node_ref.node_ids().contains(&target_key) { if !target_node_ref.node_ids().contains(&target_key) {
bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key ); bail!("signalholepunch target noderef didn't match target key: {:?} != {} for relay {}", target_node_ref, target_key, relay_key );
} }
relay_nr.set_sequencing(sequencing);
// if any other protocol were possible here we could update this and do_hole_punch // if any other protocol were possible here we could update this and do_hole_punch
// but tcp hole punch is very very unreliable it seems // but tcp hole punch is very very unreliable it seems
let udp_target_node_ref = target_node_ref let udp_target_node_ref = target_node_ref
@ -365,15 +368,17 @@ impl NetworkManager {
NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref) NodeContactMethod::SignalHolePunch(relay_nr, udp_target_node_ref)
} }
ContactMethod::InboundRelay(relay_key) => { ContactMethod::InboundRelay(relay_key) => {
let relay_nr = routing_table let mut relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?; .ok_or_else(|| eyre!("couldn't look up relay"))?;
relay_nr.set_sequencing(sequencing);
NodeContactMethod::InboundRelay(relay_nr) NodeContactMethod::InboundRelay(relay_nr)
} }
ContactMethod::OutboundRelay(relay_key) => { ContactMethod::OutboundRelay(relay_key) => {
let relay_nr = routing_table let mut relay_nr = routing_table
.lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)? .lookup_and_filter_noderef(relay_key, routing_domain.into(), dial_info_filter)?
.ok_or_else(|| eyre!("couldn't look up relay"))?; .ok_or_else(|| eyre!("couldn't look up relay"))?;
relay_nr.set_sequencing(sequencing);
NodeContactMethod::OutboundRelay(relay_nr) NodeContactMethod::OutboundRelay(relay_nr)
} }
}; };

View File

@ -30,9 +30,7 @@ pub struct ConnectionDescriptor {
impl ConnectionDescriptor { impl ConnectionDescriptor {
pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { pub fn new(remote: PeerAddress, local: SocketAddress) -> Self {
assert!( assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified());
!remote.protocol_type().is_connection_oriented() || !local.address().is_unspecified()
);
Self { Self {
remote, remote,

View File

@ -25,7 +25,7 @@ pub enum ProtocolType {
} }
impl ProtocolType { impl ProtocolType {
pub fn is_connection_oriented(&self) -> bool { pub fn is_ordered(&self) -> bool {
matches!( matches!(
self, self,
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS

View File

@ -441,7 +441,7 @@ impl BucketEntryInner {
// Check if the connection is still considered live // Check if the connection is still considered live
let alive = let alive =
// Should we check the connection table? // Should we check the connection table?
if v.0.protocol_type().is_connection_oriented() { if v.0.protocol_type().is_ordered() {
// Look the connection up in the connection manager and see if it's still there // Look the connection up in the connection manager and see if it's still there
connection_manager.get_connection(v.0).is_some() connection_manager.get_connection(v.0).is_some()
} else { } else {

View File

@ -1144,100 +1144,6 @@ impl RoutingTable {
} }
} }
} }
pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool {
// Get all our outbound protocol/address types
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_low_level_port_info();
move |e: &BucketEntryInner| {
// Ensure this node is not on the local network
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
return false;
}
// Disqualify nodes that don't cover all our inbound ports for tcp and udp
// as we need to be able to use the relay for keepalives for all nat mappings
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
let can_serve_as_relay = e
.node_info(RoutingDomain::PublicInternet)
.map(|n| {
let dids = n.all_filtered_dial_info_details(
Some(DialInfoDetail::ordered_sequencing_sort), // By default, choose connection-oriented protocol for relay
|did| did.matches_filter(&outbound_dif),
);
for did in &dids {
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at))
{
low_level_protocol_ports.remove(&(*llpt, at, *port));
}
}
low_level_protocol_ports.is_empty()
})
.unwrap_or(false);
if !can_serve_as_relay {
return false;
}
true
}
}
#[instrument(level = "trace", skip(self), ret)]
pub fn find_inbound_relay(
&self,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
) -> Option<NodeRef> {
// Get relay filter function
let relay_node_filter = match routing_domain {
RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(),
RoutingDomain::LocalNetwork => {
unimplemented!();
}
};
// Go through all entries and find fastest entry that matches filter function
let inner = self.inner.read();
let inner = &*inner;
let mut best_inbound_relay: Option<Arc<BucketEntry>> = None;
// Iterate all known nodes for candidates
inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |rti, e| {
// Ensure we have the node's status
if let Some(node_status) = e.node_status(routing_domain) {
// Ensure the node will relay
if node_status.will_relay() {
// Compare against previous candidate
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
// Less is faster
let better = best_inbound_relay.with(rti, |_rti, best| {
// choose low latency stability for relays
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best)
== std::cmp::Ordering::Less
});
// Now apply filter function and see if this node should be included
if better && relay_node_filter(e) {
*best_inbound_relay = entry2;
}
} else if relay_node_filter(e) {
// Always store the first candidate
best_inbound_relay = Some(entry2);
}
}
}
});
// Don't end early, iterate through all entries
Option::<()>::None
});
// Return the best inbound relay noderef
best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None))
}
} }
impl core::ops::Deref for RoutingTable { impl core::ops::Deref for RoutingTable {

View File

@ -81,4 +81,97 @@ impl RoutingTable {
Ok(()) Ok(())
} }
pub fn make_public_internet_relay_node_filter(&self) -> impl Fn(&BucketEntryInner) -> bool {
// Get all our outbound protocol/address types
let outbound_dif = self.get_outbound_dial_info_filter(RoutingDomain::PublicInternet);
let mapped_port_info = self.get_low_level_port_info();
move |e: &BucketEntryInner| {
// Ensure this node is not on the local network
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
return false;
}
// Disqualify nodes that don't cover all our inbound ports for tcp and udp
// as we need to be able to use the relay for keepalives for all nat mappings
let mut low_level_protocol_ports = mapped_port_info.low_level_protocol_ports.clone();
let can_serve_as_relay = e
.node_info(RoutingDomain::PublicInternet)
.map(|n| {
let dids = n.all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |did| {
did.matches_filter(&outbound_dif)
});
for did in &dids {
let pt = did.dial_info.protocol_type();
let at = did.dial_info.address_type();
if let Some((llpt, port)) = mapped_port_info.protocol_to_port.get(&(pt, at))
{
low_level_protocol_ports.remove(&(*llpt, at, *port));
}
}
low_level_protocol_ports.is_empty()
})
.unwrap_or(false);
if !can_serve_as_relay {
return false;
}
true
}
}
#[instrument(level = "trace", skip(self), ret)]
pub fn find_inbound_relay(
&self,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
) -> Option<NodeRef> {
// Get relay filter function
let relay_node_filter = match routing_domain {
RoutingDomain::PublicInternet => self.make_public_internet_relay_node_filter(),
RoutingDomain::LocalNetwork => {
unimplemented!();
}
};
// Go through all entries and find fastest entry that matches filter function
let inner = self.inner.read();
let inner = &*inner;
let mut best_inbound_relay: Option<Arc<BucketEntry>> = None;
// Iterate all known nodes for candidates
inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| {
let entry2 = entry.clone();
entry.with(rti, |rti, e| {
// Ensure we have the node's status
if let Some(node_status) = e.node_status(routing_domain) {
// Ensure the node will relay
if node_status.will_relay() {
// Compare against previous candidate
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
// Less is faster
let better = best_inbound_relay.with(rti, |_rti, best| {
// choose low latency stability for relays
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best)
== std::cmp::Ordering::Less
});
// Now apply filter function and see if this node should be included
if better && relay_node_filter(e) {
*best_inbound_relay = entry2;
}
} else if relay_node_filter(e) {
// Always store the first candidate
best_inbound_relay = Some(entry2);
}
}
}
});
// Don't end early, iterate through all entries
Option::<()>::None
});
// Return the best inbound relay noderef
best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None))
}
} }

View File

@ -29,13 +29,11 @@ impl MatchesDialInfoFilter for DialInfoDetail {
impl DialInfoDetail { impl DialInfoDetail {
pub fn ordered_sequencing_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering { pub fn ordered_sequencing_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering {
if a.class < b.class { let c = DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info);
return core::cmp::Ordering::Less; if c != core::cmp::Ordering::Equal {
return c;
} }
if a.class > b.class { a.class.cmp(&b.class)
return core::cmp::Ordering::Greater;
}
DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info)
} }
pub const NO_SORT: std::option::Option< pub const NO_SORT: std::option::Option<
for<'r, 's> fn(&'r DialInfoDetail, &'s DialInfoDetail) -> std::cmp::Ordering, for<'r, 's> fn(&'r DialInfoDetail, &'s DialInfoDetail) -> std::cmp::Ordering,

View File

@ -71,7 +71,7 @@ impl SignedNodeInfo {
match sequencing { match sequencing {
Sequencing::NoPreference | Sequencing::PreferOrdered => return true, Sequencing::NoPreference | Sequencing::PreferOrdered => return true,
Sequencing::EnsureOrdered => { Sequencing::EnsureOrdered => {
if did.dial_info.protocol_type().is_connection_oriented() { if did.dial_info.protocol_type().is_ordered() {
return true; return true;
} }
} }
@ -85,7 +85,7 @@ impl SignedNodeInfo {
match sequencing { match sequencing {
Sequencing::NoPreference | Sequencing::PreferOrdered => return true, Sequencing::NoPreference | Sequencing::PreferOrdered => return true,
Sequencing::EnsureOrdered => { Sequencing::EnsureOrdered => {
if did.dial_info.protocol_type().is_connection_oriented() { if did.dial_info.protocol_type().is_ordered() {
return true; return true;
} }
} }

View File

@ -728,6 +728,10 @@ impl RPCProcessor {
if sequencing > node_ref.sequencing() { if sequencing > node_ref.sequencing() {
node_ref.set_sequencing(sequencing) node_ref.set_sequencing(sequencing)
} }
let mut destination_node_ref = destination_node_ref.clone();
if sequencing > destination_node_ref.sequencing() {
destination_node_ref.set_sequencing(sequencing)
}
// Reply private route should be None here, even for questions // Reply private route should be None here, even for questions
assert!(reply_private_route.is_none()); assert!(reply_private_route.is_none());

View File

@ -117,8 +117,13 @@ async def test_routing_context_app_message_loopback_big_packets():
app_message_queue: asyncio.Queue = asyncio.Queue() app_message_queue: asyncio.Queue = asyncio.Queue()
global got_message
got_message = 0
async def app_message_queue_update_callback(update: veilid.VeilidUpdate): async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
global got_message
got_message += 1
print("got {}".format(got_message))
await app_message_queue.put(update) await app_message_queue.put(update)
sent_messages: set[bytes] = set() sent_messages: set[bytes] = set()
@ -142,7 +147,7 @@ async def test_routing_context_app_message_loopback_big_packets():
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
# do this test 100 times # do this test 100 times
for _ in range(1000): for _ in range(100):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
@ -151,8 +156,9 @@ async def test_routing_context_app_message_loopback_big_packets():
sent_messages.add(message) sent_messages.add(message)
# we should get the same messages back # we should get the same messages back
for _ in range(len(sent_messages)): print(len(sent_messages))
for n in range(len(sent_messages)):
print(n)
update: veilid.VeilidUpdate = await asyncio.wait_for( update: veilid.VeilidUpdate = await asyncio.wait_for(
app_message_queue.get(), timeout=10 app_message_queue.get(), timeout=10
) )