bootstrap
This commit is contained in:
		| @@ -11,19 +11,19 @@ pub struct Bucket { | ||||
|     /// handle to the routing table | ||||
|     routing_table: RoutingTable, | ||||
|     /// Map of keys to entries for this bucket | ||||
|     entries: BTreeMap<TypedKey, Arc<BucketEntry>>, | ||||
|     entries: BTreeMap<PublicKey, Arc<BucketEntry>>, | ||||
|     /// The most recent entry in this bucket | ||||
|     newest_entry: Option<TypedKey>, | ||||
|     newest_entry: Option<PublicKey>, | ||||
|     /// The crypto kind in use for the public keys in this bucket | ||||
|     kind: CryptoKind, | ||||
| } | ||||
| pub(super) type EntriesIter<'a> = | ||||
|     alloc::collections::btree_map::Iter<'a, TypedKey, Arc<BucketEntry>>; | ||||
|     alloc::collections::btree_map::Iter<'a, PublicKey, Arc<BucketEntry>>; | ||||
|  | ||||
| #[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)] | ||||
| #[archive_attr(repr(C), derive(CheckBytes))] | ||||
| struct SerializedBucketEntryData { | ||||
|     key: TypedKey, | ||||
|     key: PublicKey, | ||||
|     value: u32, // index into serialized entries list | ||||
| } | ||||
|  | ||||
| @@ -31,7 +31,7 @@ struct SerializedBucketEntryData { | ||||
| #[archive_attr(repr(C), derive(CheckBytes))] | ||||
| struct SerializedBucketData { | ||||
|     entries: Vec<SerializedBucketEntryData>, | ||||
|     newest_entry: Option<TypedKey>, | ||||
|     newest_entry: Option<PublicKey>, | ||||
| } | ||||
|  | ||||
| fn state_ordering(state: BucketEntryState) -> usize { | ||||
| @@ -95,49 +95,45 @@ impl Bucket { | ||||
|     } | ||||
|  | ||||
|     /// Create a new entry with a node_id of this crypto kind and return it | ||||
|     pub(super) fn add_entry(&mut self, node_id: TypedKey) -> NodeRef { | ||||
|         assert_eq!(node_id.kind, self.kind); | ||||
|  | ||||
|         log_rtab!("Node added: {}", node_id); | ||||
|     pub(super) fn add_entry(&mut self, node_id_key: PublicKey) -> NodeRef { | ||||
|         log_rtab!("Node added: {}:{}", self.kind, node_id_key); | ||||
|  | ||||
|         // Add new entry | ||||
|         let entry = Arc::new(BucketEntry::new(node_id)); | ||||
|         self.entries.insert(node_id.key, entry.clone()); | ||||
|         let entry = Arc::new(BucketEntry::new(TypedKey::new(self.kind, node_id_key))); | ||||
|         self.entries.insert(node_id_key, entry.clone()); | ||||
|  | ||||
|         // This is now the newest bucket entry | ||||
|         self.newest_entry = Some(node_id.key); | ||||
|         self.newest_entry = Some(node_id_key); | ||||
|  | ||||
|         // Get a node ref to return since this is new | ||||
|         NodeRef::new(self.routing_table.clone(), entry, None) | ||||
|     } | ||||
|  | ||||
|     /// Add an existing entry with a new node_id for this crypto kind | ||||
|     pub(super) fn add_existing_entry(&mut self, node_id: TypedKey, entry: Arc<BucketEntry>) { | ||||
|         assert_eq!(node_id.kind, self.kind); | ||||
|  | ||||
|         log_rtab!("Existing node added: {}", node_id); | ||||
|     pub(super) fn add_existing_entry(&mut self, node_id_key: PublicKey, entry: Arc<BucketEntry>) { | ||||
|         log_rtab!("Existing node added: {}:{}", self.kind, node_id_key); | ||||
|  | ||||
|         // Add existing entry | ||||
|         entry.with_mut_inner(|e| e.add_node_id(node_id)); | ||||
|         self.entries.insert(node_id.key, entry); | ||||
|         entry.with_mut_inner(|e| e.add_node_id(TypedKey::new(self.kind, node_id_key))); | ||||
|         self.entries.insert(node_id_key, entry); | ||||
|  | ||||
|         // This is now the newest bucket entry | ||||
|         self.newest_entry = Some(node_id.key); | ||||
|         self.newest_entry = Some(node_id_key); | ||||
|  | ||||
|         // No need to return a noderef here because the noderef will already exist in the caller | ||||
|     } | ||||
|  | ||||
|     /// Remove an entry with a node_id for this crypto kind from the bucket | ||||
|     fn remove_entry(&mut self, node_id: &TypedKey) { | ||||
|         log_rtab!("Node removed: {}:{}", self.kind, node_id); | ||||
|     fn remove_entry(&mut self, node_id_key: &PublicKey) { | ||||
|         log_rtab!("Node removed: {}:{}", self.kind, node_id_key); | ||||
|  | ||||
|         // Remove the entry | ||||
|         self.entries.remove(node_id); | ||||
|         self.entries.remove(node_id_key); | ||||
|  | ||||
|         // newest_entry is updated by kick_bucket() | ||||
|     } | ||||
|  | ||||
|     pub(super) fn entry(&self, key: &TypedKey) -> Option<Arc<BucketEntry>> { | ||||
|     pub(super) fn entry(&self, key: &PublicKey) -> Option<Arc<BucketEntry>> { | ||||
|         self.entries.get(key).cloned() | ||||
|     } | ||||
|  | ||||
| @@ -145,7 +141,7 @@ impl Bucket { | ||||
|         self.entries.iter() | ||||
|     } | ||||
|  | ||||
|     pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<TypedKey>> { | ||||
|     pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<PublicKey>> { | ||||
|         // Get number of entries to attempt to purge from bucket | ||||
|         let bucket_len = self.entries.len(); | ||||
|  | ||||
| @@ -155,11 +151,11 @@ impl Bucket { | ||||
|         } | ||||
|  | ||||
|         // Try to purge the newest entries that overflow the bucket | ||||
|         let mut dead_node_ids: BTreeSet<TypedKey> = BTreeSet::new(); | ||||
|         let mut dead_node_ids: BTreeSet<PublicKey> = BTreeSet::new(); | ||||
|         let mut extra_entries = bucket_len - bucket_depth; | ||||
|  | ||||
|         // Get the sorted list of entries by their kick order | ||||
|         let mut sorted_entries: Vec<(TypedKey, Arc<BucketEntry>)> = self | ||||
|         let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> = self | ||||
|             .entries | ||||
|             .iter() | ||||
|             .map(|(k, v)| (k.clone(), v.clone())) | ||||
|   | ||||
| @@ -7,7 +7,7 @@ impl RoutingTable { | ||||
|         let inner = self.inner.read(); | ||||
|         out += "Routing Table Info:\n"; | ||||
|  | ||||
|         out += &format!("   Node Id: {}\n", self.unlocked_inner.node_id.encode()); | ||||
|         out += &format!("   Node Ids: {}\n", self.unlocked_inner.node_ids()); | ||||
|         out += &format!( | ||||
|             "   Self Latency Stats Accounting: {:#?}\n\n", | ||||
|             inner.self_latency_stats_accounting | ||||
| @@ -55,13 +55,20 @@ impl RoutingTable { | ||||
|             short_urls.sort(); | ||||
|             short_urls.dedup(); | ||||
|  | ||||
|             let valid_envelope_versions = VALID_ENVELOPE_VERSIONS.map(|x| x.to_string()).join(","); | ||||
|             let node_ids = self | ||||
|                 .unlocked_inner | ||||
|                 .node_ids() | ||||
|                 .iter() | ||||
|                 .map(|x| x.to_string()) | ||||
|                 .collect::<Vec<String>>() | ||||
|                 .join(","); | ||||
|             out += "TXT Record:\n"; | ||||
|             out += &format!( | ||||
|                 "{},{},{},{},{}", | ||||
|                 "{}|{}|{}|{}|", | ||||
|                 BOOTSTRAP_TXT_VERSION, | ||||
|                 MIN_CRYPTO_VERSION, | ||||
|                 MAX_CRYPTO_VERSION, | ||||
|                 self.node_id().encode(), | ||||
|                 valid_envelope_versions, | ||||
|                 node_ids, | ||||
|                 some_hostname.unwrap() | ||||
|             ); | ||||
|             for short_url in short_urls { | ||||
|   | ||||
| @@ -111,7 +111,15 @@ impl RoutingTableUnlockedInner { | ||||
|     } | ||||
|  | ||||
|     pub fn node_id(&self, kind: CryptoKind) -> TypedKey { | ||||
|         self.node_id_keypairs.get(&kind).unwrap().key | ||||
|         TypedKey::new(kind, self.node_id_keypairs.get(&kind).unwrap().key) | ||||
|     } | ||||
|  | ||||
|     pub fn node_ids(&self) -> TypedKeySet { | ||||
|         let mut tks = TypedKeySet::new(); | ||||
|         for x in &self.node_id_keypairs { | ||||
|             tks.add(TypedKey::new(*x.0, x.1.key)); | ||||
|         } | ||||
|         tks | ||||
|     } | ||||
|  | ||||
|     pub fn node_id_secret(&self, kind: CryptoKind) -> SecretKey { | ||||
| @@ -890,6 +898,7 @@ impl RoutingTable { | ||||
|  | ||||
|     pub fn find_closest_nodes<'a, T, O>( | ||||
|         &self, | ||||
|         node_count: usize, | ||||
|         node_id: TypedKey, | ||||
|         filters: VecDeque<RoutingTableEntryFilter>, | ||||
|         transform: T, | ||||
| @@ -899,7 +908,7 @@ impl RoutingTable { | ||||
|     { | ||||
|         self.inner | ||||
|             .read() | ||||
|             .find_closest_nodes(node_id, filters, transform) | ||||
|             .find_closest_nodes(node_count, node_id, filters, transform) | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), ret)] | ||||
| @@ -940,14 +949,14 @@ impl RoutingTable { | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub async fn find_self(&self, node_ref: NodeRef) -> EyreResult<NetworkResult<Vec<NodeRef>>> { | ||||
|         let node_id = self.node_id(); | ||||
|         self.find_node(node_ref, node_id).await | ||||
|         let self_node_id = self.node_id(); | ||||
|         self.find_node(node_ref, self_node_id).await | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub async fn find_target(&self, node_ref: NodeRef) -> EyreResult<NetworkResult<Vec<NodeRef>>> { | ||||
|         let node_id = node_ref.node_id(); | ||||
|         self.find_node(node_ref, node_id).await | ||||
|         let target_node_id = node_ref.node_id(); | ||||
|         self.find_node(node_ref, target_node_id).await | ||||
|     } | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self))] | ||||
| @@ -1047,12 +1056,12 @@ impl RoutingTable { | ||||
|         // Go through all entries and find fastest entry that matches filter function | ||||
|         let inner = self.inner.read(); | ||||
|         let inner = &*inner; | ||||
|         let mut best_inbound_relay: Option<(TypedKey, Arc<BucketEntry>)> = None; | ||||
|         let mut best_inbound_relay: Option<Arc<BucketEntry>> = None; | ||||
|  | ||||
|         // Iterate all known nodes for candidates | ||||
|         inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { | ||||
|             let v2 = v.clone(); | ||||
|             v.with(rti, |rti, e| { | ||||
|         inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, entry| { | ||||
|             let entry2 = entry.clone(); | ||||
|             entry.with(rti, |rti, e| { | ||||
|                 // Ensure we have the node's status | ||||
|                 if let Some(node_status) = e.node_status(routing_domain) { | ||||
|                     // Ensure the node will relay | ||||
| @@ -1060,18 +1069,18 @@ impl RoutingTable { | ||||
|                         // Compare against previous candidate | ||||
|                         if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { | ||||
|                             // Less is faster | ||||
|                             let better = best_inbound_relay.1.with(rti, |_rti, best| { | ||||
|                             let better = best_inbound_relay.with(rti, |_rti, best| { | ||||
|                                 // choose low latency stability for relays | ||||
|                                 BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) | ||||
|                                     == std::cmp::Ordering::Less | ||||
|                             }); | ||||
|                             // Now apply filter function and see if this node should be included | ||||
|                             if better && relay_node_filter(e) { | ||||
|                                 *best_inbound_relay = (k, v2); | ||||
|                                 *best_inbound_relay = entry2; | ||||
|                             } | ||||
|                         } else if relay_node_filter(e) { | ||||
|                             // Always store the first candidate | ||||
|                             best_inbound_relay = Some((k, v2)); | ||||
|                             best_inbound_relay = Some(entry2); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| @@ -1080,6 +1089,6 @@ impl RoutingTable { | ||||
|             Option::<()>::None | ||||
|         }); | ||||
|         // Return the best inbound relay noderef | ||||
|         best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), e, None)) | ||||
|         best_inbound_relay.map(|e| NodeRef::new(self.clone(), e, None)) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| use super::*; | ||||
| use weak_table::PtrWeakHashSet; | ||||
|  | ||||
| const RECENT_PEERS_TABLE_SIZE: usize = 64; | ||||
|  | ||||
| @@ -15,8 +16,8 @@ pub struct RoutingTableInner { | ||||
|     pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>, | ||||
|     /// Routing table buckets that hold references to entries, per crypto kind | ||||
|     pub(super) buckets: BTreeMap<CryptoKind, Vec<Bucket>>, | ||||
|     /// A fast counter for the number of entries in the table, total | ||||
|     pub(super) bucket_entry_count: usize, | ||||
|     /// A weak set of all the entries we have in the buckets for faster iteration | ||||
|     pub(super) all_entries: PtrWeakHashSet<Weak<BucketEntry>>, | ||||
|     /// The public internet routing domain | ||||
|     pub(super) public_internet_routing_domain: PublicInternetRoutingDomainDetail, | ||||
|     /// The dial info we use on the local network | ||||
| @@ -40,7 +41,7 @@ impl RoutingTableInner { | ||||
|             buckets: BTreeMap::new(), | ||||
|             public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), | ||||
|             local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), | ||||
|             bucket_entry_count: 0, | ||||
|             all_entries: PtrWeakHashSet::new(), | ||||
|             self_latency_stats_accounting: LatencyStatsAccounting::new(), | ||||
|             self_transfer_stats_accounting: TransferStatsAccounting::new(), | ||||
|             self_transfer_stats: TransferStatsDownUp::default(), | ||||
| @@ -49,6 +50,10 @@ impl RoutingTableInner { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn bucket_entry_count(&self) -> usize { | ||||
|         self.all_entries.len() | ||||
|     } | ||||
|  | ||||
|     pub fn transfer_stats_accounting(&mut self) -> &mut TransferStatsAccounting { | ||||
|         &mut self.self_transfer_stats_accounting | ||||
|     } | ||||
| @@ -209,7 +214,7 @@ impl RoutingTableInner { | ||||
|  | ||||
|     pub fn reset_all_updated_since_last_network_change(&mut self) { | ||||
|         let cur_ts = get_aligned_timestamp(); | ||||
|         self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, _, v| { | ||||
|         self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| { | ||||
|             v.with_mut(rti, |_rti, e| { | ||||
|                 e.set_updated_since_last_network_change(false) | ||||
|             }); | ||||
| @@ -310,7 +315,7 @@ impl RoutingTableInner { | ||||
|         for ck in VALID_CRYPTO_KINDS { | ||||
|             let ckbuckets = Vec::with_capacity(PUBLIC_KEY_LENGTH * 8); | ||||
|             for _ in 0..PUBLIC_KEY_LENGTH * 8 { | ||||
|                 let bucket = Bucket::new(routing_table.clone()); | ||||
|                 let bucket = Bucket::new(routing_table.clone(), ck); | ||||
|                 ckbuckets.push(bucket); | ||||
|             } | ||||
|             self.buckets.insert(ck, ckbuckets); | ||||
| @@ -345,14 +350,18 @@ impl RoutingTableInner { | ||||
|     pub fn purge_buckets(&mut self) { | ||||
|         log_rtab!( | ||||
|             "Starting routing table buckets purge. Table currently has {} nodes", | ||||
|             self.bucket_entry_count | ||||
|             self.bucket_entry_count() | ||||
|         ); | ||||
|         for bucket in &mut self.buckets { | ||||
|             bucket.kick(0); | ||||
|         for ck in VALID_CRYPTO_KINDS { | ||||
|             for bucket in &mut self.buckets[&ck] { | ||||
|                 bucket.kick(0); | ||||
|             } | ||||
|         } | ||||
|         self.all_entries.remove_expired(); | ||||
|  | ||||
|         log_rtab!(debug | ||||
|              "Routing table buckets purge complete. Routing table now has {} nodes", | ||||
|             self.bucket_entry_count | ||||
|             self.bucket_entry_count() | ||||
|         ); | ||||
|     } | ||||
|  | ||||
| @@ -360,32 +369,36 @@ impl RoutingTableInner { | ||||
|     pub fn purge_last_connections(&mut self) { | ||||
|         log_rtab!( | ||||
|             "Starting routing table last_connections purge. Table currently has {} nodes", | ||||
|             self.bucket_entry_count | ||||
|             self.bucket_entry_count() | ||||
|         ); | ||||
|         for bucket in &self.buckets { | ||||
|             for entry in bucket.entries() { | ||||
|                 entry.1.with_mut_inner(|e| { | ||||
|                     e.clear_last_connections(); | ||||
|                 }); | ||||
|         for ck in VALID_CRYPTO_KINDS { | ||||
|             for bucket in &self.buckets[&ck] { | ||||
|                 for entry in bucket.entries() { | ||||
|                     entry.1.with_mut_inner(|e| { | ||||
|                         e.clear_last_connections(); | ||||
|                     }); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         self.all_entries.remove_expired(); | ||||
|  | ||||
|         log_rtab!(debug | ||||
|              "Routing table last_connections purge complete. Routing table now has {} nodes", | ||||
|              self.bucket_entry_count | ||||
|              self.bucket_entry_count() | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     /// Attempt to settle buckets and remove entries down to the desired number | ||||
|     /// which may not be possible due extant NodeRefs | ||||
|     pub fn kick_bucket(&mut self, idx: usize) { | ||||
|         let bucket = &mut self.buckets[idx]; | ||||
|     pub fn kick_bucket(&mut self, kind: CryptoKind, idx: usize) { | ||||
|         let bucket = &mut self.buckets[&kind][idx]; | ||||
|         let bucket_depth = Self::bucket_depth(idx); | ||||
|  | ||||
|         if let Some(dead_node_ids) = bucket.kick(bucket_depth) { | ||||
|             // Remove counts | ||||
|             self.bucket_entry_count -= dead_node_ids.len(); | ||||
|             log_rtab!(debug "Routing table now has {} nodes", self.bucket_entry_count); | ||||
|             // Remove expired entries | ||||
|             self.all_entries.remove_expired(); | ||||
|  | ||||
|             log_rtab!(debug "Bucket {}:{} kicked Routing table now has {} nodes", kind, idx, self.bucket_entry_count()); | ||||
|  | ||||
|             // Now purge the routing table inner vectors | ||||
|             //let filter = |k: &DHTKey| dead_node_ids.contains(k); | ||||
| @@ -403,7 +416,7 @@ impl RoutingTableInner { | ||||
|     ) -> usize { | ||||
|         let mut count = 0usize; | ||||
|         let cur_ts = get_aligned_timestamp(); | ||||
|         self.with_entries(cur_ts, min_state, |rti, _, e| { | ||||
|         self.with_entries(cur_ts, min_state, |rti, e| { | ||||
|             if e.with(rti, |rti, e| e.best_routing_domain(rti, routing_domain_set)) | ||||
|                 .is_some() | ||||
|             { | ||||
| @@ -414,54 +427,36 @@ impl RoutingTableInner { | ||||
|         count | ||||
|     } | ||||
|  | ||||
|     pub fn with_entries< | ||||
|         T, | ||||
|         F: FnMut(&RoutingTableInner, TypedKey, Arc<BucketEntry>) -> Option<T>, | ||||
|     >( | ||||
|     pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>( | ||||
|         &self, | ||||
|         cur_ts: Timestamp, | ||||
|         min_state: BucketEntryState, | ||||
|         mut f: F, | ||||
|     ) -> Option<T> { | ||||
|         let mut entryvec = Vec::with_capacity(self.bucket_entry_count); | ||||
|         for bucket in &self.buckets { | ||||
|             for entry in bucket.entries() { | ||||
|                 if entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { | ||||
|                     entryvec.push((*entry.0, entry.1.clone())); | ||||
|         for entry in self.all_entries { | ||||
|             if entry.with(self, |_rti, e| e.state(cur_ts) >= min_state) { | ||||
|                 if let Some(out) = f(self, entry) { | ||||
|                     return Some(out); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         for entry in entryvec { | ||||
|             if let Some(out) = f(self, entry.0, entry.1) { | ||||
|                 return Some(out); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         None | ||||
|     } | ||||
|  | ||||
|     pub fn with_entries_mut< | ||||
|         T, | ||||
|         F: FnMut(&mut RoutingTableInner, TypedKey, Arc<BucketEntry>) -> Option<T>, | ||||
|     >( | ||||
|     pub fn with_entries_mut<T, F: FnMut(&mut RoutingTableInner, Arc<BucketEntry>) -> Option<T>>( | ||||
|         &mut self, | ||||
|         cur_ts: Timestamp, | ||||
|         min_state: BucketEntryState, | ||||
|         mut f: F, | ||||
|     ) -> Option<T> { | ||||
|         let mut entryvec = Vec::with_capacity(self.bucket_entry_count); | ||||
|         for bucket in &self.buckets { | ||||
|             for entry in bucket.entries() { | ||||
|                 if entry.1.with(self, |_rti, e| e.state(cur_ts) >= min_state) { | ||||
|                     entryvec.push((*entry.0, entry.1.clone())); | ||||
|         for entry in self.all_entries { | ||||
|             if entry.with(self, |_rti, e| e.state(cur_ts) >= min_state) { | ||||
|                 if let Some(out) = f(self, entry) { | ||||
|                     return Some(out); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         for entry in entryvec { | ||||
|             if let Some(out) = f(self, entry.0, entry.1) { | ||||
|                 return Some(out); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         None | ||||
|     } | ||||
| @@ -728,18 +723,16 @@ impl RoutingTableInner { | ||||
|         let mut dead_entry_count: usize = 0; | ||||
|  | ||||
|         let cur_ts = get_aligned_timestamp(); | ||||
|         for bucket in &self.buckets { | ||||
|             for (_, v) in bucket.entries() { | ||||
|                 match v.with(self, |_rti, e| e.state(cur_ts)) { | ||||
|                     BucketEntryState::Reliable => { | ||||
|                         reliable_entry_count += 1; | ||||
|                     } | ||||
|                     BucketEntryState::Unreliable => { | ||||
|                         unreliable_entry_count += 1; | ||||
|                     } | ||||
|                     BucketEntryState::Dead => { | ||||
|                         dead_entry_count += 1; | ||||
|                     } | ||||
|         for entry in self.all_entries { | ||||
|             match entry.with(self, |_rti, e| e.state(cur_ts)) { | ||||
|                 BucketEntryState::Reliable => { | ||||
|                     reliable_entry_count += 1; | ||||
|                 } | ||||
|                 BucketEntryState::Unreliable => { | ||||
|                     unreliable_entry_count += 1; | ||||
|                 } | ||||
|                 BucketEntryState::Dead => { | ||||
|                     dead_entry_count += 1; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| @@ -798,7 +791,7 @@ impl RoutingTableInner { | ||||
|         self.find_fastest_nodes( | ||||
|             node_count, | ||||
|             filters, | ||||
|             |_rti: &RoutingTableInner, k: TypedKey, v: Option<Arc<BucketEntry>>| { | ||||
|             |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| { | ||||
|                 NodeRef::new(outer_self.clone(), v.unwrap().clone(), None) | ||||
|             }, | ||||
|         ) | ||||
| @@ -846,30 +839,30 @@ impl RoutingTableInner { | ||||
|             &'b Option<Arc<BucketEntry>>, | ||||
|             &'b Option<Arc<BucketEntry>>, | ||||
|         ) -> core::cmp::Ordering, | ||||
|         T: for<'r> FnMut(&'r RoutingTableInner, TypedKey, Option<Arc<BucketEntry>>) -> O, | ||||
|         T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O, | ||||
|     { | ||||
|         // collect all the nodes for sorting | ||||
|         let mut nodes = | ||||
|             Vec::<(TypedKey, Option<Arc<BucketEntry>>)>::with_capacity(self.bucket_entry_count + 1); | ||||
|             Vec::<Option<Arc<BucketEntry>>>::with_capacity(self.bucket_entry_count() + 1); | ||||
|  | ||||
|         // add our own node (only one of there with the None entry) | ||||
|         let mut filtered = false; | ||||
|         for filter in &mut filters { | ||||
|             if !filter(self, self.unlocked_inner.node_id, None) { | ||||
|             if !filter(self, None) { | ||||
|                 filtered = true; | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         if !filtered { | ||||
|             nodes.push((self.unlocked_inner.node_id, None)); | ||||
|             nodes.push(None); | ||||
|         } | ||||
|  | ||||
|         // add all nodes from buckets | ||||
|         self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { | ||||
|         // add all nodes that match filter | ||||
|         self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, v| { | ||||
|             // Apply filter | ||||
|             for filter in &mut filters { | ||||
|                 if filter(rti, k, Some(v.clone())) { | ||||
|                     nodes.push((k, Some(v.clone()))); | ||||
|                 if filter(rti, Some(v.clone())) { | ||||
|                     nodes.push(Some(v.clone())); | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
| @@ -883,7 +876,7 @@ impl RoutingTableInner { | ||||
|         let cnt = usize::min(node_count, nodes.len()); | ||||
|         let mut out = Vec::<O>::with_capacity(cnt); | ||||
|         for node in nodes { | ||||
|             let val = transform(self, node.0, node.1); | ||||
|             let val = transform(self, node); | ||||
|             out.push(val); | ||||
|         } | ||||
|  | ||||
| @@ -921,12 +914,19 @@ impl RoutingTableInner { | ||||
|  | ||||
|         // Fastest sort | ||||
|         let sort = |rti: &RoutingTableInner, | ||||
|                     (a_key, a_entry): &(TypedKey, Option<Arc<BucketEntry>>), | ||||
|                     (b_key, b_entry): &(TypedKey, Option<Arc<BucketEntry>>)| { | ||||
|                     a_entry: &Option<Arc<BucketEntry>>, | ||||
|                     b_entry: &Option<Arc<BucketEntry>>| { | ||||
|             // same nodes are always the same | ||||
|             if a_key == b_key { | ||||
|             if let Some(a_entry) = a_entry { | ||||
|                 if let Some(b_entry) = b_entry { | ||||
|                     if Arc::ptr_eq(&a_entry, &b_entry) { | ||||
|                         return core::cmp::Ordering::Equal; | ||||
|                     } | ||||
|                 } | ||||
|             } else if b_entry.is_none() { | ||||
|                 return core::cmp::Ordering::Equal; | ||||
|             } | ||||
|  | ||||
|             // our own node always comes last (should not happen, here for completeness) | ||||
|             if a_entry.is_none() { | ||||
|                 return core::cmp::Ordering::Greater; | ||||
| @@ -977,26 +977,28 @@ impl RoutingTableInner { | ||||
|  | ||||
|     pub fn find_closest_nodes<T, O>( | ||||
|         &self, | ||||
|         node_count: usize, | ||||
|         node_id: TypedKey, | ||||
|         filters: VecDeque<RoutingTableEntryFilter>, | ||||
|         transform: T, | ||||
|     ) -> Vec<O> | ||||
|     where | ||||
|         T: for<'r> FnMut(&'r RoutingTableInner, TypedKey, Option<Arc<BucketEntry>>) -> O, | ||||
|         T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O, | ||||
|     { | ||||
|         let cur_ts = get_aligned_timestamp(); | ||||
|         let node_count = { | ||||
|             let config = self.config(); | ||||
|             let c = config.get(); | ||||
|             c.network.dht.max_find_node_count as usize | ||||
|         }; | ||||
|  | ||||
|         // closest sort | ||||
|         let sort = |rti: &RoutingTableInner, | ||||
|                     (a_key, a_entry): &(TypedKey, Option<Arc<BucketEntry>>), | ||||
|                     (b_key, b_entry): &(TypedKey, Option<Arc<BucketEntry>>)| { | ||||
|                     a_entry: &Option<Arc<BucketEntry>>, | ||||
|                     b_entry: &Option<Arc<BucketEntry>>| { | ||||
|             // same nodes are always the same | ||||
|             if a_key == b_key { | ||||
|             if let Some(a_entry) = a_entry { | ||||
|                 if let Some(b_entry) = b_entry { | ||||
|                     if Arc::ptr_eq(&a_entry, &b_entry) { | ||||
|                         return core::cmp::Ordering::Equal; | ||||
|                     } | ||||
|                 } | ||||
|             } else if b_entry.is_none() { | ||||
|                 return core::cmp::Ordering::Equal; | ||||
|             } | ||||
|  | ||||
|   | ||||
| @@ -3,23 +3,122 @@ use super::*; | ||||
| use futures_util::stream::{FuturesUnordered, StreamExt}; | ||||
| use stop_token::future::FutureExt as StopFutureExt; | ||||
|  | ||||
| pub const BOOTSTRAP_TXT_VERSION: u8 = 0; | ||||
| pub const BOOTSTRAP_TXT_VERSION_0: u8 = 0; | ||||
|  | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct BootstrapRecord { | ||||
|     min_version: u8, | ||||
|     max_version: u8, | ||||
|     node_ids: TypedKeySet, | ||||
|     envelope_support: Vec<u8>, | ||||
|     dial_info_details: Vec<DialInfoDetail>, | ||||
| } | ||||
| pub type BootstrapRecordMap = BTreeMap<PublicKey, BootstrapRecord>; | ||||
| impl BootstrapRecord { | ||||
|     pub fn merge(&mut self, other: &BootstrapRecord) { | ||||
|         self.node_ids.add_all(&other.node_ids); | ||||
|         for x in other.envelope_support { | ||||
|             if !self.envelope_support.contains(&x) { | ||||
|                 self.envelope_support.push(x); | ||||
|                 self.envelope_support.sort(); | ||||
|             } | ||||
|         } | ||||
|         for did in &other.dial_info_details { | ||||
|             if !self.dial_info_details.contains(did) { | ||||
|                 self.dial_info_details.push(did.clone()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl RoutingTable { | ||||
|     /// Process bootstrap version 0 | ||||
|     async fn process_bootstrap_records_v0( | ||||
|         &self, | ||||
|         records: Vec<String>, | ||||
|     ) -> EyreResult<Option<BootstrapRecord>> { | ||||
|         // Bootstrap TXT Record Format Version 0: | ||||
|         // txt_version|envelope_support|node_ids|hostname|dialinfoshort* | ||||
|         // | ||||
|         // Split bootstrap node record by '|' and then lists by ','. Example: | ||||
|         // 0|0|VLD0:7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ|bootstrap-1.dev.veilid.net|T5150,U5150,W5150/ws | ||||
|  | ||||
|         if records.len() != 5 { | ||||
|             bail!("invalid number of fields in bootstrap v0 txt record"); | ||||
|         } | ||||
|  | ||||
|         // Envelope support | ||||
|         let mut envelope_support = Vec::new(); | ||||
|         for ess in records[1].split(",") { | ||||
|             let ess = ess.trim(); | ||||
|             let es = match records[1].parse::<u8>() { | ||||
|                 Ok(v) => v, | ||||
|                 Err(e) => { | ||||
|                     bail!( | ||||
|                         "invalid envelope version specified in bootstrap node txt record: {}", | ||||
|                         e | ||||
|                     ); | ||||
|                 } | ||||
|             }; | ||||
|             envelope_support.push(es); | ||||
|         } | ||||
|         envelope_support.dedup(); | ||||
|         envelope_support.sort(); | ||||
|  | ||||
|         // Node Id | ||||
|         let node_ids = TypedKeySet::new(); | ||||
|         for node_id_str in records[2].split(",") { | ||||
|             let node_id_str = node_id_str.trim(); | ||||
|             let node_id = match TypedKey::from_str(&node_id_str) { | ||||
|                 Ok(v) => v, | ||||
|                 Err(e) => { | ||||
|                     bail!( | ||||
|                         "Invalid node id in bootstrap node record {}: {}", | ||||
|                         node_id_str, | ||||
|                         e | ||||
|                     ); | ||||
|                 } | ||||
|             }; | ||||
|         } | ||||
|  | ||||
|         // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node | ||||
|         if self.unlocked_inner.matches_own_node_id(&node_ids) { | ||||
|             return Ok(None); | ||||
|         } | ||||
|  | ||||
|         // Hostname | ||||
|         let hostname_str = records[3].trim(); | ||||
|  | ||||
|         // Resolve each record and store in node dial infos list | ||||
|         let mut dial_info_details = Vec::new(); | ||||
|         for rec in records[4].split(",") { | ||||
|             let rec = rec.trim(); | ||||
|             let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { | ||||
|                 Ok(dis) => dis, | ||||
|                 Err(e) => { | ||||
|                     warn!("Couldn't resolve bootstrap node dial info {}: {}", rec, e); | ||||
|                     continue; | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             for di in dial_infos { | ||||
|                 dial_info_details.push(DialInfoDetail { | ||||
|                     dial_info: di, | ||||
|                     class: DialInfoClass::Direct, | ||||
|                 }); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(Some(BootstrapRecord { | ||||
|             node_ids, | ||||
|             envelope_support, | ||||
|             dial_info_details, | ||||
|         })) | ||||
|     } | ||||
|  | ||||
|     // Bootstrap lookup process | ||||
|     #[instrument(level = "trace", skip(self), ret, err)] | ||||
|     pub(crate) async fn resolve_bootstrap( | ||||
|         &self, | ||||
|         bootstrap: Vec<String>, | ||||
|     ) -> EyreResult<BootstrapRecordMap> { | ||||
|     ) -> EyreResult<Vec<BootstrapRecord>> { | ||||
|         // Resolve from bootstrap root to bootstrap hostnames | ||||
|         let mut bsnames = Vec::<String>::new(); | ||||
|         for bh in bootstrap { | ||||
| @@ -58,22 +157,14 @@ impl RoutingTable { | ||||
|                         Ok(v) => v, | ||||
|                     }; | ||||
|                     // for each record resolve into key/bootstraprecord pairs | ||||
|                     let mut bootstrap_records: Vec<(PublicKey, BootstrapRecord)> = Vec::new(); | ||||
|                     let mut bootstrap_records: Vec<BootstrapRecord> = Vec::new(); | ||||
|                     for bsnirecord in bsnirecords { | ||||
|                         // Bootstrap TXT Record Format Version 0: | ||||
|                         // txt_version,min_version,max_version,nodeid,hostname,dialinfoshort* | ||||
|                         // | ||||
|                         // Split bootstrap node record by commas. Example: | ||||
|                         // 0,0,0,7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ,bootstrap-1.dev.veilid.net,T5150,U5150,W5150/ws | ||||
|                         // All formats split on '|' character | ||||
|                         let records: Vec<String> = bsnirecord | ||||
|                             .trim() | ||||
|                             .split(',') | ||||
|                             .split('|') | ||||
|                             .map(|x| x.trim().to_owned()) | ||||
|                             .collect(); | ||||
|                         if records.len() < 6 { | ||||
|                             warn!("invalid number of fields in bootstrap txt record"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Bootstrap TXT record version | ||||
|                         let txt_version: u8 = match records[0].parse::<u8>() { | ||||
| @@ -86,81 +177,30 @@ impl RoutingTable { | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         if txt_version != BOOTSTRAP_TXT_VERSION { | ||||
|                             warn!("unsupported bootstrap txt record version"); | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Min/Max wire protocol version | ||||
|                         let min_version: u8 = match records[1].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid min_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|                         let max_version: u8 = match records[2].parse::<u8>() { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                 "invalid max_version specified in bootstrap node txt record: {}", | ||||
|                                 e | ||||
|                             ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Node Id | ||||
|                         let node_id_str = &records[3]; | ||||
|                         let node_id_key = match PublicKey::try_decode(node_id_str) { | ||||
|                             Ok(v) => v, | ||||
|                             Err(e) => { | ||||
|                                 warn!( | ||||
|                                     "Invalid node id in bootstrap node record {}: {}", | ||||
|                                     node_id_str, e | ||||
|                                 ); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         // Hostname | ||||
|                         let hostname_str = &records[4]; | ||||
|  | ||||
|                         // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node | ||||
|                         if self.node_id() == node_id_key { | ||||
|                             continue; | ||||
|                         } | ||||
|  | ||||
|                         // Resolve each record and store in node dial infos list | ||||
|                         let mut bootstrap_record = BootstrapRecord { | ||||
|                             min_version, | ||||
|                             max_version, | ||||
|                             dial_info_details: Vec::new(), | ||||
|                         }; | ||||
|                         for rec in &records[5..] { | ||||
|                             let rec = rec.trim(); | ||||
|                             let dial_infos = match DialInfo::try_vec_from_short(rec, hostname_str) { | ||||
|                                 Ok(dis) => dis, | ||||
|                                 Err(e) => { | ||||
|                                     warn!( | ||||
|                                         "Couldn't resolve bootstrap node dial info {}: {}", | ||||
|                                         rec, e | ||||
|                                     ); | ||||
|                                     continue; | ||||
|                         let bootstrap_record = match txt_version { | ||||
|                             BOOTSTRAP_TXT_VERSION_0 => { | ||||
|                                 match self.process_bootstrap_records_v0(records).await { | ||||
|                                     Err(e) => { | ||||
|                                         warn!( | ||||
|                                             "couldn't process v0 bootstrap records from {}: {}", | ||||
|                                             bsname, e | ||||
|                                         ); | ||||
|                                         continue; | ||||
|                                     } | ||||
|                                     Ok(Some(v)) => v, | ||||
|                                     Ok(None) => { | ||||
|                                         // skipping | ||||
|                                         continue; | ||||
|                                     } | ||||
|                                 } | ||||
|                             }; | ||||
|  | ||||
|                             for di in dial_infos { | ||||
|                                 bootstrap_record.dial_info_details.push(DialInfoDetail { | ||||
|                                     dial_info: di, | ||||
|                                     class: DialInfoClass::Direct, | ||||
|                                 }); | ||||
|                             } | ||||
|                         } | ||||
|                         bootstrap_records.push((node_id_key, bootstrap_record)); | ||||
|                             _ => { | ||||
|                                 warn!("unsupported bootstrap txt record version"); | ||||
|                                 continue; | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                         bootstrap_records.push(bootstrap_record); | ||||
|                     } | ||||
|                     Some(bootstrap_records) | ||||
|                 } | ||||
| @@ -168,21 +208,35 @@ impl RoutingTable { | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let mut bsmap = BootstrapRecordMap::new(); | ||||
|         let mut merged_bootstrap_records: Vec<BootstrapRecord> = Vec::new(); | ||||
|         while let Some(bootstrap_records) = unord.next().await { | ||||
|             if let Some(bootstrap_records) = bootstrap_records { | ||||
|                 for (bskey, mut bsrec) in bootstrap_records { | ||||
|                     let rec = bsmap.entry(bskey).or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: bsrec.min_version, | ||||
|                         max_version: bsrec.max_version, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }); | ||||
|                     rec.dial_info_details.append(&mut bsrec.dial_info_details); | ||||
|             let Some(bootstrap_records) = bootstrap_records else { | ||||
|                 continue; | ||||
|             }; | ||||
|             for mut bsrec in bootstrap_records { | ||||
|                 let mut mbi = 0; | ||||
|                 while mbi < merged_bootstrap_records.len() { | ||||
|                     let mbr = &mut merged_bootstrap_records[mbi]; | ||||
|                     if mbr.node_ids.contains_any(&bsrec.node_ids) { | ||||
|                         // Merge record, pop this one out | ||||
|                         let mbr = merged_bootstrap_records.remove(mbi); | ||||
|                         bsrec.merge(&mbr); | ||||
|                     } else { | ||||
|                         // No overlap, go to next record | ||||
|                         mbi += 1; | ||||
|                     } | ||||
|                 } | ||||
|                 // Append merged record | ||||
|                 merged_bootstrap_records.push(bsrec); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(bsmap) | ||||
|         // ensure dial infos are sorted | ||||
|         for mbr in &mut merged_bootstrap_records { | ||||
|             mbr.dial_info_details.sort(); | ||||
|         } | ||||
|  | ||||
|         Ok(merged_bootstrap_records) | ||||
|     } | ||||
|  | ||||
|     // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM | ||||
| @@ -203,14 +257,10 @@ impl RoutingTable { | ||||
|  | ||||
|             // Got peer info, let's add it to the routing table | ||||
|             for pi in peer_info { | ||||
|                 let k = pi.node_id.key; | ||||
|                 // Register the node | ||||
|                 if let Some(nr) = self.register_node_with_signed_node_info( | ||||
|                     RoutingDomain::PublicInternet, | ||||
|                     k, | ||||
|                     pi.signed_node_info, | ||||
|                     false, | ||||
|                 ) { | ||||
|                 if let Some(nr) = | ||||
|                     self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, false) | ||||
|                 { | ||||
|                     // Add this our futures to process in parallel | ||||
|                     let routing_table = self.clone(); | ||||
|                     unord.push( | ||||
| @@ -230,90 +280,62 @@ impl RoutingTable { | ||||
|  | ||||
|     #[instrument(level = "trace", skip(self), err)] | ||||
|     pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { | ||||
|         let (bootstrap, bootstrap_nodes) = self.with_config(|c| { | ||||
|             ( | ||||
|                 c.network.bootstrap.clone(), | ||||
|                 c.network.bootstrap_nodes.clone(), | ||||
|             ) | ||||
|         }); | ||||
|         let bootstrap = self | ||||
|             .unlocked_inner | ||||
|             .with_config(|c| c.network.routing_table.bootstrap.clone()); | ||||
|  | ||||
|         // Don't bother if bootstraps aren't configured | ||||
|         if bootstrap.is_empty() { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         log_rtab!(debug "--- bootstrap_task"); | ||||
|  | ||||
|         // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism | ||||
|         if !bootstrap.is_empty() && bootstrap_nodes.is_empty() { | ||||
|             let mut bootstrap_dialinfos = Vec::<DialInfo>::new(); | ||||
|             for b in &bootstrap { | ||||
|                 if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { | ||||
|                     for bootstrap_di in bootstrap_di_vec { | ||||
|                         bootstrap_dialinfos.push(bootstrap_di); | ||||
|                     } | ||||
|         let mut bootstrap_dialinfos = Vec::<DialInfo>::new(); | ||||
|         for b in &bootstrap { | ||||
|             if let Ok(bootstrap_di_vec) = DialInfo::try_vec_from_url(&b) { | ||||
|                 for bootstrap_di in bootstrap_di_vec { | ||||
|                     bootstrap_dialinfos.push(bootstrap_di); | ||||
|                 } | ||||
|             } | ||||
|             if bootstrap_dialinfos.len() > 0 { | ||||
|                 return self | ||||
|                     .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) | ||||
|                     .await; | ||||
|             } | ||||
|         } | ||||
|         if bootstrap_dialinfos.len() > 0 { | ||||
|             return self | ||||
|                 .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) | ||||
|                 .await; | ||||
|         } | ||||
|  | ||||
|         // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) | ||||
|         let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { | ||||
|             let mut bsmap = BootstrapRecordMap::new(); | ||||
|             let mut bootstrap_node_dial_infos = Vec::new(); | ||||
|             for b in bootstrap_nodes { | ||||
|                 let (id_str, di_str) = b | ||||
|                     .split_once('@') | ||||
|                     .ok_or_else(|| eyre!("Invalid node dial info in bootstrap entry"))?; | ||||
|                 let node_id = | ||||
|                     NodeId::from_str(id_str).wrap_err("Invalid node id in bootstrap entry")?; | ||||
|                 let dial_info = | ||||
|                     DialInfo::from_str(di_str).wrap_err("Invalid dial info in bootstrap entry")?; | ||||
|                 bootstrap_node_dial_infos.push((node_id, dial_info)); | ||||
|             } | ||||
|             for (node_id, dial_info) in bootstrap_node_dial_infos { | ||||
|                 bsmap | ||||
|                     .entry(node_id.key) | ||||
|                     .or_insert_with(|| BootstrapRecord { | ||||
|                         min_version: MIN_CRYPTO_VERSION, | ||||
|                         max_version: MAX_CRYPTO_VERSION, | ||||
|                         dial_info_details: Vec::new(), | ||||
|                     }) | ||||
|                     .dial_info_details | ||||
|                     .push(DialInfoDetail { | ||||
|                         dial_info, | ||||
|                         class: DialInfoClass::Direct, // Bootstraps are always directly reachable | ||||
|                     }); | ||||
|             } | ||||
|             bsmap | ||||
|         } else { | ||||
|             // Resolve bootstrap servers and recurse their TXT entries | ||||
|             self.resolve_bootstrap(bootstrap).await? | ||||
|         }; | ||||
|  | ||||
|         // Map all bootstrap entries to a single key with multiple dialinfo | ||||
|         // If not direct, resolve bootstrap servers and recurse their TXT entries | ||||
|         let bsrecs = self.resolve_bootstrap(bootstrap).await?; | ||||
|  | ||||
|         // Run all bootstrap operations concurrently | ||||
|         let mut unord = FuturesUnordered::new(); | ||||
|         for (k, mut v) in bsmap { | ||||
|             // Sort dial info so we get the preferred order correct | ||||
|             v.dial_info_details.sort(); | ||||
|         for bsrec in bsrecs { | ||||
|             log_rtab!( | ||||
|                 "--- bootstrapping {} with {:?}", | ||||
|                 &bsrec.node_ids, | ||||
|                 &bsrec.dial_info_details | ||||
|             ); | ||||
|  | ||||
|             log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v); | ||||
|             // Get crypto support from list of node ids | ||||
|             let crypto_support = bsrec.node_ids.kinds(); | ||||
|  | ||||
|             // Make invalid signed node info (no signature) | ||||
|             if let Some(nr) = self.register_node_with_signed_node_info( | ||||
|                 RoutingDomain::PublicInternet, | ||||
|                 k, | ||||
|                 SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { | ||||
|                     network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable | ||||
|                     outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled | ||||
|                     address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable | ||||
|                     min_version: v.min_version, // Minimum crypto version specified in txt record | ||||
|                     max_version: v.max_version, // Maximum crypto version specified in txt record | ||||
|                     dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list | ||||
|                 })), | ||||
|                 true, | ||||
|             ) { | ||||
|             // Make unsigned SignedNodeInfo | ||||
|             let sni = SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo { | ||||
|                 network_class: NetworkClass::InboundCapable, // Bootstraps are always inbound capable | ||||
|                 outbound_protocols: ProtocolTypeSet::only(ProtocolType::UDP), // Bootstraps do not participate in relaying and will not make outbound requests, but will have UDP enabled | ||||
|                 address_types: AddressTypeSet::all(), // Bootstraps are always IPV4 and IPV6 capable | ||||
|                 envelope_support: bsrec.envelope_support, // Envelope support is as specified in the bootstrap list | ||||
|                 crypto_support, // Crypto support is derived from list of node ids | ||||
|                 dial_info_detail_list: bsrec.dial_info_details, // Dial info is as specified in the bootstrap list | ||||
|             })); | ||||
|  | ||||
|             let pi = PeerInfo::new(bsrec.node_ids, sni); | ||||
|  | ||||
|             if let Some(nr) = | ||||
|                 self.register_node_with_peer_info(RoutingDomain::PublicInternet, pi, true) | ||||
|             { | ||||
|                 // Add this our futures to process in parallel | ||||
|                 let routing_table = self.clone(); | ||||
|                 unord.push( | ||||
|   | ||||
| @@ -10,12 +10,13 @@ impl RoutingTable { | ||||
|         _last_ts: Timestamp, | ||||
|         cur_ts: Timestamp, | ||||
|     ) -> EyreResult<()> { | ||||
|         let kick_queue: Vec<usize> = core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) | ||||
|             .into_iter() | ||||
|             .collect(); | ||||
|         let kick_queue: Vec<(CryptoKind, usize)> = | ||||
|             core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) | ||||
|                 .into_iter() | ||||
|                 .collect(); | ||||
|         let mut inner = self.inner.write(); | ||||
|         for idx in kick_queue { | ||||
|             inner.kick_bucket(idx) | ||||
|         for (ck, idx) in kick_queue { | ||||
|             inner.kick_bucket(ck, idx) | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user