peer table thresholds
This commit is contained in:
		@@ -119,7 +119,7 @@ struct RouteHopData {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct RouteHop {
 | 
			
		||||
    dialInfo                @0  :NodeDialInfo;    # dial info for this hop
 | 
			
		||||
    dialInfo                @0  :NodeDialInfo;          # dial info for this hop
 | 
			
		||||
    nextHop                 @1  :RouteHopData;          # Optional: next hop in encrypted blob 
 | 
			
		||||
                                                        # Null means no next hop, at destination (only used in private route, safety routes must enclose a stub private route)
 | 
			
		||||
}
 | 
			
		||||
@@ -188,7 +188,7 @@ struct SenderInfo {
 | 
			
		||||
 | 
			
		||||
struct OperationInfoA {
 | 
			
		||||
    nodeInfo                @0  :NodeInfo;              # returned node information
 | 
			
		||||
    senderInfo              @1  :SenderInfo;            # info about InfoQ sender
 | 
			
		||||
    senderInfo              @1  :SenderInfo;            # info about InfoQ sender from the perspective of the replier
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct OperationValidateDialInfo {
 | 
			
		||||
@@ -204,7 +204,7 @@ struct OperationReturnReceipt {
 | 
			
		||||
 | 
			
		||||
struct OperationFindNodeQ {    
 | 
			
		||||
    nodeId                  @0  :NodeID;                # node id to locate
 | 
			
		||||
    peerInfo                @1  :PeerInfo;              # The peer info for node asking the question
 | 
			
		||||
    dialInfoList            @1  :List(DialInfo);        # dial info for the node asking the question
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct PeerInfo {
 | 
			
		||||
@@ -368,7 +368,7 @@ struct Operation {
 | 
			
		||||
 | 
			
		||||
    respondTo :union {
 | 
			
		||||
        none                @1  :Void;                  # no response is desired
 | 
			
		||||
        sender              @2  :Void;                  # envelope sender node id to be used for reply
 | 
			
		||||
        sender              @2  :DialInfo;              # (Optional) envelope sender node id to be used for reply
 | 
			
		||||
                                                        # possibly through a relay if the request arrived that way
 | 
			
		||||
        privateRoute        @3  :PrivateRoute;          # embedded private route to be used for reply
 | 
			
		||||
    }                              
 | 
			
		||||
@@ -399,7 +399,7 @@ struct Operation {
 | 
			
		||||
        signalQ             @21 :OperationSignalQ;
 | 
			
		||||
        signalA             @22 :OperationSignalA;
 | 
			
		||||
        
 | 
			
		||||
        returnReceipt       @23  :OperationReturnReceipt;
 | 
			
		||||
        returnReceipt       @23 :OperationReturnReceipt;
 | 
			
		||||
        
 | 
			
		||||
        # Tunnel operations
 | 
			
		||||
        startTunnelQ        @24 :OperationStartTunnelQ;
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,7 @@ use crate::callback_state_machine::*;
 | 
			
		||||
use crate::dht::crypto::Crypto;
 | 
			
		||||
use crate::intf::*;
 | 
			
		||||
use crate::network_manager::*;
 | 
			
		||||
use crate::routing_table::*;
 | 
			
		||||
use crate::xx::*;
 | 
			
		||||
use crate::*;
 | 
			
		||||
use core::convert::TryFrom;
 | 
			
		||||
@@ -106,8 +107,8 @@ pub struct AttachmentManagerInner {
 | 
			
		||||
    attachment_machine: CallbackStateMachine<Attachment>,
 | 
			
		||||
    network_manager: NetworkManager,
 | 
			
		||||
    maintain_peers: bool,
 | 
			
		||||
    peer_count: u32,
 | 
			
		||||
    attach_timestamp: Option<u64>,
 | 
			
		||||
    update_callback: Option<UpdateCallback>,
 | 
			
		||||
    attachment_maintainer_jh: Option<JoinHandle<()>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -127,8 +128,8 @@ impl AttachmentManager {
 | 
			
		||||
            attachment_machine: CallbackStateMachine::new(),
 | 
			
		||||
            network_manager: NetworkManager::new(config, table_store, crypto),
 | 
			
		||||
            maintain_peers: false,
 | 
			
		||||
            peer_count: 0,
 | 
			
		||||
            attach_timestamp: None,
 | 
			
		||||
            update_callback: None,
 | 
			
		||||
            attachment_maintainer_jh: None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -159,24 +160,30 @@ impl AttachmentManager {
 | 
			
		||||
        self.inner.lock().attach_timestamp
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_peer_count(&self) -> u32 {
 | 
			
		||||
        self.inner.lock().peer_count
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn translate_peer_input(cur: u32, max: u32) -> AttachmentInput {
 | 
			
		||||
        if cur > max {
 | 
			
		||||
    fn translate_routing_table_health(
 | 
			
		||||
        health: RoutingTableHealth,
 | 
			
		||||
        config: &VeilidConfigRoutingTable,
 | 
			
		||||
    ) -> AttachmentInput {
 | 
			
		||||
        if health.reliable_entry_count >= config.limit_over_attached.try_into().unwrap() {
 | 
			
		||||
            return AttachmentInput::TooManyPeers;
 | 
			
		||||
        }
 | 
			
		||||
        match cmp::min(4, 4 * cur / max) {
 | 
			
		||||
            4 => AttachmentInput::FullPeers,
 | 
			
		||||
            3 => AttachmentInput::StrongPeers,
 | 
			
		||||
            2 => AttachmentInput::GoodPeers,
 | 
			
		||||
            1 => AttachmentInput::WeakPeers,
 | 
			
		||||
            0 => AttachmentInput::NoPeers,
 | 
			
		||||
            _ => panic!("Invalid state"),
 | 
			
		||||
        if health.reliable_entry_count >= config.limit_fully_attached.try_into().unwrap() {
 | 
			
		||||
            return AttachmentInput::FullPeers;
 | 
			
		||||
        }
 | 
			
		||||
        if health.reliable_entry_count >= config.limit_attached_strong.try_into().unwrap() {
 | 
			
		||||
            return AttachmentInput::StrongPeers;
 | 
			
		||||
        }
 | 
			
		||||
        if health.reliable_entry_count >= config.limit_attached_good.try_into().unwrap() {
 | 
			
		||||
            return AttachmentInput::GoodPeers;
 | 
			
		||||
        }
 | 
			
		||||
        if health.reliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
 | 
			
		||||
            || health.unreliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
 | 
			
		||||
        {
 | 
			
		||||
            return AttachmentInput::WeakPeers;
 | 
			
		||||
        }
 | 
			
		||||
        AttachmentInput::NoPeers
 | 
			
		||||
    }
 | 
			
		||||
    fn translate_peer_state(state: &AttachmentState) -> AttachmentInput {
 | 
			
		||||
    fn translate_attachment_state(state: &AttachmentState) -> AttachmentInput {
 | 
			
		||||
        match state {
 | 
			
		||||
            AttachmentState::OverAttached => AttachmentInput::TooManyPeers,
 | 
			
		||||
            AttachmentState::FullyAttached => AttachmentInput::FullPeers,
 | 
			
		||||
@@ -188,19 +195,20 @@ impl AttachmentManager {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn update_peer_count(&self) {
 | 
			
		||||
    async fn update_attachment(&self) {
 | 
			
		||||
        let new_peer_state_input = {
 | 
			
		||||
            let inner = self.inner.lock();
 | 
			
		||||
 | 
			
		||||
            let old_peer_state_input =
 | 
			
		||||
                AttachmentManager::translate_peer_state(&inner.attachment_machine.state());
 | 
			
		||||
                AttachmentManager::translate_attachment_state(&inner.attachment_machine.state());
 | 
			
		||||
 | 
			
		||||
            let max_connections = inner.config.get().network.max_connections;
 | 
			
		||||
 | 
			
		||||
            // get active peer count from routing table
 | 
			
		||||
            // get reliable peer count from routing table
 | 
			
		||||
            let routing_table = inner.network_manager.routing_table();
 | 
			
		||||
            let health = routing_table.get_routing_table_health();
 | 
			
		||||
            let routing_table_config = &inner.config.get().network.routing_table;
 | 
			
		||||
 | 
			
		||||
            let new_peer_state_input =
 | 
			
		||||
                AttachmentManager::translate_peer_input(inner.peer_count, max_connections);
 | 
			
		||||
                AttachmentManager::translate_routing_table_health(health, routing_table_config);
 | 
			
		||||
 | 
			
		||||
            if old_peer_state_input == new_peer_state_input {
 | 
			
		||||
                None
 | 
			
		||||
@@ -238,8 +246,7 @@ impl AttachmentManager {
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                // xxx: ?update peer count?
 | 
			
		||||
                self.update_peer_count().await;
 | 
			
		||||
                self.update_attachment().await;
 | 
			
		||||
 | 
			
		||||
                // sleep should be at the end in case maintain_peers changes state
 | 
			
		||||
                intf::sleep(1000).await;
 | 
			
		||||
@@ -259,16 +266,16 @@ impl AttachmentManager {
 | 
			
		||||
        self.inner.lock().attach_timestamp = None;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn init(
 | 
			
		||||
        &self,
 | 
			
		||||
        state_change_callback: StateChangeCallback<Attachment>,
 | 
			
		||||
    ) -> Result<(), String> {
 | 
			
		||||
    pub async fn init(&self, update_callback: UpdateCallback) -> Result<(), String> {
 | 
			
		||||
        trace!("init");
 | 
			
		||||
        let network_manager = {
 | 
			
		||||
            let inner = self.inner.lock();
 | 
			
		||||
            inner
 | 
			
		||||
                .attachment_machine
 | 
			
		||||
                .set_state_change_callback(state_change_callback);
 | 
			
		||||
            let mut inner = self.inner.lock();
 | 
			
		||||
            inner.update_callback = Some(update_callback.clone());
 | 
			
		||||
            inner.attachment_machine.set_state_change_callback(Arc::new(
 | 
			
		||||
                move |_old_state: AttachmentState, new_state: AttachmentState| {
 | 
			
		||||
                    update_callback(VeilidUpdate::Attachment { state: new_state })
 | 
			
		||||
                },
 | 
			
		||||
            ));
 | 
			
		||||
            inner.network_manager.clone()
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
@@ -284,6 +291,8 @@ impl AttachmentManager {
 | 
			
		||||
            inner.network_manager.clone()
 | 
			
		||||
        };
 | 
			
		||||
        network_manager.terminate().await;
 | 
			
		||||
        let mut inner = self.inner.lock();
 | 
			
		||||
        inner.update_callback = None;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn attach(&self) {
 | 
			
		||||
 
 | 
			
		||||
@@ -70,7 +70,7 @@ impl ConnectionManager {
 | 
			
		||||
    pub async fn startup(&self) {
 | 
			
		||||
        trace!("startup connection manager");
 | 
			
		||||
        let mut inner = self.arc.inner.lock().await;
 | 
			
		||||
        let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
 | 
			
		||||
        let cac = flume::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE);
 | 
			
		||||
        inner.connection_add_channel_tx = Some(cac.0);
 | 
			
		||||
        let rx = cac.1.clone();
 | 
			
		||||
        let this = self.clone();
 | 
			
		||||
 
 | 
			
		||||
@@ -115,16 +115,9 @@ impl ServicesContext {
 | 
			
		||||
 | 
			
		||||
        // Set up attachment manager
 | 
			
		||||
        trace!("init attachment manager");
 | 
			
		||||
        let update_callback_move = self.update_callback.clone();
 | 
			
		||||
        let update_callback = self.update_callback.clone();
 | 
			
		||||
        let attachment_manager = AttachmentManager::new(self.config.clone(), table_store, crypto);
 | 
			
		||||
        if let Err(e) = attachment_manager
 | 
			
		||||
            .init(Arc::new(
 | 
			
		||||
                move |_old_state: AttachmentState, new_state: AttachmentState| {
 | 
			
		||||
                    update_callback_move(VeilidUpdate::Attachment { state: new_state })
 | 
			
		||||
                },
 | 
			
		||||
            ))
 | 
			
		||||
            .await
 | 
			
		||||
        {
 | 
			
		||||
        if let Err(e) = attachment_manager.init(update_callback).await {
 | 
			
		||||
            self.shutdown().await;
 | 
			
		||||
            return Err(VeilidAPIError::Internal { message: e });
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -60,6 +60,13 @@ struct RoutingTableInner {
 | 
			
		||||
    self_transfer_stats: TransferStatsDownUp,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, Default)]
 | 
			
		||||
pub struct RoutingTableHealth {
 | 
			
		||||
    pub reliable_entry_count: usize,
 | 
			
		||||
    pub unreliable_entry_count: usize,
 | 
			
		||||
    pub dead_entry_count: usize,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct RoutingTableUnlockedInner {
 | 
			
		||||
    // Background processes
 | 
			
		||||
    rolling_transfers_task: TickTask,
 | 
			
		||||
@@ -743,4 +750,29 @@ impl RoutingTable {
 | 
			
		||||
            e.question_lost(ts);
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //////////////////////////////////////////////////////////////////////
 | 
			
		||||
    // Routing Table Health Metrics
 | 
			
		||||
 | 
			
		||||
    pub fn get_routing_table_health(&self) -> RoutingTableHealth {
 | 
			
		||||
        let mut health = RoutingTableHealth::default();
 | 
			
		||||
        let cur_ts = intf::get_timestamp();
 | 
			
		||||
        let inner = self.inner.lock();
 | 
			
		||||
        for bucket in &inner.buckets {
 | 
			
		||||
            for entry in bucket.entries() {
 | 
			
		||||
                match entry.1.state(cur_ts) {
 | 
			
		||||
                    BucketEntryState::Reliable => {
 | 
			
		||||
                        health.reliable_entry_count += 1;
 | 
			
		||||
                    }
 | 
			
		||||
                    BucketEntryState::Unreliable => {
 | 
			
		||||
                        health.unreliable_entry_count += 1;
 | 
			
		||||
                    }
 | 
			
		||||
                    BucketEntryState::Dead => {
 | 
			
		||||
                        health.dead_entry_count += 1;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        health
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -191,6 +191,11 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
 | 
			
		||||
        "network.node_id" => Ok(Box::new(dht::key::DHTKey::default())),
 | 
			
		||||
        "network.node_id_secret" => Ok(Box::new(dht::key::DHTKeySecret::default())),
 | 
			
		||||
        "network.bootstrap" => Ok(Box::new(Vec::<String>::new())),
 | 
			
		||||
        "network.routing_table.limit_over_attached" => Ok(Box::new(64u32)),
 | 
			
		||||
        "network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)),
 | 
			
		||||
        "network.routing_table.limit_attached_strong" => Ok(Box::new(16u32)),
 | 
			
		||||
        "network.routing_table.limit_attached_good" => Ok(Box::new(8u32)),
 | 
			
		||||
        "network.routing_table.limit_attached_weak" => Ok(Box::new(4u32)),
 | 
			
		||||
        "network.rpc.concurrency" => Ok(Box::new(2u32)),
 | 
			
		||||
        "network.rpc.queue_size" => Ok(Box::new(128u32)),
 | 
			
		||||
        "network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))),
 | 
			
		||||
@@ -300,6 +305,12 @@ pub async fn test_config() {
 | 
			
		||||
    assert_eq!(inner.network.rpc.queue_size, 128u32);
 | 
			
		||||
    assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
 | 
			
		||||
    assert_eq!(inner.network.rpc.max_route_hop_count, 7u8);
 | 
			
		||||
    assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
 | 
			
		||||
    assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);
 | 
			
		||||
    assert_eq!(inner.network.routing_table.limit_attached_strong, 16u32);
 | 
			
		||||
    assert_eq!(inner.network.routing_table.limit_attached_good, 8u32);
 | 
			
		||||
    assert_eq!(inner.network.routing_table.limit_attached_weak, 4u32);
 | 
			
		||||
 | 
			
		||||
    assert_eq!(
 | 
			
		||||
        inner.network.dht.resolve_node_timeout_ms,
 | 
			
		||||
        Option::<u32>::None
 | 
			
		||||
 
 | 
			
		||||
@@ -104,6 +104,7 @@ pub struct VeilidConfigDHT {
 | 
			
		||||
    pub min_peer_count: u32,
 | 
			
		||||
    pub min_peer_refresh_time_ms: u32,
 | 
			
		||||
    pub validate_dial_info_receipt_time_ms: u32,
 | 
			
		||||
    pub nearby_node_percentage: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Default, Clone, Serialize, Deserialize)]
 | 
			
		||||
@@ -123,6 +124,14 @@ pub struct VeilidConfigLeases {
 | 
			
		||||
    pub max_client_signal_leases: u32,
 | 
			
		||||
    pub max_client_relay_leases: u32,
 | 
			
		||||
}
 | 
			
		||||
#[derive(Default, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct VeilidConfigRoutingTable {
 | 
			
		||||
    pub limit_over_attached: u32,
 | 
			
		||||
    pub limit_fully_attached: u32,
 | 
			
		||||
    pub limit_attached_strong: u32,
 | 
			
		||||
    pub limit_attached_good: u32,
 | 
			
		||||
    pub limit_attached_weak: u32,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Default, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct VeilidConfigNetwork {
 | 
			
		||||
@@ -132,6 +141,7 @@ pub struct VeilidConfigNetwork {
 | 
			
		||||
    pub node_id: key::DHTKey,
 | 
			
		||||
    pub node_id_secret: key::DHTKeySecret,
 | 
			
		||||
    pub bootstrap: Vec<String>,
 | 
			
		||||
    pub routing_table: VeilidConfigRoutingTable,
 | 
			
		||||
    pub rpc: VeilidConfigRPC,
 | 
			
		||||
    pub dht: VeilidConfigDHT,
 | 
			
		||||
    pub upnp: bool,
 | 
			
		||||
@@ -285,6 +295,11 @@ impl VeilidConfig {
 | 
			
		||||
            get_config!(inner.network.connection_initial_timeout_ms);
 | 
			
		||||
            get_config!(inner.network.connection_inactivity_timeout_ms);
 | 
			
		||||
            get_config!(inner.network.bootstrap);
 | 
			
		||||
            get_config!(inner.network.routing_table.limit_over_attached);
 | 
			
		||||
            get_config!(inner.network.routing_table.limit_fully_attached);
 | 
			
		||||
            get_config!(inner.network.routing_table.limit_attached_strong);
 | 
			
		||||
            get_config!(inner.network.routing_table.limit_attached_good);
 | 
			
		||||
            get_config!(inner.network.routing_table.limit_attached_weak);
 | 
			
		||||
            get_config!(inner.network.dht.resolve_node_timeout_ms);
 | 
			
		||||
            get_config!(inner.network.dht.resolve_node_count);
 | 
			
		||||
            get_config!(inner.network.dht.resolve_node_fanout);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user