diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp
index 2a4116c2..eaadbb87 100644
--- a/veilid-core/proto/veilid.capnp
+++ b/veilid-core/proto/veilid.capnp
@@ -119,7 +119,7 @@ struct RouteHopData {
 }
 
 struct RouteHop {
-    dialInfo        @0  :NodeDialInfo;              # dial info for this hop
+    dialInfo            @0  :NodeDialInfo;          # dial info for this hop
     nextHop             @1  :RouteHopData;          # Optional: next hop in encrypted blob
                                                     # Null means no next hop, at destination (only used in private route, safety routes must enclose a stub private route)
 }
@@ -188,7 +188,7 @@ struct SenderInfo {
 
 struct OperationInfoA {
     nodeInfo                @0  :NodeInfo;          # returned node information
-    senderInfo              @1  :SenderInfo;        # info about InfoQ sender
+    senderInfo              @1  :SenderInfo;        # info about InfoQ sender from the perspective of the replier
 }
 
 struct OperationValidateDialInfo {
@@ -204,7 +204,7 @@ struct OperationReturnReceipt {
 
 struct OperationFindNodeQ {
     nodeId                  @0  :NodeID;            # node id to locate
-    peerInfo                @1  :PeerInfo;          # The peer info for node asking the question
+    dialInfoList            @1  :List(DialInfo);    # dial info for the node asking the question
 }
 
 struct PeerInfo {
@@ -368,7 +368,7 @@ struct Operation {
 
     respondTo :union {
         none                @1  :Void;              # no response is desired
-        sender              @2  :Void;              # envelope sender node id to be used for reply
+        sender              @2  :DialInfo;          # (Optional) envelope sender node id to be used for reply
                                                     # possibly through a relay if the request arrived that way
         privateRoute        @3  :PrivateRoute;      # embedded private route to be used for reply
     }
@@ -399,7 +399,7 @@ struct Operation {
         signalQ             @21 :OperationSignalQ;
         signalA             @22 :OperationSignalA;
 
-        returnReceipt   @23 :OperationReturnReceipt;
+        returnReceipt       @23 :OperationReturnReceipt;
 
         # Tunnel operations
         startTunnelQ        @24 :OperationStartTunnelQ;
diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs
index 39b1a9a0..f0839a76 100644
--- a/veilid-core/src/attachment_manager.rs
+++ b/veilid-core/src/attachment_manager.rs
@@ -2,6 +2,7 @@ use crate::callback_state_machine::*;
 use crate::dht::crypto::Crypto;
 use crate::intf::*;
 use crate::network_manager::*;
+use crate::routing_table::*;
 use crate::xx::*;
 use crate::*;
 use core::convert::TryFrom;
@@ -106,8 +107,8 @@ pub struct AttachmentManagerInner {
     attachment_machine: CallbackStateMachine<Attachment>,
     network_manager: NetworkManager,
     maintain_peers: bool,
-    peer_count: u32,
    attach_timestamp: Option<u64>,
+    update_callback: Option<UpdateCallback>,
     attachment_maintainer_jh: Option<JoinHandle<()>>,
 }
 
@@ -127,8 +128,8 @@ impl AttachmentManager {
             attachment_machine: CallbackStateMachine::new(),
             network_manager: NetworkManager::new(config, table_store, crypto),
             maintain_peers: false,
-            peer_count: 0,
             attach_timestamp: None,
+            update_callback: None,
             attachment_maintainer_jh: None,
         }
     }
@@ -159,24 +160,30 @@ impl AttachmentManager {
         self.inner.lock().attach_timestamp
     }
 
-    pub fn get_peer_count(&self) -> u32 {
-        self.inner.lock().peer_count
-    }
-
-    fn translate_peer_input(cur: u32, max: u32) -> AttachmentInput {
-        if cur > max {
+    fn translate_routing_table_health(
+        health: RoutingTableHealth,
+        config: &VeilidConfigRoutingTable,
+    ) -> AttachmentInput {
+        if health.reliable_entry_count >= config.limit_over_attached.try_into().unwrap() {
             return AttachmentInput::TooManyPeers;
         }
-        match cmp::min(4, 4 * cur / max) {
-            4 => AttachmentInput::FullPeers,
-            3 => AttachmentInput::StrongPeers,
-            2 => AttachmentInput::GoodPeers,
-            1 => AttachmentInput::WeakPeers,
-            0 => AttachmentInput::NoPeers,
-            _ => panic!("Invalid state"),
+        if health.reliable_entry_count >= config.limit_fully_attached.try_into().unwrap() {
+            return AttachmentInput::FullPeers;
         }
+        if health.reliable_entry_count >= config.limit_attached_strong.try_into().unwrap() {
+            return AttachmentInput::StrongPeers;
+        }
+        if health.reliable_entry_count >= config.limit_attached_good.try_into().unwrap() {
+            return AttachmentInput::GoodPeers;
+        }
+        if health.reliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
+            || health.unreliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
+        {
+            return AttachmentInput::WeakPeers;
+        }
+        AttachmentInput::NoPeers
     }
-    fn translate_peer_state(state: &AttachmentState) -> AttachmentInput {
+    fn translate_attachment_state(state: &AttachmentState) -> AttachmentInput {
         match state {
             AttachmentState::OverAttached => AttachmentInput::TooManyPeers,
             AttachmentState::FullyAttached => AttachmentInput::FullPeers,
@@ -188,19 +195,20 @@ impl AttachmentManager {
         }
     }
 
-    async fn update_peer_count(&self) {
+    async fn update_attachment(&self) {
         let new_peer_state_input = {
             let inner = self.inner.lock();
 
             let old_peer_state_input =
-                AttachmentManager::translate_peer_state(&inner.attachment_machine.state());
+                AttachmentManager::translate_attachment_state(&inner.attachment_machine.state());
 
-            let max_connections = inner.config.get().network.max_connections;
-
-            // get active peer count from routing table
+            // get reliable peer count from routing table
+            let routing_table = inner.network_manager.routing_table();
+            let health = routing_table.get_routing_table_health();
+            let routing_table_config = &inner.config.get().network.routing_table;
 
             let new_peer_state_input =
-                AttachmentManager::translate_peer_input(inner.peer_count, max_connections);
+                AttachmentManager::translate_routing_table_health(health, routing_table_config);
 
             if old_peer_state_input == new_peer_state_input {
                 None
@@ -238,8 +246,7 @@ impl AttachmentManager {
                 break;
             }
 
-            // xxx: ?update peer count?
-            self.update_peer_count().await;
+            self.update_attachment().await;
 
             // sleep should be at the end in case maintain_peers changes state
             intf::sleep(1000).await;
@@ -259,16 +266,16 @@ impl AttachmentManager {
         self.inner.lock().attach_timestamp = None;
     }
 
-    pub async fn init(
-        &self,
-        state_change_callback: StateChangeCallback<AttachmentState>,
-    ) -> Result<(), String> {
+    pub async fn init(&self, update_callback: UpdateCallback) -> Result<(), String> {
         trace!("init");
         let network_manager = {
-            let inner = self.inner.lock();
-            inner
-                .attachment_machine
-                .set_state_change_callback(state_change_callback);
+            let mut inner = self.inner.lock();
+            inner.update_callback = Some(update_callback.clone());
+            inner.attachment_machine.set_state_change_callback(Arc::new(
+                move |_old_state: AttachmentState, new_state: AttachmentState| {
+                    update_callback(VeilidUpdate::Attachment { state: new_state })
+                },
+            ));
             inner.network_manager.clone()
         };
 
@@ -284,6 +291,8 @@ impl AttachmentManager {
             inner.network_manager.clone()
         };
         network_manager.terminate().await;
+        let mut inner = self.inner.lock();
+        inner.update_callback = None;
     }
 
     fn attach(&self) {
diff --git a/veilid-core/src/connection_manager.rs b/veilid-core/src/connection_manager.rs
index 67e5feb3..29234728 100644
--- a/veilid-core/src/connection_manager.rs
+++ b/veilid-core/src/connection_manager.rs
@@ -70,7 +70,7 @@ impl ConnectionManager {
     pub async fn startup(&self) {
         trace!("startup connection manager");
         let mut inner = self.arc.inner.lock().await;
-        let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
+        let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE);
         inner.connection_add_channel_tx = Some(cac.0);
         let rx = cac.1.clone();
         let this = self.clone();
diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs
index a7ecafae..3794f757 100644
--- a/veilid-core/src/core_context.rs
+++ b/veilid-core/src/core_context.rs
@@ -115,16 +115,9 @@ impl ServicesContext {
 
         // Set up attachment manager
         trace!("init attachment manager");
-        let update_callback_move = self.update_callback.clone();
+        let update_callback = self.update_callback.clone();
         let attachment_manager = AttachmentManager::new(self.config.clone(), table_store, crypto);
-        if let Err(e) = attachment_manager
-            .init(Arc::new(
-                move |_old_state: AttachmentState, new_state: AttachmentState| {
-                    update_callback_move(VeilidUpdate::Attachment { state: new_state })
-                },
-            ))
-            .await
-        {
+        if let Err(e) = attachment_manager.init(update_callback).await {
             self.shutdown().await;
             return Err(VeilidAPIError::Internal { message: e });
         }
diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index ee9239ba..468f4b77 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -60,6 +60,13 @@ struct RoutingTableInner {
     self_transfer_stats: TransferStatsDownUp,
 }
 
+#[derive(Clone, Debug, Default)]
+pub struct RoutingTableHealth {
+    pub reliable_entry_count: usize,
+    pub unreliable_entry_count: usize,
+    pub dead_entry_count: usize,
+}
+
 struct RoutingTableUnlockedInner {
     // Background processes
     rolling_transfers_task: TickTask,
@@ -743,4 +750,29 @@ impl RoutingTable {
             e.question_lost(ts);
         })
     }
+
+    //////////////////////////////////////////////////////////////////////
+    // Routing Table Health Metrics
+
+    pub fn get_routing_table_health(&self) -> RoutingTableHealth {
+        let mut health = RoutingTableHealth::default();
+        let cur_ts = intf::get_timestamp();
+        let inner = self.inner.lock();
+        for bucket in &inner.buckets {
+            for entry in bucket.entries() {
+                match entry.1.state(cur_ts) {
+                    BucketEntryState::Reliable => {
+                        health.reliable_entry_count += 1;
+                    }
+                    BucketEntryState::Unreliable => {
+                        health.unreliable_entry_count += 1;
+                    }
+                    BucketEntryState::Dead => {
+                        health.dead_entry_count += 1;
+                    }
+                }
+            }
+        }
+        health
+    }
 }
diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs
index 7bcc8580..1bf1b79f 100644
--- a/veilid-core/src/tests/common/test_veilid_config.rs
+++ b/veilid-core/src/tests/common/test_veilid_config.rs
@@ -191,6 +191,11 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
         "network.node_id" => Ok(Box::new(dht::key::DHTKey::default())),
         "network.node_id_secret" => Ok(Box::new(dht::key::DHTKeySecret::default())),
         "network.bootstrap" => Ok(Box::new(Vec::<String>::new())),
+        "network.routing_table.limit_over_attached" => Ok(Box::new(64u32)),
+        "network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)),
+        "network.routing_table.limit_attached_strong" => Ok(Box::new(16u32)),
+        "network.routing_table.limit_attached_good" => Ok(Box::new(8u32)),
+        "network.routing_table.limit_attached_weak" => Ok(Box::new(4u32)),
         "network.rpc.concurrency" => Ok(Box::new(2u32)),
         "network.rpc.queue_size" => Ok(Box::new(128u32)),
         "network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))),
@@ -300,6 +305,12 @@ pub async fn test_config() {
     assert_eq!(inner.network.rpc.queue_size, 128u32);
     assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
     assert_eq!(inner.network.rpc.max_route_hop_count, 7u8);
+    assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
+    assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);
+    assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32);
+    assert_eq!(inner.network.routing_table.limit_attached_good, 8u32);
+    assert_eq!(inner.network.routing_table.limit_attached_weak, 4u32);
+
     assert_eq!(
         inner.network.dht.resolve_node_timeout_ms,
         Option::<u32>::None
diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs
index 36d7d409..6a6af1af 100644
--- a/veilid-core/src/veilid_config.rs
+++ b/veilid-core/src/veilid_config.rs
@@ -104,6 +104,7 @@ pub struct VeilidConfigDHT {
     pub min_peer_count: u32,
     pub min_peer_refresh_time_ms: u32,
     pub validate_dial_info_receipt_time_ms: u32,
+    pub nearby_node_percentage: u32,
 }
 
 #[derive(Default, Clone, Serialize, Deserialize)]
@@ -123,6 +124,14 @@ pub struct VeilidConfigLeases {
     pub max_client_signal_leases: u32,
     pub max_client_relay_leases: u32,
 }
+#[derive(Default, Clone, Serialize, Deserialize)]
+pub struct VeilidConfigRoutingTable {
+    pub limit_over_attached: u32,
+    pub limit_fully_attached: u32,
+    pub limit_attached_strong: u32,
+    pub limit_attached_good: u32,
+    pub limit_attached_weak: u32,
+}
 
 #[derive(Default, Clone, Serialize, Deserialize)]
 pub struct VeilidConfigNetwork {
@@ -132,6 +141,7 @@ pub struct VeilidConfigNetwork {
     pub node_id: key::DHTKey,
     pub node_id_secret: key::DHTKeySecret,
     pub bootstrap: Vec<String>,
+    pub routing_table: VeilidConfigRoutingTable,
     pub rpc: VeilidConfigRPC,
     pub dht: VeilidConfigDHT,
     pub upnp: bool,
@@ -285,6 +295,11 @@ impl VeilidConfig {
         get_config!(inner.network.connection_initial_timeout_ms);
         get_config!(inner.network.connection_inactivity_timeout_ms);
         get_config!(inner.network.bootstrap);
+        get_config!(inner.network.routing_table.limit_over_attached);
+        get_config!(inner.network.routing_table.limit_fully_attached);
+        get_config!(inner.network.routing_table.limit_attached_strong);
+        get_config!(inner.network.routing_table.limit_attached_good);
+        get_config!(inner.network.routing_table.limit_attached_weak);
         get_config!(inner.network.dht.resolve_node_timeout_ms);
         get_config!(inner.network.dht.resolve_node_count);
         get_config!(inner.network.dht.resolve_node_fanout);
diff --git a/veilid-flutter/example/lib/config.dart b/veilid-flutter/example/lib/config.dart
index 61a32bc2..a197bca8 100644
--- a/veilid-flutter/example/lib/config.dart
+++ b/veilid-flutter/example/lib/config.dart
@@ -44,6 +44,13 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
       nodeId: "",
       nodeIdSecret: "",
       bootstrap: [],
+      routingTable: VeilidConfigRoutingTable(
+        limitOverAttached: 64,
+        limitFullyAttached: 32,
+        limitAttachedStrong: 16,
+        limitAttachedGood: 8,
+        limitAttachedWeak: 4,
+      ),
       rpc: VeilidConfigRPC(
         concurrency: 0,
         queueSize: 1024,
diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart
index 62a12546..cf56bc72 100644
--- a/veilid-flutter/lib/veilid.dart
+++ b/veilid-flutter/lib/veilid.dart
@@ -483,6 +483,41 @@ class VeilidConfigRPC {
 
 ////////////
 
+class VeilidConfigRoutingTable {
+  int limitOverAttached;
+  int limitFullyAttached;
+  int limitAttachedStrong;
+  int limitAttachedGood;
+  int limitAttachedWeak;
+
+  VeilidConfigRoutingTable({
+    required this.limitOverAttached,
+    required this.limitFullyAttached,
+    required this.limitAttachedStrong,
+    required this.limitAttachedGood,
+    required this.limitAttachedWeak,
+  });
+
+  Map<String, dynamic> get json {
+    return {
+      'limit_over_attached': limitOverAttached,
+      'limit_fully_attached': limitFullyAttached,
+      'limit_attached_strong': limitAttachedStrong,
+      'limit_attached_good': limitAttachedGood,
+      'limit_attached_weak': limitAttachedWeak,
+    };
+  }
+
+  VeilidConfigRoutingTable.fromJson(Map<String, dynamic> json)
+      : limitOverAttached = json['limit_over_attached'],
+        limitFullyAttached = json['limit_fully_attached'],
+        limitAttachedStrong = json['limit_attached_strong'],
+        limitAttachedGood = json['limit_attached_good'],
+        limitAttachedWeak = json['limit_attached_weak'];
+}
+
+////////////
+
 class VeilidConfigLeases {
   int maxServerSignalLeases;
   int maxServerRelayLeases;
@@ -520,6 +555,7 @@ class VeilidConfigNetwork {
   String nodeId;
   String nodeIdSecret;
   List<String> bootstrap;
+  VeilidConfigRoutingTable routingTable;
   VeilidConfigRPC rpc;
   VeilidConfigDHT dht;
   bool upnp;
@@ -538,6 +574,7 @@
     required this.nodeId,
     required this.nodeIdSecret,
     required this.bootstrap,
+    required this.routingTable,
    required this.rpc,
     required this.dht,
     required this.upnp,
@@ -558,6 +595,7 @@
       'node_id': nodeId,
       'node_id_secret': nodeIdSecret,
       'bootstrap': bootstrap,
+      'routing_table': routingTable.json,
       'rpc': rpc.json,
       'dht': dht.json,
       'upnp': upnp,
@@ -579,6 +617,7 @@
         nodeId = json['node_id'],
         nodeIdSecret = json['node_id_secret'],
         bootstrap = json['bootstrap'],
+        routingTable = VeilidConfigRoutingTable.fromJson(json['routing_table']),
         rpc = VeilidConfigRPC.fromJson(json['rpc']),
         dht = VeilidConfigDHT.fromJson(json['dht']),
         upnp = json['upnp'],
diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs
index 78e50139..873090f8 100644
--- a/veilid-server/src/settings.rs
+++ b/veilid-server/src/settings.rs
@@ -54,6 +54,12 @@ core:
         node_id: ''
         node_id_secret: ''
        bootstrap: []
+        routing_table:
+            limit_over_attached: 64
+            limit_fully_attached: 32
+            limit_attached_strong: 16
+            limit_attached_good: 8
+            limit_attached_weak: 4
         rpc:
             concurrency: 0
             queue_size: 1024
@@ -520,6 +526,15 @@ pub struct Leases {
     pub max_client_relay_leases: u32,
 }
 
+#[derive(Debug, Deserialize, Serialize)]
+pub struct RoutingTable {
+    pub limit_over_attached: u32,
+    pub limit_fully_attached: u32,
+    pub limit_attached_strong: u32,
+    pub limit_attached_good: u32,
+    pub limit_attached_weak: u32,
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 pub struct Network {
     pub max_connections: u32,
@@ -528,6 +543,7 @@ pub struct Network {
     pub node_id: veilid_core::DHTKey,
     pub node_id_secret: veilid_core::DHTKeySecret,
     pub bootstrap: Vec<ParsedNodeDialInfo>,
+    pub routing_table: RoutingTable,
     pub rpc: Rpc,
     pub dht: Dht,
     pub upnp: bool,
@@ -811,6 +827,21 @@ impl Settings {
                         .map(|e| e.node_dial_info_string)
                         .collect::<Vec<String>>(),
                 )),
+                "network.routing_table.limit_over_attached" => Ok(Box::new(
+                    inner.core.network.routing_table.limit_over_attached,
+                )),
+                "network.routing_table.limit_fully_attached" => Ok(Box::new(
+                    inner.core.network.routing_table.limit_fully_attached,
+                )),
+                "network.routing_table.limit_attached_strong" => Ok(Box::new(
+                    inner.core.network.routing_table.limit_attached_strong,
+                )),
+                "network.routing_table.limit_attached_good" => Ok(Box::new(
+                    inner.core.network.routing_table.limit_attached_good,
+                )),
+                "network.routing_table.limit_attached_weak" => Ok(Box::new(
+                    inner.core.network.routing_table.limit_attached_weak,
+                )),
                 "network.rpc.concurrency" => Ok(Box::new(inner.core.network.rpc.concurrency)),
                 "network.rpc.queue_size" => Ok(Box::new(inner.core.network.rpc.queue_size)),
                 "network.rpc.max_timestamp_behind_ms" => {
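
A minimal, self-contained sketch of the threshold mapping that translate_routing_table_health introduces above, assuming the default limits added to the new routing_table config section (64/32/16/8/4). The enum, structs, and the translate/RoutingTableLimits names below are simplified stand-ins for illustration only, not the actual veilid-core definitions:

// Standalone sketch: how reliable/unreliable routing table entry counts map
// onto attachment inputs under the limits introduced by this changeset.
#[derive(Debug, PartialEq)]
enum AttachmentInput {
    NoPeers,
    WeakPeers,
    GoodPeers,
    StrongPeers,
    FullPeers,
    TooManyPeers,
}

struct RoutingTableHealth {
    reliable_entry_count: usize,
    unreliable_entry_count: usize,
}

// Hypothetical stand-in for the routing_table limits config; defaults match settings.rs.
struct RoutingTableLimits {
    limit_over_attached: usize,   // default 64
    limit_fully_attached: usize,  // default 32
    limit_attached_strong: usize, // default 16
    limit_attached_good: usize,   // default 8
    limit_attached_weak: usize,   // default 4
}

// Highest threshold met by the reliable entry count wins; unreliable entries
// only count toward the weak threshold, mirroring the diff's logic.
fn translate(health: &RoutingTableHealth, limits: &RoutingTableLimits) -> AttachmentInput {
    if health.reliable_entry_count >= limits.limit_over_attached {
        AttachmentInput::TooManyPeers
    } else if health.reliable_entry_count >= limits.limit_fully_attached {
        AttachmentInput::FullPeers
    } else if health.reliable_entry_count >= limits.limit_attached_strong {
        AttachmentInput::StrongPeers
    } else if health.reliable_entry_count >= limits.limit_attached_good {
        AttachmentInput::GoodPeers
    } else if health.reliable_entry_count >= limits.limit_attached_weak
        || health.unreliable_entry_count >= limits.limit_attached_weak
    {
        AttachmentInput::WeakPeers
    } else {
        AttachmentInput::NoPeers
    }
}

fn main() {
    let limits = RoutingTableLimits {
        limit_over_attached: 64,
        limit_fully_attached: 32,
        limit_attached_strong: 16,
        limit_attached_good: 8,
        limit_attached_weak: 4,
    };
    let health = RoutingTableHealth {
        reliable_entry_count: 20,
        unreliable_entry_count: 3,
    };
    // 20 reliable entries clears the "strong" limit (16) but not "fully" (32).
    assert_eq!(translate(&health, &limits), AttachmentInput::StrongPeers);
}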