network fixes
This commit is contained in:
		@@ -261,7 +261,7 @@ impl BucketEntryInner {
 | 
			
		||||
        // See if we have an existing signed_node_info to update or not
 | 
			
		||||
        let mut node_info_changed = false;
 | 
			
		||||
        if let Some(current_sni) = opt_current_sni {
 | 
			
		||||
            // Always allow overwriting invalid/unsigned node
 | 
			
		||||
            // Always allow overwriting unsigned node (bootstrap)
 | 
			
		||||
            if current_sni.has_any_signature() {
 | 
			
		||||
                // If the timestamp hasn't changed or is less, ignore this update
 | 
			
		||||
                if signed_node_info.timestamp() <= current_sni.timestamp() {
 | 
			
		||||
@@ -424,7 +424,7 @@ impl BucketEntryInner {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
     // Removes a connection descriptor in this entry's table of last connections
 | 
			
		||||
     pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) {
 | 
			
		||||
    pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) {
 | 
			
		||||
        let key = self.descriptor_to_key(last_connection);
 | 
			
		||||
        self.last_connections
 | 
			
		||||
            .remove(&key);
 | 
			
		||||
 
 | 
			
		||||
@@ -208,7 +208,7 @@ impl RoutingTable {
 | 
			
		||||
            rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
 | 
			
		||||
            kick_buckets_task: TickTask::new(1),
 | 
			
		||||
            bootstrap_task: TickTask::new(1),
 | 
			
		||||
            peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
 | 
			
		||||
            peer_minimum_refresh_task: TickTask::new(1),
 | 
			
		||||
            ping_validator_task: TickTask::new(1),
 | 
			
		||||
            relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS),
 | 
			
		||||
            private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS),
 | 
			
		||||
 
 | 
			
		||||
@@ -19,12 +19,18 @@ impl RoutingTable {
 | 
			
		||||
        // Get counts by crypto kind
 | 
			
		||||
        let entry_count = self.inner.read().cached_entry_counts();
 | 
			
		||||
 | 
			
		||||
        let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
 | 
			
		||||
        let (min_peer_count, min_peer_refresh_time_ms) = self.with_config(|c| {
 | 
			
		||||
            (
 | 
			
		||||
                c.network.dht.min_peer_count as usize,
 | 
			
		||||
                c.network.dht.min_peer_refresh_time_ms,
 | 
			
		||||
            )
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        // For the PublicInternet routing domain, get list of all peers we know about
 | 
			
		||||
        // even the unreliable ones, and ask them to find nodes close to our node too
 | 
			
		||||
 | 
			
		||||
        let mut ord = FuturesOrdered::new();
 | 
			
		||||
        let cur_ts = get_timestamp();
 | 
			
		||||
 | 
			
		||||
        for crypto_kind in VALID_CRYPTO_KINDS {
 | 
			
		||||
            // Do we need to peer minimum refresh this crypto kind?
 | 
			
		||||
@@ -37,16 +43,26 @@ impl RoutingTable {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let routing_table = self.clone();
 | 
			
		||||
 | 
			
		||||
            let mut filters = VecDeque::new();
 | 
			
		||||
            let filter = Box::new(
 | 
			
		||||
                move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
 | 
			
		||||
                    // Keep only the entries that contain the crypto kind we're looking for
 | 
			
		||||
                    if let Some(entry) = opt_entry {
 | 
			
		||||
                        entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind))
 | 
			
		||||
                    } else {
 | 
			
		||||
                        VALID_CRYPTO_KINDS.contains(&crypto_kind)
 | 
			
		||||
                    }
 | 
			
		||||
                move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
 | 
			
		||||
                    let entry = opt_entry.unwrap().clone();
 | 
			
		||||
                    entry.with(rti, |_rti, e| {
 | 
			
		||||
                        // Keep only the entries that contain the crypto kind we're looking for
 | 
			
		||||
                        let compatible_crypto = e.crypto_kinds().contains(&crypto_kind);
 | 
			
		||||
                        if !compatible_crypto {
 | 
			
		||||
                            return false;
 | 
			
		||||
                        }
 | 
			
		||||
                        // Keep only the entries we haven't talked to in the min_peer_refresh_time
 | 
			
		||||
                        if let Some(last_q_ts) = e.peer_stats().rpc_stats.last_question_ts {
 | 
			
		||||
                            if cur_ts.saturating_sub(last_q_ts.as_u64())
 | 
			
		||||
                                < (min_peer_refresh_time_ms as u64 * 1_000u64)
 | 
			
		||||
                            {
 | 
			
		||||
                                return false;
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        true
 | 
			
		||||
                    })
 | 
			
		||||
                },
 | 
			
		||||
            ) as RoutingTableEntryFilter;
 | 
			
		||||
            filters.push_front(filter);
 | 
			
		||||
 
 | 
			
		||||
@@ -324,10 +324,13 @@ impl RPCProcessor {
 | 
			
		||||
        let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms));
 | 
			
		||||
        let max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
 | 
			
		||||
        if concurrency == 0 {
 | 
			
		||||
            concurrency = get_concurrency() / 2;
 | 
			
		||||
            concurrency = get_concurrency();
 | 
			
		||||
            if concurrency == 0 {
 | 
			
		||||
                concurrency = 1;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay
 | 
			
		||||
            concurrency *= 16;
 | 
			
		||||
        }
 | 
			
		||||
        let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -217,7 +217,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
 | 
			
		||||
        "network.dht.set_value_count" => Ok(Box::new(5u32)),
 | 
			
		||||
        "network.dht.set_value_fanout" => Ok(Box::new(4u32)),
 | 
			
		||||
        "network.dht.min_peer_count" => Ok(Box::new(20u32)),
 | 
			
		||||
        "network.dht.min_peer_refresh_time_ms" => Ok(Box::new(2_000u32)),
 | 
			
		||||
        "network.dht.min_peer_refresh_time_ms" => Ok(Box::new(60_000u32)),
 | 
			
		||||
        "network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(5_000u32)),
 | 
			
		||||
        "network.dht.local_subkey_cache_size" => Ok(Box::new(128u32)),
 | 
			
		||||
        "network.dht.local_max_subkey_cache_memory_mb" => Ok(Box::new(256u32)),
 | 
			
		||||
@@ -345,7 +345,7 @@ pub async fn test_config() {
 | 
			
		||||
    assert_eq!(inner.network.dht.set_value_count, 5u32);
 | 
			
		||||
    assert_eq!(inner.network.dht.set_value_fanout, 4u32);
 | 
			
		||||
    assert_eq!(inner.network.dht.min_peer_count, 20u32);
 | 
			
		||||
    assert_eq!(inner.network.dht.min_peer_refresh_time_ms, 2_000u32);
 | 
			
		||||
    assert_eq!(inner.network.dht.min_peer_refresh_time_ms, 60_000u32);
 | 
			
		||||
    assert_eq!(
 | 
			
		||||
        inner.network.dht.validate_dial_info_receipt_time_ms,
 | 
			
		||||
        5_000u32
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user