From 75ade4200aaba4e351be15968bd2d0aa0552a438 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 4 Sep 2022 15:40:35 -0400 Subject: [PATCH] more refactor checkpoint --- veilid-core/src/network_manager/mod.rs | 4 +- veilid-core/src/network_manager/native/mod.rs | 2 +- veilid-core/src/network_manager/tasks.rs | 8 ---- veilid-core/src/routing_table/bucket_entry.rs | 38 ++++++++++--------- veilid-core/src/routing_table/find_nodes.rs | 6 +-- veilid-core/src/routing_table/mod.rs | 12 +++--- veilid-core/src/routing_table/node_ref.rs | 13 +++++-- .../src/routing_table/routing_domains.rs | 4 +- .../coders/operations/operation.rs | 10 ++--- .../coders/operations/question.rs | 5 +-- veilid-core/src/rpc_processor/destination.rs | 11 ++++-- veilid-core/src/rpc_processor/mod.rs | 19 +++++----- 12 files changed, 65 insertions(+), 67 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 3747c55e..38bec4d8 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -537,15 +537,13 @@ impl NetworkManager { } pub async fn tick(&self) -> EyreResult<()> { - let (routing_table, net, receipt_manager, protocol_config) = { + let (routing_table, net, receipt_manager) = { let inner = self.inner.lock(); let components = inner.components.as_ref().unwrap(); - let protocol_config = inner.protocol_config.as_ref().unwrap(); ( inner.routing_table.as_ref().unwrap().clone(), components.net.clone(), components.receipt_manager.clone(), - protocol_config.clone(), ) }; diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index b221d259..b86912af 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -608,7 +608,7 @@ impl Network { self.unlocked_inner .interfaces .with_interfaces(|interfaces| { - for (name, intf) in interfaces { + for (_name, intf) in interfaces { // Skip networks that we should never encounter if intf.is_loopback() || !intf.is_running() { return; diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 16511a17..26cd7ffb 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -411,12 +411,6 @@ impl NetworkManager { // Get all nodes needing pings in the LocalNetwork routing domain let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); - // Get our LocalNetwork dial info - let dids = routing_table.all_filtered_dial_info_details( - RoutingDomain::LocalNetwork.into(), - &DialInfoFilter::all(), - ); - // For all nodes needing pings, figure out how many and over what protocols for nr in node_refs { let rpc = rpc.clone(); @@ -437,8 +431,6 @@ impl NetworkManager { _last_ts: u64, cur_ts: u64, ) -> EyreResult<()> { - let rpc = self.rpc_processor(); - let routing_table = self.routing_table(); let mut unord = FuturesUnordered::new(); // PublicInternet diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 0c6aa75a..89fa46c1 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -192,8 +192,8 @@ impl BucketEntryInner { for routing_domain in routing_domain_set { // Get the correct signed_node_info for the chosen routing domain let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &self.public_internet.signed_node_info, }; if opt_current_sni.is_some() { return true; @@ -204,24 +204,24 @@ impl BucketEntryInner { pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> { let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &self.public_internet.signed_node_info, }; opt_current_sni.as_ref().map(|s| &s.node_info) } pub fn signed_node_info(&self, routing_domain: RoutingDomain) -> Option<&SignedNodeInfo> { let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &self.public_internet.signed_node_info, }; opt_current_sni.as_ref().map(|s| s.as_ref()) } pub fn make_peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option { let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &self.public_internet.signed_node_info, }; opt_current_sni.as_ref().map(|s| PeerInfo { node_id: NodeId::new(key), @@ -235,10 +235,10 @@ impl BucketEntryInner { ) -> Option { for routing_domain in routing_domain_set { let opt_current_sni = match routing_domain { - RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, - RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, + RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, + RoutingDomain::PublicInternet => &self.public_internet.signed_node_info, }; - if let Some(sni) = opt_current_sni { + if opt_current_sni.is_some() { return Some(routing_domain); } } @@ -265,10 +265,10 @@ impl BucketEntryInner { } // Gets the best 'last connection' that matches a set of routing domain, protocol types and address types - pub fn last_connection( + pub(super) fn last_connection( &self, routing_table_inner: &RoutingTableInner, - node_ref_filter: &Option, + node_ref_filter: Option, ) -> Option<(ConnectionDescriptor, u64)> { // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same let nrf = node_ref_filter.unwrap_or_default(); @@ -327,11 +327,13 @@ impl BucketEntryInner { RoutingDomain::LocalNetwork => self .local_network .node_status - .map(|ln| NodeStatus::LocalNetwork(ln)), + .as_ref() + .map(|ln| NodeStatus::LocalNetwork(ln.clone())), RoutingDomain::PublicInternet => self .public_internet .node_status - .map(|pi| NodeStatus::PublicInternet(pi)), + .as_ref() + .map(|pi| NodeStatus::PublicInternet(pi.clone())), } } @@ -426,7 +428,7 @@ impl BucketEntryInner { } // Check if this node needs a ping right now to validate it is still reachable - pub(super) fn needs_ping(&self, node_id: &DHTKey, cur_ts: u64, needs_keepalive: bool) -> bool { + pub(super) fn needs_ping(&self, cur_ts: u64, needs_keepalive: bool) -> bool { // See which ping pattern we are to use let state = self.state(cur_ts); @@ -605,7 +607,7 @@ impl BucketEntry { } } - pub fn with(&self, f: F) -> R + pub(super) fn with(&self, f: F) -> R where F: FnOnce(&BucketEntryInner) -> R, { @@ -613,7 +615,7 @@ impl BucketEntry { f(&*inner) } - pub fn with_mut(&self, f: F) -> R + pub(super) fn with_mut(&self, f: F) -> R where F: FnOnce(&mut BucketEntryInner) -> R, { diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 49929d98..3519f733 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -132,7 +132,7 @@ impl RoutingTable { // does it have some dial info we need? let filter = |n: &NodeInfo| { let mut keep = false; - for did in n.dial_info_detail_list { + for did in &n.dial_info_detail_list { if matches!(did.dial_info.address_type(), AddressType::IPV4) { for (n, protocol_type) in protocol_types.iter().enumerate() { if nodes_proto_v4[n] < max_per_type @@ -250,7 +250,7 @@ impl RoutingTable { &self, node_count: usize, mut filter: F, - mut transform: T, + transform: T, ) -> Vec where F: FnMut(DHTKey, Option>) -> bool, @@ -331,7 +331,7 @@ impl RoutingTable { pub fn find_closest_nodes( &self, node_id: DHTKey, - mut filter: F, + filter: F, mut transform: T, ) -> Vec where diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 7d92a890..84ed94a6 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -140,7 +140,7 @@ impl RoutingTable { self.inner.read().node_id_secret } - pub fn routing_domain_for_address_inner( + fn routing_domain_for_address_inner( inner: &RoutingTableInner, address: Address, ) -> Option { @@ -189,7 +189,7 @@ impl RoutingTable { } pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option) { - let inner = self.inner.write(); + let mut inner = self.inner.write(); Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node)); } @@ -275,13 +275,13 @@ impl RoutingTable { return false; } // Ensure all of the dial info works in this routing domain - for did in node_info.dial_info_detail_list { + for did in &node_info.dial_info_detail_list { if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { return false; } } // Ensure the relay is also valid in this routing domain if it is provided - if let Some(relay_peer_info) = node_info.relay_peer_info { + if let Some(relay_peer_info) = node_info.relay_peer_info.as_ref() { let relay_ni = &relay_peer_info.signed_node_info.node_info; if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) { return false; @@ -620,7 +620,7 @@ impl RoutingTable { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { if v.with(|e| { e.has_node_info(routing_domain.into()) - && e.needs_ping(&k, cur_ts, opt_relay_id == Some(k)) + && e.needs_ping(cur_ts, opt_relay_id == Some(k)) }) { node_refs.push(NodeRef::new( self.clone(), @@ -834,7 +834,7 @@ impl RoutingTable { .collect() } - pub fn touch_recent_peer( + fn touch_recent_peer( inner: &mut RoutingTableInner, node_id: DHTKey, last_connection: ConnectionDescriptor, diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 326b351d..d805882f 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -150,13 +150,15 @@ impl NodeRef { pub fn routing_domain_set(&self) -> RoutingDomainSet { self.filter + .as_ref() .map(|f| f.routing_domain_set) .unwrap_or(RoutingDomainSet::all()) } pub fn dial_info_filter(&self) -> DialInfoFilter { self.filter - .map(|f| f.dial_info_filter) + .as_ref() + .map(|f| f.dial_info_filter.clone()) .unwrap_or(DialInfoFilter::all()) } @@ -164,6 +166,7 @@ impl NodeRef { self.operate(|_rti, e| { e.best_routing_domain( self.filter + .as_ref() .map(|f| f.routing_domain_set) .unwrap_or(RoutingDomainSet::all()), ) @@ -235,8 +238,10 @@ impl NodeRef { dif } pub fn relay(&self, routing_domain: RoutingDomain) -> Option { - let target_rpi = - self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.relay_peer_info))?; + let target_rpi = self.operate(|_rti, e| { + e.node_info(routing_domain) + .map(|n| n.relay_peer_info.as_ref().map(|pi| pi.as_ref().clone())) + })?; target_rpi.and_then(|t| { // If relay is ourselves, then return None, because we can't relay through ourselves // and to contact this node we should have had an existing inbound connection @@ -294,7 +299,7 @@ impl NodeRef { pub async fn last_connection(&self) -> Option { // Get the last connection and the last time we saw anything with this connection let (last_connection, last_seen) = - self.operate(|rti, e| e.last_connection(rti, &self.filter))?; + self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?; // Should we check the connection table? if last_connection.protocol_type().is_connection_oriented() { diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index ea2ea1a7..ee29fa73 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -57,7 +57,7 @@ pub struct LocalInternetRoutingDomainDetail { } impl LocalInternetRoutingDomainDetail { - pub fn set_local_networks(&mut self, local_networks: Vec<(IpAddr, IpAddr)>) -> bool { + pub fn set_local_networks(&mut self, mut local_networks: Vec<(IpAddr, IpAddr)>) -> bool { local_networks.sort(); if local_networks == self.local_networks { return false; @@ -70,7 +70,7 @@ impl LocalInternetRoutingDomainDetail { impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { fn can_contain_address(&self, address: Address) -> bool { let ip = address.to_ip_addr(); - for localnet in self.local_networks { + for localnet in &self.local_networks { if ipaddr_in_network(ip, localnet.0, localnet.1) { return true; } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index aa7185a4..fc61cf32 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -25,7 +25,7 @@ impl RPCOperationKind { let out = match which_reader { veilid_capnp::operation::kind::Which::Question(r) => { let q_reader = r.map_err(RPCError::protocol)?; - let out = RPCQuestion::decode(&q_reader, sender_node_id)?; + let out = RPCQuestion::decode(&q_reader)?; RPCOperationKind::Question(out) } veilid_capnp::operation::kind::Which::Statement(r) => { @@ -137,12 +137,12 @@ impl RPCOperation { pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { builder.set_op_id(self.op_id); - let mut k_builder = builder.reborrow().init_kind(); - self.kind.encode(&mut k_builder)?; - if let Some(sender_info) = self.sender_node_info { - let si_builder = builder.reborrow().init_sender_node_info(); + if let Some(sender_info) = &self.sender_node_info { + let mut si_builder = builder.reborrow().init_sender_node_info(); encode_signed_node_info(&sender_info, &mut si_builder)?; } + let mut k_builder = builder.reborrow().init_kind(); + self.kind.encode(&mut k_builder)?; Ok(()) } } diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index eb0bae02..ae62ca72 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -21,10 +21,7 @@ impl RPCQuestion { pub fn desc(&self) -> &'static str { self.detail.desc() } - pub fn decode( - reader: &veilid_capnp::question::Reader, - sender_node_id: &DHTKey, - ) -> Result { + pub fn decode(reader: &veilid_capnp::question::Reader) -> Result { let rt_reader = reader.get_respond_to(); let respond_to = RespondTo::decode(&rt_reader)?; let d_reader = reader.get_detail(); diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index bbbd84c5..d98adf30 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -103,16 +103,16 @@ impl Destination { pub fn safety_route_spec(&self) -> Option> { match self { Destination::Direct { - target, + target: _, safety_route_spec, } => safety_route_spec.clone(), Destination::Relay { - relay, - target, + relay: _, + target: _, safety_route_spec, } => safety_route_spec.clone(), Destination::PrivateRoute { - private_route, + private_route: _, safety_route_spec, } => safety_route_spec.clone(), } @@ -154,6 +154,7 @@ impl fmt::Display for Destination { safety_route_spec, } => { let sr = safety_route_spec + .as_ref() .map(|_sr| "+SR".to_owned()) .unwrap_or_default(); @@ -165,6 +166,7 @@ impl fmt::Display for Destination { safety_route_spec, } => { let sr = safety_route_spec + .as_ref() .map(|_sr| "+SR".to_owned()) .unwrap_or_default(); @@ -175,6 +177,7 @@ impl fmt::Display for Destination { safety_route_spec, } => { let sr = safety_route_spec + .as_ref() .map(|_sr| "+SR".to_owned()) .unwrap_or_default(); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index cb38b124..b4c38203 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -323,7 +323,7 @@ impl RPCProcessor { .await .into_timeout_or(); Ok(res.map(|res| { - let (span_id, rpcreader) = res.take_value().unwrap(); + let (_span_id, rpcreader) = res.take_value().unwrap(); let end_ts = intf::get_timestamp(); // fixme: causes crashes? "Missing otel data span extensions"?? @@ -385,13 +385,13 @@ impl RPCProcessor { // To where are we sending the request match dest { Destination::Direct { - target: node_ref, - safety_route_spec, + target: ref node_ref, + ref safety_route_spec, } | Destination::Relay { - relay: node_ref, + relay: ref node_ref, target: _, - safety_route_spec, + ref safety_route_spec, } => { // Send to a node without a private route // -------------------------------------- @@ -399,7 +399,7 @@ impl RPCProcessor { // Get the actual destination node id accounting for relays let (node_ref, node_id) = if let Destination::Relay { relay: _, - target: dht_key, + target: ref dht_key, safety_route_spec: _, } = dest { @@ -410,7 +410,7 @@ impl RPCProcessor { }; // Handle the existence of safety route - match safety_route_spec { + match safety_route_spec.as_ref() { None => { // If no safety route is being used, and we're not sending to a private // route, we can use a direct envelope instead of routing @@ -434,7 +434,8 @@ impl RPCProcessor { .dial_info .node_id .key; - out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?; + out_message = + self.wrap_with_route(Some(sr.clone()), private_route, message_vec)?; out_hop_count = 1 + sr.hops.len(); } }; @@ -892,7 +893,7 @@ impl RPCProcessor { stop_token: StopToken, receiver: flume::Receiver<(Option, RPCMessageEncoded)>, ) { - while let Ok(Ok((span_id, msg))) = + while let Ok(Ok((_span_id, msg))) = receiver.recv_async().timeout_at(stop_token.clone()).await { let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker");