more route work

This commit is contained in:
John Smith
2022-10-20 23:11:41 -04:00
parent fc6eb6e84a
commit c8ba88fb99
8 changed files with 171 additions and 77 deletions

View File

@@ -57,7 +57,7 @@ impl Bucket {
}
pub(super) fn kick(
&self,
&mut self,
inner: &mut RoutingTableInner,
bucket_depth: usize,
) -> Option<BTreeSet<DHTKey>> {

View File

@@ -658,7 +658,7 @@ impl BucketEntry {
// immutable reference to RoutingTableInner must be passed in to get this
// This ensures that an operation on the routing table can not change entries
// while it is being read from
pub(super) fn with<F, R>(&self, rti: &RoutingTableInner, f: F) -> R
pub fn with<F, R>(&self, rti: &RoutingTableInner, f: F) -> R
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> R,
{
@@ -668,7 +668,7 @@ impl BucketEntry {
// Note, that this requires -also- holding the RoutingTable write lock, as a
// mutable reference to RoutingTableInner must be passed in to get this
pub(super) fn with_mut<F, R>(&self, rti: &mut RoutingTableInner, f: F) -> R
pub fn with_mut<F, R>(&self, rti: &mut RoutingTableInner, f: F) -> R
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> R,
{

View File

@@ -20,6 +20,8 @@ pub struct CompiledRoute {
pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload
pub secret: DHTKeySecret,
/// The node ref to the first hop in the compiled route
pub first_hop: NodeRef,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -46,6 +48,9 @@ struct RouteSpecDetail {
/// Not serialized because all routes should be re-published when restarting
#[serde(skip)]
published: bool,
// Can optimize the rendering of this route, using node ids only instead of full peer info
#[serde(skip)]
reachable: bool,
/// Timestamp of when the route was created
created_ts: u64,
/// Timestamp of when the route was last checked for validity
@@ -124,7 +129,7 @@ fn get_route_permutation_count(hop_count: usize) -> usize {
/// get the route permutation at particular 'perm' index, starting at the 'start' index
/// for a set of 'hop_count' nodes. the first node is always fixed, and the maximum
/// number of permutations is given by get_route_permutation_count()
fn with_route_permutations<F>(hop_count: usize, start: usize, f: F) -> bool
fn with_route_permutations<F>(hop_count: usize, start: usize, mut f: F) -> bool
where
F: FnMut(&[usize]) -> bool,
{
@@ -142,7 +147,7 @@ where
}
// heaps algorithm, but skipping the first element
fn heaps_permutation<F>(permutation: &mut [usize], size: usize, f: F) -> bool
fn heaps_permutation<F>(permutation: &mut [usize], size: usize, mut f: F) -> bool
where
F: FnMut(&[usize]) -> bool,
{
@@ -154,7 +159,7 @@ where
}
for i in 0..size {
if heaps_permutation(permutation, size - 1, f) {
if heaps_permutation(permutation, size - 1, &mut f) {
return true;
}
if size % 2 == 1 {
@@ -224,7 +229,7 @@ impl RouteSpecStore {
}
// Rebuild the routespecstore cache
rss.rebuild_cache(routing_table);
rss.rebuild_cache();
Ok(rss)
}
@@ -251,28 +256,28 @@ impl RouteSpecStore {
Ok(())
}
fn add_to_cache(&mut self, cache_key: Vec<u8>, rsd: &RouteSpecDetail) {
if !self.cache.hop_cache.insert(cache_key) {
fn add_to_cache(cache: &mut RouteSpecStoreCache, cache_key: Vec<u8>, rsd: &RouteSpecDetail) {
if !cache.hop_cache.insert(cache_key) {
panic!("route should never be inserted twice");
}
for h in &rsd.hops {
self.cache
cache
.used_nodes
.entry(*h)
.and_modify(|e| *e += 1)
.or_insert(1);
}
self.cache
cache
.used_end_nodes
.entry(*rsd.hops.last().unwrap())
.and_modify(|e| *e += 1)
.or_insert(1);
}
fn rebuild_cache(&mut self, routing_table: RoutingTable) {
fn rebuild_cache(&mut self) {
for v in self.content.details.values() {
let cache_key = route_hops_to_hop_cache(&v.hops);
self.add_to_cache(cache_key, &v);
Self::add_to_cache(&mut self.cache, cache_key, &v);
}
}
@@ -304,12 +309,7 @@ impl RouteSpecStore {
// Get list of all nodes, and sort them for selection
let cur_ts = intf::get_timestamp();
let dial_info_sort = if reliable {
Some(DialInfoDetail::reliable_sort)
} else {
None
};
let filter = |rti, k: DHTKey, v: Option<Arc<BucketEntry>>| -> bool {
let filter = |rti, _k: DHTKey, v: Option<Arc<BucketEntry>>| -> bool {
// Exclude our own node from routes
if v.is_none() {
return false;
@@ -503,7 +503,10 @@ impl RouteSpecStore {
let hops = route_nodes.iter().map(|v| nodes[*v].0).collect();
let hop_node_refs = route_nodes
.iter()
.map(|v| rti.lookup_node_ref(routing_table, nodes[*v].0).unwrap())
.map(|v| {
rti.lookup_node_ref(routing_table.clone(), nodes[*v].0)
.unwrap()
})
.collect();
let (public_key, secret_key) = generate_secret();
@@ -517,6 +520,7 @@ impl RouteSpecStore {
latency_stats_accounting: Default::default(),
transfer_stats_accounting: Default::default(),
published: false,
reachable: false,
created_ts: cur_ts,
last_checked_ts: None,
last_used_ts: None,
@@ -525,7 +529,7 @@ impl RouteSpecStore {
};
// Add to cache
self.add_to_cache(cache_key, &rsd);
Self::add_to_cache(&mut self.cache, cache_key, &rsd);
// Keep route in spec store
self.content.details.insert(public_key, rsd);
@@ -543,7 +547,7 @@ impl RouteSpecStore {
// Remove from used nodes cache
for h in &detail.hops {
match self.cache.used_nodes.entry(*h) {
std::collections::hash_map::Entry::Occupied(o) => {
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
@@ -556,7 +560,7 @@ impl RouteSpecStore {
}
// Remove from end nodes cache
match self.cache.used_nodes.entry(*detail.hops.last().unwrap()) {
std::collections::hash_map::Entry::Occupied(o) => {
std::collections::hash_map::Entry::Occupied(mut o) => {
*o.get_mut() -= 1;
if *o.get() == 0 {
o.remove();
@@ -611,23 +615,44 @@ impl RouteSpecStore {
// See if we are using a safety route, if not, short circuit this operation
if safety_spec.is_none() {
// Safety route stub with the node's public key as the safety route key since it's the 0th hop
if private_route.first_hop.is_none() {
return Err(RPCError::internal("can't compile zero length route"));
}
let first_hop = private_route.first_hop.as_ref().unwrap();
let opt_first_hop_noderef = match &first_hop.node {
RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table, id.key),
RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info(
routing_table.clone(),
RoutingDomain::PublicInternet,
pi.node_id.key,
pi.signed_node_info,
false,
),
};
if opt_first_hop_noderef.is_none() {
// Can't reach this private route any more
log_rtab!(debug "can't reach private route any more");
return Ok(None);
}
return Ok(Some(CompiledRoute {
safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route),
secret: routing_table.node_id_secret(),
first_hop: opt_first_hop_noderef.unwrap(),
}));
}
let safety_spec = safety_spec.unwrap();
// See if the preferred route is here
let safety_route_public_key;
let opt_safety_rsd: Option<&mut RouteSpecDetail> =
let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> =
if let Some(preferred_route) = safety_spec.preferred_route {
self.detail_mut(&preferred_route)
.map(|rsd| (rsd, preferred_route))
} else {
// Preferred safety route was not requested
None
};
let safety_rsd: &mut RouteSpecDetail = if let Some(safety_rsd) = opt_safety_rsd {
let (safety_rsd, sr_pubkey) = if let Some(safety_rsd) = opt_safety_rsd {
// Safety route exists
safety_rsd
} else {
@@ -639,13 +664,13 @@ impl RouteSpecStore {
Direction::Outbound.into(),
) {
// Found a route to use
self.detail_mut(&sr_pubkey).unwrap()
(self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey)
} else {
// No route found, gotta allocate one
let sr_pubkey = match self
.allocate_route(
rti,
routing_table,
routing_table.clone(),
safety_spec.reliable,
safety_spec.hop_count,
Direction::Outbound.into(),
@@ -655,12 +680,10 @@ impl RouteSpecStore {
Some(pk) => pk,
None => return Ok(None),
};
self.detail_mut(&sr_pubkey).unwrap()
(self.detail_mut(&sr_pubkey).unwrap(), sr_pubkey)
}
};
// xxx implement caching first!
// Ensure the total hop count isn't too long for our config
let sr_hopcount = safety_spec.hop_count;
if sr_hopcount == 0 {
@@ -672,7 +695,9 @@ impl RouteSpecStore {
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
//let optimize = safety_rsd.
let optimize = safety_rsd.reachable;
// xxx implement caching here!
// Create hops
let hops = {
@@ -714,7 +739,27 @@ impl RouteSpecStore {
// Make route hop
let route_hop = RouteHop {
node: safety_route_spec.hops[h].dial_info.clone(),
node: match optimize {
// Optimized, no peer info, just the dht key
true => RouteNode::NodeId(NodeId::new(safety_rsd.hops[h])),
// Full peer info, required until we are sure the route has been fully established
false => {
let node_id = safety_rsd.hops[h];
let pi = rti
.with_node_entry(node_id, |entry| {
entry.with(rti, |rti, e| {
e.make_peer_info(node_id, RoutingDomain::PublicInternet)
})
})
.flatten();
if pi.is_none() {
return Err(RPCError::internal(
"peer info should exist for route but doesn't",
));
}
RouteNode::PeerInfo(pi.unwrap())
}
},
next_hop: Some(route_hop_data),
};
@@ -734,12 +779,8 @@ impl RouteSpecStore {
}
// Encode first RouteHopData
let dh_secret = self
.crypto
.cached_dh(
&safety_route_spec.hops[0].dial_info.node_id.key,
&safety_route_spec.secret_key,
)
let dh_secret = crypto
.cached_dh(&safety_rsd.hops[0], &safety_rsd.secret_key)
.map_err(RPCError::map_internal("dh failed"))?;
let enc_msg_data = Crypto::encrypt_aead(blob_data.as_slice(), &nonce, &dh_secret, None)
.map_err(RPCError::map_internal("encryption failed"))?;
@@ -754,15 +795,20 @@ impl RouteSpecStore {
// Build safety route
let safety_route = SafetyRoute {
public_key: safety_rsd.
public_key: sr_pubkey,
hop_count: safety_spec.hop_count as u8,
hops,
};
Ok(Some(CompiledRoute {
let compiled_route = CompiledRoute {
safety_route,
secret: todo!(),
}))
secret: safety_rsd.secret_key,
first_hop: safety_rsd.hop_node_refs.first().unwrap().clone(),
};
// xxx: add cache here
Ok(Some(compiled_route))
}
/// Mark route as published
@@ -775,6 +821,16 @@ impl RouteSpecStore {
Ok(())
}
/// Mark route as reachable
/// When first deserialized, routes must be re-tested for reachability
/// This can be used to determine if routes need to be sent with full peerinfo or can just use a node id
pub fn mark_route_reachable(&mut self, key: &DHTKey) -> EyreResult<()> {
self.detail_mut(&key)
.ok_or_else(|| eyre!("route does not exist"))?
.published = true;
Ok(())
}
/// Mark route as checked
pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
self.detail_mut(&key)

View File

@@ -68,10 +68,10 @@ impl RoutingTableInner {
self.unlocked_inner.config.clone()
}
pub fn transfer_stats_accounting(&mut self) -> &TransferStatsAccounting {
pub fn transfer_stats_accounting(&mut self) -> &mut TransferStatsAccounting {
&mut self.self_transfer_stats_accounting
}
pub fn latency_stats_accounting(&mut self) -> &LatencyStatsAccounting {
pub fn latency_stats_accounting(&mut self) -> &mut LatencyStatsAccounting {
&mut self.self_latency_stats_accounting
}
@@ -616,7 +616,7 @@ impl RoutingTableInner {
/// Resolve an existing routing table entry and return a reference to it
pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: DHTKey) -> Option<NodeRef> {
if node_id == self.unlocked_inner.node_id {
log_rtab!(debug "can't look up own node id in routing table");
log_rtab!(error "can't look up own node id in routing table");
return None;
}
let idx = self.find_bucket_index(node_id);
@@ -644,6 +644,23 @@ impl RoutingTableInner {
)
}
/// Resolve an existing routing table entry and call a function on its entry without using a noderef
pub fn with_node_entry<F, R>(&self, node_id: DHTKey, f: F) -> Option<R>
where
F: FnOnce(Arc<BucketEntry>) -> R,
{
if node_id == self.unlocked_inner.node_id {
log_rtab!(error "can't look up own node id in routing table");
return None;
}
let idx = self.find_bucket_index(node_id);
let bucket = &self.buckets[idx];
if let Some(e) = bucket.entry(&node_id) {
return Some(f(e));
}
None
}
/// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the dial info we have for it. Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience.