checkpoint
This commit is contained in:
@@ -2,18 +2,28 @@ use super::*;
|
||||
use core::sync::atomic::Ordering;
|
||||
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
|
||||
|
||||
/// Routing Table Bucket
|
||||
/// Stores map of public keys to entries, which may be in multiple routing tables per crypto kind
|
||||
/// Keeps entries at a particular 'dht distance' from this cryptokind's node id
|
||||
/// Helps to keep managed lists at particular distances so we can evict nodes by priority
|
||||
/// where the priority comes from liveness and age of the entry (older is better)
|
||||
pub struct Bucket {
|
||||
/// handle to the routing table
|
||||
routing_table: RoutingTable,
|
||||
entries: BTreeMap<PublicKey, Arc<BucketEntry>>,
|
||||
newest_entry: Option<PublicKey>,
|
||||
/// Map of keys to entries for this bucket
|
||||
entries: BTreeMap<TypedKey, Arc<BucketEntry>>,
|
||||
/// The most recent entry in this bucket
|
||||
newest_entry: Option<TypedKey>,
|
||||
/// The crypto kind in use for the public keys in this bucket
|
||||
kind: CryptoKind,
|
||||
}
|
||||
pub(super) type EntriesIter<'a> =
|
||||
alloc::collections::btree_map::Iter<'a, PublicKey, Arc<BucketEntry>>;
|
||||
alloc::collections::btree_map::Iter<'a, TypedKey, Arc<BucketEntry>>;
|
||||
|
||||
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
|
||||
#[archive_attr(repr(C), derive(CheckBytes))]
|
||||
struct SerializedBucketEntryData {
|
||||
key: PublicKey,
|
||||
key: TypedKey,
|
||||
value: u32, // index into serialized entries list
|
||||
}
|
||||
|
||||
@@ -21,7 +31,7 @@ struct SerializedBucketEntryData {
|
||||
#[archive_attr(repr(C), derive(CheckBytes))]
|
||||
struct SerializedBucketData {
|
||||
entries: Vec<SerializedBucketEntryData>,
|
||||
newest_entry: Option<PublicKey>,
|
||||
newest_entry: Option<TypedKey>,
|
||||
}
|
||||
|
||||
fn state_ordering(state: BucketEntryState) -> usize {
|
||||
@@ -33,11 +43,12 @@ fn state_ordering(state: BucketEntryState) -> usize {
|
||||
}
|
||||
|
||||
impl Bucket {
|
||||
pub fn new(routing_table: RoutingTable) -> Self {
|
||||
pub fn new(routing_table: RoutingTable, kind: CryptoKind) -> Self {
|
||||
Self {
|
||||
routing_table,
|
||||
entries: BTreeMap::new(),
|
||||
newest_entry: None,
|
||||
kind,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,22 +94,43 @@ impl Bucket {
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub(super) fn add_entry(&mut self, node_id: PublicKey) -> NodeRef {
|
||||
log_rtab!("Node added: {}", node_id.encode());
|
||||
/// Create a new entry with a node_id of this crypto kind and return it
|
||||
pub(super) fn add_entry(&mut self, node_id: TypedKey) -> NodeRef {
|
||||
assert_eq!(node_id.kind, self.kind);
|
||||
|
||||
log_rtab!("Node added: {}", node_id);
|
||||
|
||||
// Add new entry
|
||||
self.entries.insert(node_id, Arc::new(BucketEntry::new()));
|
||||
let entry = Arc::new(BucketEntry::new());
|
||||
entry.with_mut_inner(|e| e.add_node_id(node_id));
|
||||
self.entries.insert(node_id.key, entry.clone());
|
||||
|
||||
// This is now the newest bucket entry
|
||||
self.newest_entry = Some(node_id);
|
||||
self.newest_entry = Some(node_id.key);
|
||||
|
||||
// Get a node ref to return
|
||||
let entry = self.entries.get(&node_id).unwrap().clone();
|
||||
NodeRef::new(self.routing_table.clone(), node_id, entry, None)
|
||||
// Get a node ref to return since this is new
|
||||
NodeRef::new(self.routing_table.clone(), entry, None)
|
||||
}
|
||||
|
||||
pub(super) fn remove_entry(&mut self, node_id: &PublicKey) {
|
||||
log_rtab!("Node removed: {}", node_id);
|
||||
/// Add an existing entry with a new node_id for this crypto kind
|
||||
pub(super) fn add_existing_entry(&mut self, node_id: TypedKey, entry: Arc<BucketEntry>) {
|
||||
assert_eq!(node_id.kind, self.kind);
|
||||
|
||||
log_rtab!("Existing node added: {}", node_id);
|
||||
|
||||
// Add existing entry
|
||||
entry.with_mut_inner(|e| e.add_node_id(node_id));
|
||||
self.entries.insert(node_id.key, entry);
|
||||
|
||||
// This is now the newest bucket entry
|
||||
self.newest_entry = Some(node_id.key);
|
||||
|
||||
// No need to return a noderef here because the noderef will already exist in the caller
|
||||
}
|
||||
|
||||
/// Remove an entry with a node_id for this crypto kind from the bucket
|
||||
fn remove_entry(&mut self, node_id: &TypedKey) {
|
||||
log_rtab!("Node removed: {}:{}", self.kind, node_id);
|
||||
|
||||
// Remove the entry
|
||||
self.entries.remove(node_id);
|
||||
@@ -106,7 +138,7 @@ impl Bucket {
|
||||
// newest_entry is updated by kick_bucket()
|
||||
}
|
||||
|
||||
pub(super) fn entry(&self, key: &PublicKey) -> Option<Arc<BucketEntry>> {
|
||||
pub(super) fn entry(&self, key: &TypedKey) -> Option<Arc<BucketEntry>> {
|
||||
self.entries.get(key).cloned()
|
||||
}
|
||||
|
||||
@@ -114,7 +146,7 @@ impl Bucket {
|
||||
self.entries.iter()
|
||||
}
|
||||
|
||||
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<PublicKey>> {
|
||||
pub(super) fn kick(&mut self, bucket_depth: usize) -> Option<BTreeSet<TypedKey>> {
|
||||
// Get number of entries to attempt to purge from bucket
|
||||
let bucket_len = self.entries.len();
|
||||
|
||||
@@ -124,11 +156,11 @@ impl Bucket {
|
||||
}
|
||||
|
||||
// Try to purge the newest entries that overflow the bucket
|
||||
let mut dead_node_ids: BTreeSet<PublicKey> = BTreeSet::new();
|
||||
let mut dead_node_ids: BTreeSet<TypedKey> = BTreeSet::new();
|
||||
let mut extra_entries = bucket_len - bucket_depth;
|
||||
|
||||
// Get the sorted list of entries by their kick order
|
||||
let mut sorted_entries: Vec<(PublicKey, Arc<BucketEntry>)> = self
|
||||
let mut sorted_entries: Vec<(TypedKey, Arc<BucketEntry>)> = self
|
||||
.entries
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
|
@@ -68,22 +68,12 @@ pub struct BucketEntryLocalNetwork {
|
||||
node_status: Option<LocalNetworkNodeStatus>,
|
||||
}
|
||||
|
||||
/// A range of cryptography versions supported by this entry
|
||||
#[derive(Copy, Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
|
||||
#[archive_attr(repr(C), derive(CheckBytes))]
|
||||
pub struct VersionRange {
|
||||
/// The minimum cryptography version supported by this entry
|
||||
pub min: u8,
|
||||
/// The maximum cryptography version supported by this entry
|
||||
pub max: u8,
|
||||
}
|
||||
|
||||
/// The data associated with each bucket entry
|
||||
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
|
||||
#[archive_attr(repr(C), derive(CheckBytes))]
|
||||
pub struct BucketEntryInner {
|
||||
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
|
||||
node_ids: Vec<TypedKey>,
|
||||
node_ids: TypedKeySet,
|
||||
/// The set of envelope versions supported by the node inclusive of the requirements of any relay the node may be using
|
||||
envelope_support: Vec<u8>,
|
||||
/// If this node has updated it's SignedNodeInfo since our network
|
||||
@@ -133,9 +123,12 @@ impl BucketEntryInner {
|
||||
}
|
||||
|
||||
// Node ids
|
||||
pub fn node_ids(&self) -> Vec<TypedKey> {
|
||||
pub fn node_ids(&self) -> TypedKeySet {
|
||||
self.node_ids.clone()
|
||||
}
|
||||
pub fn add_node_id(&mut self, node_id: TypedKey) {
|
||||
self.node_ids.add(node_id);
|
||||
}
|
||||
|
||||
// Less is faster
|
||||
pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering {
|
||||
@@ -453,12 +446,23 @@ impl BucketEntryInner {
|
||||
out
|
||||
}
|
||||
|
||||
pub fn set_min_max_version(&mut self, min_max_version: VersionRange) {
|
||||
self.min_max_version = Some(min_max_version);
|
||||
pub fn add_envelope_version(&mut self, envelope_version: u8) {
|
||||
if self.envelope_support.contains(&envelope_version) {
|
||||
return;
|
||||
}
|
||||
self.envelope_support.push(envelope_version);
|
||||
self.envelope_support.dedup();
|
||||
self.envelope_support.sort();
|
||||
}
|
||||
|
||||
pub fn min_max_version(&self) -> Option<VersionRange> {
|
||||
self.min_max_version
|
||||
pub fn set_envelope_support(&mut self, envelope_support: Vec<u8>) {
|
||||
envelope_support.dedup();
|
||||
envelope_support.sort();
|
||||
self.envelope_support = envelope_support;
|
||||
}
|
||||
|
||||
pub fn envelope_support(&self) -> Vec<u8> {
|
||||
self.envelope_support.clone()
|
||||
}
|
||||
|
||||
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
|
||||
@@ -757,7 +761,8 @@ impl BucketEntry {
|
||||
Self {
|
||||
ref_count: AtomicU32::new(0),
|
||||
inner: RwLock::new(BucketEntryInner {
|
||||
min_max_version: None,
|
||||
node_ids: TypedKeySet::new(),
|
||||
envelope_support: Vec::new(),
|
||||
updated_since_last_network_change: false,
|
||||
last_connections: BTreeMap::new(),
|
||||
local_network: BucketEntryLocalNetwork {
|
||||
|
@@ -113,7 +113,7 @@ impl RoutingTable {
|
||||
let mut cnt = 0;
|
||||
out += &format!("Entries: {}\n", inner.bucket_entry_count);
|
||||
while b < blen {
|
||||
let filtered_entries: Vec<(&PublicKey, &Arc<BucketEntry>)> = inner.buckets[b]
|
||||
let filtered_entries: Vec<(&TypedKey, &Arc<BucketEntry>)> = inner.buckets[b]
|
||||
.entries()
|
||||
.filter(|e| {
|
||||
let state = e.1.with(inner, |_rti, e| e.state(cur_ts));
|
||||
@@ -149,7 +149,7 @@ impl RoutingTable {
|
||||
out
|
||||
}
|
||||
|
||||
pub(crate) fn debug_info_entry(&self, node_id: PublicKey) -> String {
|
||||
pub(crate) fn debug_info_entry(&self, node_id: TypedKey) -> String {
|
||||
let mut out = String::new();
|
||||
out += &format!("Entry {:?}:\n", node_id);
|
||||
if let Some(nr) = self.lookup_node_ref(node_id) {
|
||||
|
@@ -49,7 +49,7 @@ pub struct LowLevelPortInfo {
|
||||
pub protocol_to_port: ProtocolToPortMapping,
|
||||
}
|
||||
pub type RoutingTableEntryFilter<'t> =
|
||||
Box<dyn FnMut(&RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
|
||||
Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
|
||||
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq)]
|
||||
pub struct RoutingTableHealth {
|
||||
@@ -110,7 +110,7 @@ impl RoutingTableUnlockedInner {
|
||||
f(&*self.config.get())
|
||||
}
|
||||
|
||||
pub fn node_id(&self, kind: CryptoKind) -> PublicKey {
|
||||
pub fn node_id(&self, kind: CryptoKind) -> TypedKey {
|
||||
self.node_id_keypairs.get(&kind).unwrap().key
|
||||
}
|
||||
|
||||
@@ -192,6 +192,27 @@ impl RoutingTable {
|
||||
this
|
||||
}
|
||||
|
||||
/////////////////////////////////////
|
||||
/// Unlocked passthrough
|
||||
pub fn network_manager(&self) -> NetworkManager {
|
||||
self.unlocked_inner.network_manager()
|
||||
}
|
||||
pub fn crypto(&self) -> Crypto {
|
||||
self.unlocked_inner.crypto()
|
||||
}
|
||||
pub fn rpc_processor(&self) -> RPCProcessor {
|
||||
self.unlocked_inner.rpc_processor()
|
||||
}
|
||||
pub fn node_id(&self, kind: CryptoKind) -> TypedKey {
|
||||
self.unlocked_inner.node_id(kind)
|
||||
}
|
||||
pub fn node_id_secret(&self, kind: CryptoKind) -> SecretKey {
|
||||
self.unlocked_inner.node_id_secret(kind)
|
||||
}
|
||||
pub fn matches_own_node_id(&self, node_ids: &[TypedKey]) -> bool {
|
||||
self.unlocked_inner.matches_own_node_id(node_ids)
|
||||
}
|
||||
|
||||
/////////////////////////////////////
|
||||
/// Initialization
|
||||
|
||||
@@ -556,14 +577,14 @@ impl RoutingTable {
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a reference to it
|
||||
pub fn lookup_node_ref(&self, node_id: PublicKey) -> Option<NodeRef> {
|
||||
pub fn lookup_node_ref(&self, node_id: TypedKey) -> Option<NodeRef> {
|
||||
self.inner.read().lookup_node_ref(self.clone(), node_id)
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a filtered reference to it
|
||||
pub fn lookup_and_filter_noderef(
|
||||
&self,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
routing_domain_set: RoutingDomainSet,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
) -> Option<NodeRef> {
|
||||
@@ -596,7 +617,7 @@ impl RoutingTable {
|
||||
/// and add the last peer address we have for it, since that's pretty common
|
||||
pub fn register_node_with_existing_connection(
|
||||
&self,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
descriptor: ConnectionDescriptor,
|
||||
timestamp: Timestamp,
|
||||
) -> Option<NodeRef> {
|
||||
@@ -615,7 +636,7 @@ impl RoutingTable {
|
||||
self.inner.read().get_routing_table_health()
|
||||
}
|
||||
|
||||
pub fn get_recent_peers(&self) -> Vec<(PublicKey, RecentPeersEntry)> {
|
||||
pub fn get_recent_peers(&self) -> Vec<(TypedKey, RecentPeersEntry)> {
|
||||
let mut recent_peers = Vec::new();
|
||||
let mut dead_peers = Vec::new();
|
||||
let mut out = Vec::new();
|
||||
@@ -654,7 +675,7 @@ impl RoutingTable {
|
||||
out
|
||||
}
|
||||
|
||||
pub fn touch_recent_peer(&self, node_id: PublicKey, last_connection: ConnectionDescriptor) {
|
||||
pub fn touch_recent_peer(&self, node_id: TypedKey, last_connection: ConnectionDescriptor) {
|
||||
self.inner
|
||||
.write()
|
||||
.touch_recent_peer(node_id, last_connection)
|
||||
@@ -703,7 +724,7 @@ impl RoutingTable {
|
||||
dial_info_filter: DialInfoFilter,
|
||||
) -> RoutingTableEntryFilter<'a> {
|
||||
// does it have matching public dial info?
|
||||
Box::new(move |rti, _k, e| {
|
||||
Box::new(move |rti, e| {
|
||||
if let Some(e) = e {
|
||||
e.with(rti, |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
@@ -731,7 +752,7 @@ impl RoutingTable {
|
||||
dial_info: DialInfo,
|
||||
) -> RoutingTableEntryFilter<'a> {
|
||||
// does the node's outbound capabilities match the dialinfo?
|
||||
Box::new(move |rti, _k, e| {
|
||||
Box::new(move |rti, e| {
|
||||
if let Some(e) = e {
|
||||
e.with(rti, |_rti, e| {
|
||||
if let Some(ni) = e.node_info(routing_domain) {
|
||||
@@ -774,8 +795,8 @@ impl RoutingTable {
|
||||
let mut nodes_proto_v6 = vec![0usize, 0usize, 0usize, 0usize];
|
||||
|
||||
let filter = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
move |rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| {
|
||||
let entry = entry.unwrap();
|
||||
entry.with(rti, |_rti, e| {
|
||||
// skip nodes on our local network here
|
||||
if e.has_node_info(RoutingDomain::LocalNetwork.into()) {
|
||||
@@ -821,8 +842,8 @@ impl RoutingTable {
|
||||
self.find_fastest_nodes(
|
||||
protocol_types_len * 2 * max_per_type,
|
||||
filters,
|
||||
|_rti, k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(self.clone(), k, v.unwrap().clone(), None)
|
||||
|_rti, entry: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(self.clone(), entry.unwrap().clone(), None)
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -838,10 +859,10 @@ impl RoutingTable {
|
||||
where
|
||||
C: for<'a, 'b> FnMut(
|
||||
&'a RoutingTableInner,
|
||||
&'b (PublicKey, Option<Arc<BucketEntry>>),
|
||||
&'b (PublicKey, Option<Arc<BucketEntry>>),
|
||||
&'b Option<Arc<BucketEntry>>,
|
||||
&'b Option<Arc<BucketEntry>>,
|
||||
) -> core::cmp::Ordering,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
@@ -855,7 +876,7 @@ impl RoutingTable {
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
@@ -864,12 +885,12 @@ impl RoutingTable {
|
||||
|
||||
pub fn find_closest_nodes<'a, T, O>(
|
||||
&self,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O + Send,
|
||||
{
|
||||
self.inner
|
||||
.read()
|
||||
@@ -895,7 +916,7 @@ impl RoutingTable {
|
||||
pub async fn find_node(
|
||||
&self,
|
||||
node_ref: NodeRef,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
|
||||
let rpc_processor = self.rpc_processor();
|
||||
|
||||
@@ -1021,7 +1042,7 @@ impl RoutingTable {
|
||||
// Go through all entries and find fastest entry that matches filter function
|
||||
let inner = self.inner.read();
|
||||
let inner = &*inner;
|
||||
let mut best_inbound_relay: Option<(PublicKey, Arc<BucketEntry>)> = None;
|
||||
let mut best_inbound_relay: Option<(TypedKey, Arc<BucketEntry>)> = None;
|
||||
|
||||
// Iterate all known nodes for candidates
|
||||
inner.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| {
|
||||
@@ -1054,6 +1075,6 @@ impl RoutingTable {
|
||||
Option::<()>::None
|
||||
});
|
||||
// Return the best inbound relay noderef
|
||||
best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), k, e, None))
|
||||
best_inbound_relay.map(|(k, e)| NodeRef::new(self.clone(), e, None))
|
||||
}
|
||||
}
|
||||
|
@@ -98,7 +98,7 @@ pub trait NodeRefBase: Sized {
|
||||
fn routing_table(&self) -> RoutingTable {
|
||||
self.common().routing_table.clone()
|
||||
}
|
||||
fn node_ids(&self) -> Vec<TypedKey> {
|
||||
fn node_ids(&self) -> TypedKeySet {
|
||||
self.operate(|_rti, e| e.node_ids())
|
||||
}
|
||||
fn has_updated_since_last_network_change(&self) -> bool {
|
||||
@@ -112,11 +112,14 @@ pub trait NodeRefBase: Sized {
|
||||
e.update_node_status(node_status);
|
||||
});
|
||||
}
|
||||
fn min_max_version(&self) -> Option<VersionRange> {
|
||||
self.operate(|_rti, e| e.min_max_version())
|
||||
fn envelope_support(&self) -> Vec<u8> {
|
||||
self.operate(|_rti, e| e.envelope_support())
|
||||
}
|
||||
fn set_min_max_version(&self, min_max_version: VersionRange) {
|
||||
self.operate_mut(|_rti, e| e.set_min_max_version(min_max_version))
|
||||
fn add_envelope_version(&self, envelope_version: u8) {
|
||||
self.operate_mut(|_rti, e| e.add_envelope_version(envelope_version))
|
||||
}
|
||||
fn set_envelope_support(&self, envelope_support: Vec<u8>) {
|
||||
self.operate_mut(|_rti, e| e.set_envelope_support(envelope_support))
|
||||
}
|
||||
fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
|
||||
self.operate(|_rti, e| e.state(cur_ts))
|
||||
|
@@ -117,14 +117,14 @@ impl PrivateRoute {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn first_hop_node_id(&self) -> Option<PublicKey> {
|
||||
pub fn first_hop_node_id(&self) -> Option<TypedKey> {
|
||||
let PrivateRouteHops::FirstHop(pr_first_hop) = &self.hops else {
|
||||
return None;
|
||||
};
|
||||
|
||||
// Get the safety route to use from the spec
|
||||
Some(match &pr_first_hop.node {
|
||||
RouteNode::NodeId(n) => n.key,
|
||||
RouteNode::NodeId(n) => n,
|
||||
RouteNode::PeerInfo(p) => p.node_id.key,
|
||||
})
|
||||
}
|
||||
@@ -162,13 +162,13 @@ pub enum SafetyRouteHops {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SafetyRoute {
|
||||
pub public_key: PublicKey,
|
||||
pub public_key: TypedKey,
|
||||
pub hop_count: u8,
|
||||
pub hops: SafetyRouteHops,
|
||||
}
|
||||
|
||||
impl SafetyRoute {
|
||||
pub fn new_stub(public_key: PublicKey, private_route: PrivateRoute) -> Self {
|
||||
pub fn new_stub(public_key: TypedKey, private_route: PrivateRoute) -> Self {
|
||||
// First hop should have already been popped off for stubbed safety routes since
|
||||
// we are sending directly to the first hop
|
||||
assert!(matches!(private_route.hops, PrivateRouteHops::Data(_)));
|
||||
|
@@ -10,13 +10,13 @@ pub enum ContactMethod {
|
||||
/// Contact the node directly
|
||||
Direct(DialInfo),
|
||||
/// Request via signal the node connect back directly (relay, target)
|
||||
SignalReverse(PublicKey, PublicKey),
|
||||
SignalReverse(TypedKey, TypedKey),
|
||||
/// Request via signal the node negotiate a hole punch (relay, target_node)
|
||||
SignalHolePunch(PublicKey, PublicKey),
|
||||
SignalHolePunch(TypedKey, TypedKey),
|
||||
/// Must use an inbound relay to reach the node
|
||||
InboundRelay(PublicKey),
|
||||
InboundRelay(TypedKey),
|
||||
/// Must use outbound relay to reach the node
|
||||
OutboundRelay(PublicKey),
|
||||
OutboundRelay(TypedKey),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@@ -28,7 +28,7 @@ pub struct RoutingTableInner {
|
||||
/// Statistics about the total bandwidth to/from this node
|
||||
pub(super) self_transfer_stats: TransferStatsDownUp,
|
||||
/// Peers we have recently communicated with
|
||||
pub(super) recent_peers: LruCache<PublicKey, RecentPeersEntry>,
|
||||
pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
|
||||
/// Storage for private/safety RouteSpecs
|
||||
pub(super) route_spec_store: Option<RouteSpecStore>,
|
||||
}
|
||||
@@ -416,7 +416,7 @@ impl RoutingTableInner {
|
||||
|
||||
pub fn with_entries<
|
||||
T,
|
||||
F: FnMut(&RoutingTableInner, PublicKey, Arc<BucketEntry>) -> Option<T>,
|
||||
F: FnMut(&RoutingTableInner, TypedKey, Arc<BucketEntry>) -> Option<T>,
|
||||
>(
|
||||
&self,
|
||||
cur_ts: Timestamp,
|
||||
@@ -442,7 +442,7 @@ impl RoutingTableInner {
|
||||
|
||||
pub fn with_entries_mut<
|
||||
T,
|
||||
F: FnMut(&mut RoutingTableInner, PublicKey, Arc<BucketEntry>) -> Option<T>,
|
||||
F: FnMut(&mut RoutingTableInner, TypedKey, Arc<BucketEntry>) -> Option<T>,
|
||||
>(
|
||||
&mut self,
|
||||
cur_ts: Timestamp,
|
||||
@@ -537,66 +537,87 @@ impl RoutingTableInner {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Look up all bucket entries and make sure we only have zero or one
|
||||
// If we have more than one, pick the one with the best cryptokind to add node ids to
|
||||
let mut best_entry: Option<Arc<BucketEntry>> = None;
|
||||
for node_id in node_ids {
|
||||
if let Some((kind, idx)) = self.unlocked_inner.find_bucket_index(*node_id) {
|
||||
let bucket = &self.buckets[&kind][idx];
|
||||
if let Some(entry) = bucket.entry(&node_id.key) {
|
||||
// Best entry is the first one in sorted order that exists from the node id list
|
||||
// Everything else that matches will be overwritten in the bucket and the
|
||||
// existing noderefs will eventually unref and drop the old unindexed bucketentry
|
||||
// We do this instead of merging for now. We could 'kill' entries and have node_refs
|
||||
// rewrite themselves to point to the merged entry upon dereference. The use case for this
|
||||
// may not be worth the effort.
|
||||
best_entry = Some(entry);
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// If the entry does exist already, update it
|
||||
if let Some(best_entry) = best_entry {
|
||||
let nr = best_entry.map(|e| NodeRef::new(outer_self.clone(), best_entry, None));
|
||||
|
||||
// Update the entry with all of the node ids
|
||||
nr.update_node_ids(node_ids);
|
||||
|
||||
// Update the entry with the update func
|
||||
nr.operate_mut(|rti, e| update_func(rti, e));
|
||||
|
||||
return Some(nr);
|
||||
}
|
||||
|
||||
// Find a bucket for the first node id crypto kind we can handle
|
||||
let (node_id, kind, idx) = node_ids.iter().find_map(|x| {
|
||||
self.unlocked_inner
|
||||
.find_bucket_index(*x)
|
||||
.map(|v| (*x, v.0, v.1))
|
||||
})?;
|
||||
|
||||
// Look up existing entry
|
||||
let idx = node_ids
|
||||
.iter()
|
||||
.find_map(|x| self.unlocked_inner.find_bucket_index(x));
|
||||
let noderef = {
|
||||
let bucket = &self.buckets[idx];
|
||||
let entry = bucket.entry(&node_id);
|
||||
entry.map(|e| NodeRef::new(outer_self.clone(), node_id, e, None))
|
||||
let bucket = &self.buckets[&kind][idx];
|
||||
let entry = bucket.entry(&node_id.key);
|
||||
entry.map(|e| NodeRef::new(outer_self.clone(), e, None))
|
||||
};
|
||||
|
||||
// If one doesn't exist, insert into bucket, possibly evicting a bucket member
|
||||
let noderef = match noderef {
|
||||
None => {
|
||||
// Make new entry
|
||||
self.bucket_entry_count += 1;
|
||||
let cnt = self.bucket_entry_count;
|
||||
let bucket = &mut self.buckets[idx];
|
||||
let nr = bucket.add_entry(node_id);
|
||||
self.bucket_entry_count += 1;
|
||||
let cnt = self.bucket_entry_count;
|
||||
let bucket = &mut self.buckets[idx];
|
||||
let nr = bucket.add_entry(node_id);
|
||||
|
||||
// Update the entry
|
||||
let entry = bucket.entry(&node_id).unwrap();
|
||||
entry.with_mut(self, update_func);
|
||||
// Update the entry
|
||||
let entry = bucket.entry(&node_id).unwrap();
|
||||
entry.with_mut(self, update_func);
|
||||
|
||||
// Kick the bucket
|
||||
self.unlocked_inner.kick_queue.lock().insert(idx);
|
||||
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable));
|
||||
// Kick the bucket
|
||||
self.unlocked_inner.kick_queue.lock().insert(idx);
|
||||
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, self.get_entry_count(RoutingDomainSet::all(), BucketEntryState::Unreliable));
|
||||
|
||||
nr
|
||||
}
|
||||
Some(nr) => {
|
||||
// Update the entry
|
||||
let bucket = &mut self.buckets[idx];
|
||||
let entry = bucket.entry(&node_id).unwrap();
|
||||
entry.with_mut(self, update_func);
|
||||
|
||||
nr
|
||||
}
|
||||
};
|
||||
|
||||
Some(noderef)
|
||||
Some(NodeRef::new(outer_self.clone(), e, None))
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a reference to it
|
||||
pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: PublicKey) -> Option<NodeRef> {
|
||||
if node_id == self.unlocked_inner.node_id {
|
||||
pub fn lookup_node_ref(&self, outer_self: RoutingTable, node_id: TypedKey) -> Option<NodeRef> {
|
||||
if self.unlocked_inner.matches_own_node_id(&[node_id]) {
|
||||
log_rtab!(error "can't look up own node id in routing table");
|
||||
return None;
|
||||
}
|
||||
let idx = self.unlocked_inner.find_bucket_index(node_id);
|
||||
let bucket = &self.buckets[idx];
|
||||
let (kind, idx) = self.unlocked_inner.find_bucket_index(node_id)?;
|
||||
let bucket = &self.buckets[&kind][idx];
|
||||
bucket
|
||||
.entry(&node_id)
|
||||
.map(|e| NodeRef::new(outer_self, node_id, e, None))
|
||||
.entry(&node_id.key)
|
||||
.map(|e| NodeRef::new(outer_self, e, None))
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a filtered reference to it
|
||||
pub fn lookup_and_filter_noderef(
|
||||
&self,
|
||||
outer_self: RoutingTable,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
routing_domain_set: RoutingDomainSet,
|
||||
dial_info_filter: DialInfoFilter,
|
||||
) -> Option<NodeRef> {
|
||||
@@ -611,7 +632,7 @@ impl RoutingTableInner {
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and call a function on its entry without using a noderef
|
||||
pub fn with_node_entry<F, R>(&self, node_id: PublicKey, f: F) -> Option<R>
|
||||
pub fn with_node_entry<F, R>(&self, node_id: TypedKey, f: F) -> Option<R>
|
||||
where
|
||||
F: FnOnce(Arc<BucketEntry>) -> R,
|
||||
{
|
||||
@@ -682,11 +703,11 @@ impl RoutingTableInner {
|
||||
pub fn register_node_with_existing_connection(
|
||||
&mut self,
|
||||
outer_self: RoutingTable,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
descriptor: ConnectionDescriptor,
|
||||
timestamp: Timestamp,
|
||||
) -> Option<NodeRef> {
|
||||
let out = self.create_node_ref(outer_self, node_id, |_rti, e| {
|
||||
let out = self.create_node_ref(outer_self, &[node_id], |_rti, e| {
|
||||
// this node is live because it literally just connected to us
|
||||
e.touch_last_seen(timestamp);
|
||||
});
|
||||
@@ -743,7 +764,7 @@ impl RoutingTableInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn touch_recent_peer(&mut self, node_id: PublicKey, last_connection: ConnectionDescriptor) {
|
||||
pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: ConnectionDescriptor) {
|
||||
self.recent_peers
|
||||
.insert(node_id, RecentPeersEntry { last_connection });
|
||||
}
|
||||
@@ -758,29 +779,27 @@ impl RoutingTableInner {
|
||||
node_count: usize,
|
||||
mut filters: VecDeque<RoutingTableEntryFilter>,
|
||||
) -> Vec<NodeRef> {
|
||||
let public_node_filter = Box::new(
|
||||
|rti: &RoutingTableInner, _k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
entry.with(rti, |_rti, e| {
|
||||
// skip nodes on local network
|
||||
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
|
||||
return false;
|
||||
}
|
||||
// skip nodes not on public internet
|
||||
if e.node_info(RoutingDomain::PublicInternet).is_none() {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
},
|
||||
) as RoutingTableEntryFilter;
|
||||
let public_node_filter = Box::new(|rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
|
||||
let entry = v.unwrap();
|
||||
entry.with(rti, |_rti, e| {
|
||||
// skip nodes on local network
|
||||
if e.node_info(RoutingDomain::LocalNetwork).is_some() {
|
||||
return false;
|
||||
}
|
||||
// skip nodes not on public internet
|
||||
if e.node_info(RoutingDomain::PublicInternet).is_none() {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
}) as RoutingTableEntryFilter;
|
||||
filters.push_front(public_node_filter);
|
||||
|
||||
self.find_fastest_nodes(
|
||||
node_count,
|
||||
filters,
|
||||
|_rti: &RoutingTableInner, k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(outer_self.clone(), k, v.unwrap().clone(), None)
|
||||
|_rti: &RoutingTableInner, k: TypedKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(outer_self.clone(), v.unwrap().clone(), None)
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -789,9 +808,9 @@ impl RoutingTableInner {
|
||||
&self,
|
||||
routing_domain: RoutingDomain,
|
||||
has_valid_own_node_info: bool,
|
||||
v: Option<Arc<BucketEntry>>,
|
||||
entry: Option<Arc<BucketEntry>>,
|
||||
) -> bool {
|
||||
match v {
|
||||
match entry {
|
||||
None => has_valid_own_node_info,
|
||||
Some(entry) => entry.with(self, |_rti, e| {
|
||||
e.signed_node_info(routing_domain.into())
|
||||
@@ -805,12 +824,11 @@ impl RoutingTableInner {
|
||||
&self,
|
||||
routing_domain: RoutingDomain,
|
||||
own_peer_info: &PeerInfo,
|
||||
k: PublicKey,
|
||||
v: Option<Arc<BucketEntry>>,
|
||||
entry: Option<Arc<BucketEntry>>,
|
||||
) -> PeerInfo {
|
||||
match v {
|
||||
match entry {
|
||||
None => own_peer_info.clone(),
|
||||
Some(entry) => entry.with(self, |_rti, e| e.make_peer_info(k, routing_domain).unwrap()),
|
||||
Some(entry) => entry.with(self, |_rti, e| e.make_peer_info(routing_domain).unwrap()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -825,15 +843,14 @@ impl RoutingTableInner {
|
||||
where
|
||||
C: for<'a, 'b> FnMut(
|
||||
&'a RoutingTableInner,
|
||||
&'b (PublicKey, Option<Arc<BucketEntry>>),
|
||||
&'b (PublicKey, Option<Arc<BucketEntry>>),
|
||||
&'b Option<Arc<BucketEntry>>,
|
||||
&'b Option<Arc<BucketEntry>>,
|
||||
) -> core::cmp::Ordering,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, TypedKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
// collect all the nodes for sorting
|
||||
let mut nodes = Vec::<(PublicKey, Option<Arc<BucketEntry>>)>::with_capacity(
|
||||
self.bucket_entry_count + 1,
|
||||
);
|
||||
let mut nodes =
|
||||
Vec::<(TypedKey, Option<Arc<BucketEntry>>)>::with_capacity(self.bucket_entry_count + 1);
|
||||
|
||||
// add our own node (only one of there with the None entry)
|
||||
let mut filtered = false;
|
||||
@@ -880,13 +897,13 @@ impl RoutingTableInner {
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
|
||||
// Add filter to remove dead nodes always
|
||||
let filter_dead = Box::new(
|
||||
move |rti: &RoutingTableInner, _k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
move |rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
|
||||
if let Some(entry) = &v {
|
||||
// always filter out dead nodes
|
||||
if entry.with(rti, |_rti, e| e.state(cur_ts) == BucketEntryState::Dead) {
|
||||
@@ -904,8 +921,8 @@ impl RoutingTableInner {
|
||||
|
||||
// Fastest sort
|
||||
let sort = |rti: &RoutingTableInner,
|
||||
(a_key, a_entry): &(PublicKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(PublicKey, Option<Arc<BucketEntry>>)| {
|
||||
(a_key, a_entry): &(TypedKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(TypedKey, Option<Arc<BucketEntry>>)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
@@ -960,12 +977,12 @@ impl RoutingTableInner {
|
||||
|
||||
pub fn find_closest_nodes<T, O>(
|
||||
&self,
|
||||
node_id: PublicKey,
|
||||
node_id: TypedKey,
|
||||
filters: VecDeque<RoutingTableEntryFilter>,
|
||||
transform: T,
|
||||
) -> Vec<O>
|
||||
where
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, PublicKey, Option<Arc<BucketEntry>>) -> O,
|
||||
T: for<'r> FnMut(&'r RoutingTableInner, TypedKey, Option<Arc<BucketEntry>>) -> O,
|
||||
{
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let node_count = {
|
||||
@@ -976,8 +993,8 @@ impl RoutingTableInner {
|
||||
|
||||
// closest sort
|
||||
let sort = |rti: &RoutingTableInner,
|
||||
(a_key, a_entry): &(PublicKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(PublicKey, Option<Arc<BucketEntry>>)| {
|
||||
(a_key, a_entry): &(TypedKey, Option<Arc<BucketEntry>>),
|
||||
(b_key, b_entry): &(TypedKey, Option<Arc<BucketEntry>>)| {
|
||||
// same nodes are always the same
|
||||
if a_key == b_key {
|
||||
return core::cmp::Ordering::Equal;
|
||||
|
@@ -205,7 +205,7 @@ impl RoutingTable {
|
||||
for pi in peer_info {
|
||||
let k = pi.node_id.key;
|
||||
// Register the node
|
||||
if let Some(nr) = self.register_node_with_peer_info(
|
||||
if let Some(nr) = self.register_node_with_signed_node_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
k,
|
||||
pi.signed_node_info,
|
||||
@@ -301,7 +301,7 @@ impl RoutingTable {
|
||||
log_rtab!("--- bootstrapping {} with {:?}", k.encode(), &v);
|
||||
|
||||
// Make invalid signed node info (no signature)
|
||||
if let Some(nr) = self.register_node_with_peer_info(
|
||||
if let Some(nr) = self.register_node_with_signed_node_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
k,
|
||||
SignedNodeInfo::Direct(SignedDirectNodeInfo::with_no_signature(NodeInfo {
|
||||
|
@@ -24,7 +24,7 @@ impl RoutingTable {
|
||||
let noderefs = routing_table.find_fastest_nodes(
|
||||
min_peer_count,
|
||||
VecDeque::new(),
|
||||
|_rti, k: PublicKey, v: Option<Arc<BucketEntry>>| {
|
||||
|_rti, k: TypedKey, v: Option<Arc<BucketEntry>>| {
|
||||
NodeRef::new(routing_table.clone(), k, v.unwrap().clone(), None)
|
||||
},
|
||||
);
|
||||
|
@@ -8,7 +8,7 @@ const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2;
|
||||
|
||||
impl RoutingTable {
|
||||
/// Fastest routes sort
|
||||
fn route_sort_latency_fn(a: &(PublicKey, u64), b: &(PublicKey, u64)) -> cmp::Ordering {
|
||||
fn route_sort_latency_fn(a: &(TypedKey, u64), b: &(TypedKey, u64)) -> cmp::Ordering {
|
||||
let mut al = a.1;
|
||||
let mut bl = b.1;
|
||||
// Treat zero latency as uncalculated
|
||||
@@ -35,14 +35,14 @@ impl RoutingTable {
|
||||
///
|
||||
/// If a route doesn't 'need_testing', then we neither test nor drop it
|
||||
#[instrument(level = "trace", skip(self))]
|
||||
fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec<PublicKey> {
|
||||
fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec<TypedKey> {
|
||||
let default_route_hop_count =
|
||||
self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
|
||||
|
||||
let rss = self.route_spec_store();
|
||||
let mut must_test_routes = Vec::<PublicKey>::new();
|
||||
let mut unpublished_routes = Vec::<(PublicKey, u64)>::new();
|
||||
let mut expired_routes = Vec::<PublicKey>::new();
|
||||
let mut must_test_routes = Vec::<TypedKey>::new();
|
||||
let mut unpublished_routes = Vec::<(TypedKey, u64)>::new();
|
||||
let mut expired_routes = Vec::<TypedKey>::new();
|
||||
rss.list_allocated_routes(|k, v| {
|
||||
let stats = v.get_stats();
|
||||
// Ignore nodes that don't need testing
|
||||
@@ -95,7 +95,7 @@ impl RoutingTable {
|
||||
async fn test_route_set(
|
||||
&self,
|
||||
stop_token: StopToken,
|
||||
routes_needing_testing: Vec<PublicKey>,
|
||||
routes_needing_testing: Vec<TypedKey>,
|
||||
) -> EyreResult<()> {
|
||||
if routes_needing_testing.is_empty() {
|
||||
return Ok(());
|
||||
@@ -107,7 +107,7 @@ impl RoutingTable {
|
||||
#[derive(Default, Debug)]
|
||||
struct TestRouteContext {
|
||||
failed: bool,
|
||||
dead_routes: Vec<PublicKey>,
|
||||
dead_routes: Vec<TypedKey>,
|
||||
}
|
||||
|
||||
let mut unord = FuturesUnordered::new();
|
||||
|
Reference in New Issue
Block a user