more route work

This commit is contained in:
John Smith 2022-10-21 10:35:03 -04:00
parent c8ba88fb99
commit a1b40c79f1
8 changed files with 207 additions and 97 deletions

View File

@ -1182,7 +1182,7 @@ impl NetworkManager {
// Dial info filter comes from the target node ref
let dial_info_filter = target_node_ref.dial_info_filter();
let reliable = target_node_ref.reliable();
let sequencing = target_node_ref.sequencing();
let cm = routing_table.get_contact_method(
routing_domain,
@ -1191,7 +1191,7 @@ impl NetworkManager {
&node_b_id,
&node_b,
dial_info_filter,
reliable,
sequencing,
);
// Translate the raw contact method to a referenced contact method

View File

@ -263,7 +263,7 @@ impl RoutingTable {
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
sequencing: Sequencing,
) -> ContactMethod {
self.inner.read().get_contact_method(
routing_domain,
@ -272,7 +272,7 @@ impl RoutingTable {
node_b_id,
node_b,
dial_info_filter,
reliable,
sequencing,
)
}
@ -836,7 +836,7 @@ impl RoutingTable {
.node_info(RoutingDomain::PublicInternet)
.map(|n| {
let dids = n.all_filtered_dial_info_details(
Some(DialInfoDetail::reliable_sort), // By default, choose reliable protocol for relay
Some(DialInfoDetail::ordered_sequencing_sort), // By default, choose connection-oriented protocol for relay
|did| did.matches_filter(&outbound_dif),
);
for did in &dids {
@ -889,6 +889,7 @@ impl RoutingTable {
if let Some(best_inbound_relay) = best_inbound_relay.as_mut() {
// Less is faster
let better = best_inbound_relay.1.with(rti, |_rti, best| {
// choose low latency stability for relays
BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best)
== std::cmp::Ordering::Less
});

View File

@ -71,7 +71,7 @@ pub struct NodeRef {
node_id: DHTKey,
entry: Arc<BucketEntry>,
filter: Option<NodeRefFilter>,
reliable: bool,
sequencing: Sequencing,
#[cfg(feature = "tracking")]
track_id: usize,
}
@ -90,7 +90,7 @@ impl NodeRef {
node_id,
entry,
filter,
reliable: false,
sequencing: Sequencing::NoPreference,
#[cfg(feature = "tracking")]
track_id: entry.track(),
}
@ -127,11 +127,11 @@ impl NodeRef {
self.filter = filter
}
pub fn set_reliable(&mut self) {
self.reliable = true;
pub fn set_sequencing(&mut self, sequencing: Sequencing) {
self.sequencing = sequencing;
}
pub fn reliable(&self) -> bool {
self.reliable
pub fn sequencing(&self) -> Sequencing {
self.sequencing
}
pub fn merge_filter(&mut self, filter: NodeRefFilter) {
@ -278,10 +278,18 @@ impl NodeRef {
let routing_domain_set = self.routing_domain_set();
let dial_info_filter = self.dial_info_filter();
let sort = if self.reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
let (sort, dial_info_filter) = match self.sequencing {
Sequencing::NoPreference => (None, dial_info_filter),
Sequencing::PreferOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter,
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
};
self.operate(|_rt, e| {
@ -301,10 +309,18 @@ impl NodeRef {
let routing_domain_set = self.routing_domain_set();
let dial_info_filter = self.dial_info_filter();
let sort = if self.reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
let (sort, dial_info_filter) = match self.sequencing {
Sequencing::NoPreference => (None, dial_info_filter),
Sequencing::PreferOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter,
),
Sequencing::EnsureOrdered => (
Some(DialInfoDetail::ordered_sequencing_sort),
dial_info_filter.filtered(
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()),
),
),
};
let mut out = Vec::new();
@ -418,20 +434,20 @@ impl Clone for NodeRef {
node_id: self.node_id,
entry: self.entry.clone(),
filter: self.filter.clone(),
reliable: self.reliable,
sequencing: self.sequencing,
#[cfg(feature = "tracking")]
track_id: e.track(),
}
}
}
impl PartialEq for NodeRef {
fn eq(&self, other: &Self) -> bool {
self.node_id == other.node_id
}
}
// impl PartialEq for NodeRef {
// fn eq(&self, other: &Self) -> bool {
// self.node_id == other.node_id
// }
// }
impl Eq for NodeRef {}
// impl Eq for NodeRef {}
impl fmt::Display for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -444,6 +460,7 @@ impl fmt::Debug for NodeRef {
f.debug_struct("NodeRef")
.field("node_id", &self.node_id)
.field("filter", &self.filter)
.field("sequencing", &self.sequencing)
.finish()
}
}

View File

@ -9,8 +9,10 @@ pub struct SafetySpec {
pub preferred_route: Option<DHTKey>,
/// 0 = no safety route, just use node's node id, more hops is safer but slower
pub hop_count: usize,
/// prefer more reliable protocols and relays over faster ones
pub reliable: bool,
/// prefer reliability over speed
pub stability: Stability,
/// prefer connection-oriented sequenced protocols
pub sequencing: Sequencing,
}
/// Compiled route (safety route + private route)
@ -59,8 +61,10 @@ struct RouteSpecDetail {
last_used_ts: Option<u64>,
/// Directions this route is guaranteed to work in
directions: DirectionSet,
/// Reliability
reliable: bool,
/// Stability preference (prefer reliable nodes over faster)
stability: Stability,
/// Sequencing preference (connection oriented protocols vs datagram)
sequencing: Sequencing,
}
/// The core representation of the RouteSpecStore that can be serialized
@ -81,6 +85,7 @@ pub struct RouteSpecStoreCache {
hop_cache: HashSet<Vec<u8>>,
}
/// The routing table's storage for private/safety routes
#[derive(Debug)]
pub struct RouteSpecStore {
/// Maximum number of hops in a route
@ -93,6 +98,15 @@ pub struct RouteSpecStore {
cache: RouteSpecStoreCache,
}
/// The choice of safety route including in compiled routes
#[derive(Debug, Clone)]
pub enum SafetySelection {
/// Don't use a safety route, only specify the sequencing preference
Unsafe(Sequencing),
/// Use a safety route and parameters specified by a SafetySpec
Safe(SafetySpec),
}
fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * DHT_KEY_LENGTH);
for hop in hops {
@ -293,7 +307,8 @@ impl RouteSpecStore {
&mut self,
rti: &RoutingTableInner,
routing_table: RoutingTable,
reliable: bool,
stability: Stability,
sequencing: Sequencing,
hop_count: usize,
directions: DirectionSet,
) -> EyreResult<Option<DHTKey>> {
@ -327,7 +342,7 @@ impl RouteSpecStore {
// Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route
v.with(rti, |_rti, e| {
let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) {
ni.has_any_dial_info()
ni.has_sequencing_matched_dial_info(sequencing)
} else {
false
};
@ -383,13 +398,16 @@ impl RouteSpecStore {
// always prioritize reliable nodes, but sort by oldest or fastest
let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| {
v2.1.as_ref().unwrap().with(rti, |_rti, e2| {
if reliable {
BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2)
} else {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2)
}
})
v2.1.as_ref()
.unwrap()
.with(rti, |_rti, e2| match stability {
Stability::LowLatency => {
BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2)
}
Stability::Reliable => {
BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2)
}
})
});
cmpout
};
@ -448,7 +466,7 @@ impl RouteSpecStore {
&current_node.0,
&current_node.1,
DialInfoFilter::all(),
reliable,
sequencing,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
@ -474,7 +492,7 @@ impl RouteSpecStore {
&current_node.0,
&current_node.1,
DialInfoFilter::all(),
reliable,
sequencing,
);
if matches!(cm, ContactMethod::Unreachable) {
reachable = false;
@ -525,7 +543,8 @@ impl RouteSpecStore {
last_checked_ts: None,
last_used_ts: None,
directions,
reliable,
stability,
sequencing,
};
// Add to cache
@ -575,15 +594,18 @@ impl RouteSpecStore {
}
}
/// Find first matching unpublished route that fits into the selection criteria
pub fn first_unpublished_route(
&mut self,
reliable: bool,
min_hop_count: usize,
max_hop_count: usize,
stability: Stability,
sequencing: Sequencing,
directions: DirectionSet,
) -> Option<DHTKey> {
for detail in &self.content.details {
if detail.1.reliable == reliable
if detail.1.stability >= stability
&& detail.1.sequencing >= sequencing
&& detail.1.hops.len() >= min_hop_count
&& detail.1.hops.len() <= max_hop_count
&& detail.1.directions.is_subset(directions)
@ -602,46 +624,54 @@ impl RouteSpecStore {
/// Returns Ok(None) if no allocation could happen at this time (not an error)
pub fn compile_safety_route(
&mut self,
rti: &RoutingTableInner,
rti: &mut RoutingTableInner,
routing_table: RoutingTable,
safety_spec: Option<SafetySpec>,
safety_selection: SafetySelection,
private_route: PrivateRoute,
) -> Result<Option<CompiledRoute>, RPCError> {
let pr_hopcount = private_route.hop_count as usize;
if pr_hopcount > self.max_route_hop_count {
let max_route_hop_count = self.max_route_hop_count;
if pr_hopcount > max_route_hop_count {
return Err(RPCError::internal("private route hop count too long"));
}
// See if we are using a safety route, if not, short circuit this operation
if safety_spec.is_none() {
// Safety route stub with the node's public key as the safety route key since it's the 0th hop
if private_route.first_hop.is_none() {
return Err(RPCError::internal("can't compile zero length route"));
}
let first_hop = private_route.first_hop.as_ref().unwrap();
let opt_first_hop_noderef = match &first_hop.node {
RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table, id.key),
RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info(
routing_table.clone(),
RoutingDomain::PublicInternet,
pi.node_id.key,
pi.signed_node_info,
false,
),
};
if opt_first_hop_noderef.is_none() {
// Can't reach this private route any more
log_rtab!(debug "can't reach private route any more");
return Ok(None);
}
let safety_spec = match safety_selection {
SafetySelection::Unsafe(sequencing) => {
// Safety route stub with the node's public key as the safety route key since it's the 0th hop
if private_route.first_hop.is_none() {
return Err(RPCError::internal("can't compile zero length route"));
}
let first_hop = private_route.first_hop.as_ref().unwrap();
let opt_first_hop = match &first_hop.node {
RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table.clone(), id.key),
RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info(
routing_table.clone(),
RoutingDomain::PublicInternet,
pi.node_id.key,
pi.signed_node_info.clone(),
false,
),
};
if opt_first_hop.is_none() {
// Can't reach this private route any more
log_rtab!(debug "can't reach private route any more");
return Ok(None);
}
let mut first_hop = opt_first_hop.unwrap();
return Ok(Some(CompiledRoute {
safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route),
secret: routing_table.node_id_secret(),
first_hop: opt_first_hop_noderef.unwrap(),
}));
}
let safety_spec = safety_spec.unwrap();
// Set sequencing requirement
first_hop.set_sequencing(sequencing);
// Return the compiled safety route
return Ok(Some(CompiledRoute {
safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route),
secret: routing_table.node_id_secret(),
first_hop,
}));
}
SafetySelection::Safe(safety_spec) => safety_spec,
};
// See if the preferred route is here
let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> =
@ -658,9 +688,10 @@ impl RouteSpecStore {
} else {
// Select a safety route from the pool or make one if we don't have one that matches
if let Some(sr_pubkey) = self.first_unpublished_route(
safety_spec.reliable,
safety_spec.hop_count,
safety_spec.hop_count,
safety_spec.stability,
safety_spec.sequencing,
Direction::Outbound.into(),
) {
// Found a route to use
@ -671,7 +702,8 @@ impl RouteSpecStore {
.allocate_route(
rti,
routing_table.clone(),
safety_spec.reliable,
safety_spec.stability,
safety_spec.sequencing,
safety_spec.hop_count,
Direction::Outbound.into(),
)
@ -689,7 +721,7 @@ impl RouteSpecStore {
if sr_hopcount == 0 {
return Err(RPCError::internal("safety route hop count is zero"));
}
if sr_hopcount > self.max_route_hop_count {
if sr_hopcount > max_route_hop_count {
return Err(RPCError::internal("safety route hop count too long"));
}
@ -747,7 +779,7 @@ impl RouteSpecStore {
let node_id = safety_rsd.hops[h];
let pi = rti
.with_node_entry(node_id, |entry| {
entry.with(rti, |rti, e| {
entry.with(rti, |_rti, e| {
e.make_peer_info(node_id, RoutingDomain::PublicInternet)
})
})
@ -800,14 +832,21 @@ impl RouteSpecStore {
hops,
};
let mut first_hop = safety_rsd.hop_node_refs.first().unwrap().clone();
// Ensure sequencing requirement is set on first hop
first_hop.set_sequencing(safety_spec.sequencing);
// Build compiled route
let compiled_route = CompiledRoute {
safety_route,
secret: safety_rsd.secret_key,
first_hop: safety_rsd.hop_node_refs.first().unwrap().clone(),
first_hop,
};
// xxx: add cache here
// Return compiled route
Ok(Some(compiled_route))
}

View File

@ -179,7 +179,7 @@ pub trait RoutingDomainDetail {
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
sequencing: Sequencing,
) -> ContactMethod;
}
@ -204,7 +204,7 @@ fn first_filtered_dial_info_detail(
from_node: &NodeInfo,
to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter,
reliable: bool,
reliable: bool, xxx continue here
) -> Option<DialInfoDetail> {
let direct_dial_info_filter = dial_info_filter.clone().filtered(
&DialInfoFilter::all()
@ -214,7 +214,7 @@ fn first_filtered_dial_info_detail(
// Get first filtered dialinfo
let sort = if reliable {
Some(DialInfoDetail::reliable_sort)
Some(DialInfoDetail::ordered_sequencing_sort)
} else {
None
};
@ -428,7 +428,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
// Get first filtered dialinfo
let sort = if reliable {
Some(DialInfoDetail::reliable_sort)
Some(DialInfoDetail::ordered_sequencing_sort)
} else {
None
};

View File

@ -224,7 +224,7 @@ impl RoutingTableInner {
node_b_id: &DHTKey,
node_b: &NodeInfo,
dial_info_filter: DialInfoFilter,
reliable: bool,
sequencing: Sequencing,
) -> ContactMethod {
self.with_routing_domain(routing_domain, |rdd| {
rdd.get_contact_method(
@ -234,7 +234,7 @@ impl RoutingTableInner {
node_b_id,
node_b,
dial_info_filter,
reliable,
sequencing,
)
})
}

View File

@ -402,6 +402,7 @@ impl RPCProcessor {
&self,
safety_spec: Option<SafetySpec>,
private_route: PrivateRoute,
reliable: bool,
message_data: Vec<u8>,
) -> Result<NetworkResult<RenderedOperation>, RPCError> {
let routing_table = self.routing_table();
@ -411,7 +412,7 @@ impl RPCProcessor {
let compiled_route: CompiledRoute =
match self.routing_table().with_route_spec_store_mut(|rss, rti| {
// Compile the safety route with the private route
rss.compile_safety_route(rti, routing_table, safety_spec, private_route)
rss.compile_safety_route(rti, routing_table, safety_spec, private_route, reliable)
})? {
Some(cr) => cr,
None => {
@ -455,6 +456,7 @@ impl RPCProcessor {
let out_node_id = compiled_route.first_hop.node_id();
let out_hop_count = (1 + sr_hop_count + pr_hop_count) as usize;
let out = RenderedOperation {
message: out_message,
node_id: out_node_id,
@ -537,12 +539,12 @@ impl RPCProcessor {
Destination::PrivateRoute {
private_route,
safety_spec,
reliable,
reliable, xxxx does this need to be here? what about None safety spec, reliable is in there, does it need to not be? or something?
} => {
// Send to private route
// ---------------------
// Reply with 'route' operation
out = self.wrap_with_route(safety_spec, private_route, message)?;
out = self.wrap_with_route(safety_spec, private_route, reliable, message)?;
}
}

View File

@ -405,6 +405,21 @@ impl DialInfoClass {
}
}
// Ordering here matters, >= is used to check strength of sequencing requirement
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Sequencing {
NoPreference,
PreferOrdered,
EnsureOrdered,
}
// Ordering here matters, >= is used to check strength of stability requirement
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Stability {
LowLatency,
Reliable,
}
// Keep member order appropriate for sorting < preference
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)]
pub struct DialInfoDetail {
@ -419,14 +434,14 @@ impl MatchesDialInfoFilter for DialInfoDetail {
}
impl DialInfoDetail {
pub fn reliable_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering {
pub fn ordered_sequencing_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering {
if a.class < b.class {
return core::cmp::Ordering::Less;
}
if a.class > b.class {
return core::cmp::Ordering::Greater;
}
DialInfo::reliable_sort(&a.dial_info, &b.dial_info)
DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info)
}
pub const NO_SORT: std::option::Option<
for<'r, 's> fn(
@ -592,6 +607,39 @@ impl NodeInfo {
.unwrap_or_default()
}
pub fn has_sequencing_matched_dial_info(&self, sequencing: Sequencing) -> bool {
// Check our dial info
for did in &self.dial_info_detail_list {
match sequencing {
Sequencing::NoPreference | Sequencing::PreferOrdered => return true,
Sequencing::EnsureOrdered => {
if did.dial_info.protocol_type().is_connection_oriented() {
return true;
}
}
}
}
// Check our relay if we have one
return self
.relay_peer_info
.as_ref()
.map(|rpi| {
let relay_ni = &rpi.signed_node_info.node_info;
for did in relay_ni.dial_info_detail_list {
match sequencing {
Sequencing::NoPreference | Sequencing::PreferOrdered => return true,
Sequencing::EnsureOrdered => {
if did.dial_info.protocol_type().is_connection_oriented() {
return true;
}
}
}
}
false
})
.unwrap_or_default();
}
pub fn has_direct_dial_info(&self) -> bool {
!self.dial_info_detail_list.is_empty()
}
@ -693,31 +741,31 @@ impl ProtocolType {
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => LowLevelProtocolType::TCP,
}
}
pub fn sort_order(&self, reliable: bool) -> usize {
pub fn sort_order(&self, sequencing: Sequencing) -> usize {
match self {
ProtocolType::UDP => {
if reliable {
if sequencing != Sequencing::NoPreference {
3
} else {
0
}
}
ProtocolType::TCP => {
if reliable {
if sequencing != Sequencing::NoPreference {
0
} else {
1
}
}
ProtocolType::WS => {
if reliable {
if sequencing != Sequencing::NoPreference {
1
} else {
2
}
}
ProtocolType::WSS => {
if reliable {
if sequencing != Sequencing::NoPreference {
2
} else {
3
@ -725,6 +773,9 @@ impl ProtocolType {
}
}
}
pub fn all_ordered_set() -> ProtocolTypeSet {
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS
}
}
pub type ProtocolTypeSet = EnumSet<ProtocolType>;
@ -1499,7 +1550,7 @@ impl DialInfo {
}
}
pub fn reliable_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering {
pub fn ordered_sequencing_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering {
let ca = a.protocol_type().sort_order(true);
let cb = b.protocol_type().sort_order(true);
if ca < cb {