diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 59f2eb39..d3948b68 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -792,25 +792,15 @@ impl NetworkManager { // Get node's min/max envelope version and see if we can send to it // and if so, get the max version we can use - node_ref.envelope_support() - let envelope_version = if let Some(min_max_version) = { - #[allow(clippy::absurd_extreme_comparisons)] - if min_max_version.min > MAX_ENVELOPE_VERSION || min_max_version.max < MIN_ENVELOPE_VERSION - { - bail!( - "can't talk to this node {} because version is unsupported: ({},{})", - via_node_id, - min_max_version.min, - min_max_version.max - ); - } - cmp::min(min_max_version.max, MAX_CRYPTO_VERSION) - } else { - MAX_CRYPTO_VERSION + let Some(envelope_version) = node_ref.envelope_support().iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)) else { + bail!( + "can't talk to this node {} because we dont support its envelope versions", + node_ref + ); }; // Build the envelope to send - let out = self.build_envelope(envelope_node_id, version, body)?; + let out = self.build_envelope(envelope_node_id, envelope_version, body)?; // Send the envelope via whatever means necessary self.send_data(node_ref.clone(), out).await @@ -1449,7 +1439,7 @@ impl NetworkManager { // Cache the envelope information in the routing table let source_noderef = match routing_table.register_node_with_existing_connection( - envelope.get_sender_id(), + TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id()), connection_descriptor, ts, ) { diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 0f7cbecd..ca99e64d 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -101,8 +101,7 @@ impl Bucket { log_rtab!("Node added: {}", node_id); // Add new entry - let entry = Arc::new(BucketEntry::new()); - entry.with_mut_inner(|e| e.add_node_id(node_id)); + let entry = Arc::new(BucketEntry::new(node_id)); self.entries.insert(node_id.key, entry.clone()); // This is now the newest bucket entry diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 1fea2d27..c0df7e4d 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -129,6 +129,9 @@ impl BucketEntryInner { pub fn add_node_id(&mut self, node_id: TypedKey) { self.node_ids.add(node_id); } + pub fn best_node_id(&self) -> TypedKey { + self.node_ids.best().unwrap() + } // Less is faster pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering { @@ -237,22 +240,9 @@ impl BucketEntryInner { } } - // Update the protocol min/max version we have to use, to include relay requirements if needed - let mut version_range = VersionRange { - min: signed_node_info.node_info().min_version, - max: signed_node_info.node_info().max_version, - }; - if let Some(relay_info) = signed_node_info.relay_info() { - version_range.min.max_assign(relay_info.min_version); - version_range.max.min_assign(relay_info.max_version); - } - if version_range.min <= version_range.max { - // Can be reached with at least one crypto version - self.min_max_version = Some(version_range); - } else { - // No valid crypto version in range - self.min_max_version = None; - } + // Update the envelope version support we have to use + let mut envelope_support = signed_node_info.node_info().envelope_support.clone(); + self.set_envelope_support(envelope_support); // Update the signed node info *opt_current_sni = Some(Box::new(signed_node_info)); @@ -756,39 +746,41 @@ pub struct BucketEntry { } impl BucketEntry { - pub(super) fn new() -> Self { + pub(super) fn new(first_node_id: TypedKey) -> Self { let now = get_aligned_timestamp(); - Self { - ref_count: AtomicU32::new(0), - inner: RwLock::new(BucketEntryInner { - node_ids: TypedKeySet::new(), - envelope_support: Vec::new(), - updated_since_last_network_change: false, - last_connections: BTreeMap::new(), - local_network: BucketEntryLocalNetwork { - last_seen_our_node_info_ts: Timestamp::new(0u64), - signed_node_info: None, - node_status: None, - }, - public_internet: BucketEntryPublicInternet { - last_seen_our_node_info_ts: Timestamp::new(0u64), - signed_node_info: None, - node_status: None, - }, - peer_stats: PeerStats { - time_added: now, - rpc_stats: RPCStats::default(), - latency: None, - transfer: TransferStatsDownUp::default(), - }, - latency_stats_accounting: LatencyStatsAccounting::new(), - transfer_stats_accounting: TransferStatsAccounting::new(), - #[cfg(feature = "tracking")] - next_track_id: 0, - #[cfg(feature = "tracking")] - node_ref_tracks: HashMap::new(), - }), - } + let mut node_ids = TypedKeySet::new(); + node_ids.add(first_node_id); + + let inner = BucketEntryInner { + node_ids, + envelope_support: Vec::new(), + updated_since_last_network_change: false, + last_connections: BTreeMap::new(), + local_network: BucketEntryLocalNetwork { + last_seen_our_node_info_ts: Timestamp::new(0u64), + signed_node_info: None, + node_status: None, + }, + public_internet: BucketEntryPublicInternet { + last_seen_our_node_info_ts: Timestamp::new(0u64), + signed_node_info: None, + node_status: None, + }, + peer_stats: PeerStats { + time_added: now, + rpc_stats: RPCStats::default(), + latency: None, + transfer: TransferStatsDownUp::default(), + }, + latency_stats_accounting: LatencyStatsAccounting::new(), + transfer_stats_accounting: TransferStatsAccounting::new(), + #[cfg(feature = "tracking")] + next_track_id: 0, + #[cfg(feature = "tracking")] + node_ref_tracks: HashMap::new(), + }; + + Self::new_with_inner(inner) } pub(super) fn new_with_inner(inner: BucketEntryInner) -> Self { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 14a5f0c3..616a31b4 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -571,9 +571,14 @@ impl RoutingTable { inner.get_all_nodes(self.clone(), cur_ts) } - fn queue_bucket_kick(&self, node_id: TypedKey) { - let idx = self.unlocked_inner.find_bucket_index(node_id).unwrap(); - self.unlocked_inner.kick_queue.lock().insert(idx); + fn queue_bucket_kicks(&self, node_ids: TypedKeySet) { + for node_id in node_ids.iter() { + let Some(x) = self.unlocked_inner.find_bucket_index(*node_id) else { + log_rtab!(error "find bucket index failed for nodeid {}", node_id); + continue; + }; + self.unlocked_inner.kick_queue.lock().insert(x); + } } /// Resolve an existing routing table entry and return a reference to it diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 2537a97f..e2a7d6bd 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -101,6 +101,9 @@ pub trait NodeRefBase: Sized { fn node_ids(&self) -> TypedKeySet { self.operate(|_rti, e| e.node_ids()) } + fn best_node_id(&self) -> TypedKey { + self.operate(|_rti, e| e.best_node_id()) + } fn has_updated_since_last_network_change(&self) -> bool { self.operate(|_rti, e| e.has_updated_since_last_network_change()) } @@ -281,7 +284,7 @@ pub trait NodeRefBase: Sized { fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) { self.operate_mut(|rti, e| { e.set_last_connection(connection_descriptor, ts); - rti.touch_recent_peer(self.common().node_id, connection_descriptor); + rti.touch_recent_peer(e.best_node_id(), connection_descriptor); }) } @@ -426,14 +429,14 @@ impl Clone for NodeRef { impl fmt::Display for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.common.node_id.encode()) + write!(f, "{}", self.common.entry.with_inner(|e| e.best_node_id())) } } impl fmt::Debug for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("NodeRef") - .field("node_id", &self.common.node_id) + .field("node_ids", &self.common.entry.with_inner(|e| e.node_ids())) .field("filter", &self.common.filter) .field("sequencing", &self.common.sequencing) .finish() @@ -453,9 +456,10 @@ impl Drop for NodeRef { .fetch_sub(1u32, Ordering::Relaxed) - 1; if new_ref_count == 0 { - self.common - .routing_table - .queue_bucket_kick(self.common.node_id); + // get node ids with inner unlocked because nothing could be referencing this entry now + // and we don't know when it will get dropped, possibly inside a lock + let node_ids = self.common().entry.with_inner(|e| e.node_ids()); + self.common.routing_table.queue_bucket_kicks(node_ids); } } }