instrumentation and network fixes
This commit is contained in:
		@@ -610,12 +610,15 @@ impl NetworkManager {
 | 
			
		||||
            let c = self.config.get();
 | 
			
		||||
            c.network.dht.min_peer_count as usize
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // If none, then add the bootstrap nodes to it
 | 
			
		||||
        if live_public_internet_entry_count == 0 {
 | 
			
		||||
            self.unlocked_inner.bootstrap_task.tick().await?;
 | 
			
		||||
        }
 | 
			
		||||
        // If we still don't have enough peers, find nodes until we do
 | 
			
		||||
        else if live_public_internet_entry_count < min_peer_count {
 | 
			
		||||
        else if !self.unlocked_inner.bootstrap_task.is_running()
 | 
			
		||||
            && live_public_internet_entry_count < min_peer_count
 | 
			
		||||
        {
 | 
			
		||||
            self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -382,17 +382,6 @@ impl DiscoveryContext {
 | 
			
		||||
    // If we know we are behind NAT check what kind
 | 
			
		||||
    #[instrument(level = "trace", skip(self), ret, err)]
 | 
			
		||||
    pub async fn protocol_process_nat(&self) -> EyreResult<bool> {
 | 
			
		||||
        let (node_1, external_1_dial_info, external_1_address, protocol_type, address_type) = {
 | 
			
		||||
            let inner = self.inner.lock();
 | 
			
		||||
            (
 | 
			
		||||
                inner.node_1.as_ref().unwrap().clone(),
 | 
			
		||||
                inner.external_1_dial_info.as_ref().unwrap().clone(),
 | 
			
		||||
                inner.external_1_address.unwrap(),
 | 
			
		||||
                inner.protocol_type.unwrap(),
 | 
			
		||||
                inner.address_type.unwrap(),
 | 
			
		||||
            )
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // Attempt a port mapping via all available and enabled mechanisms
 | 
			
		||||
        // Try this before the direct mapping in the event that we are restarting
 | 
			
		||||
        // and may not have recorded a mapping created the last time
 | 
			
		||||
@@ -404,8 +393,30 @@ impl DiscoveryContext {
 | 
			
		||||
            // No more retries
 | 
			
		||||
            return Ok(true);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // XXX: is this necessary?
 | 
			
		||||
        // Redo our external_1 dial info detection because a failed port mapping attempt
 | 
			
		||||
        // may cause it to become invalid
 | 
			
		||||
        // Get our external address from some fast node, call it node 1
 | 
			
		||||
        // if !self.protocol_get_external_address_1().await {
 | 
			
		||||
        //     // If we couldn't get an external address, then we should just try the whole network class detection again later
 | 
			
		||||
        //     return Ok(false);
 | 
			
		||||
        // }
 | 
			
		||||
 | 
			
		||||
        // Get the external dial info for our use here
 | 
			
		||||
        let (node_1, external_1_dial_info, external_1_address, protocol_type, address_type) = {
 | 
			
		||||
            let inner = self.inner.lock();
 | 
			
		||||
            (
 | 
			
		||||
                inner.node_1.as_ref().unwrap().clone(),
 | 
			
		||||
                inner.external_1_dial_info.as_ref().unwrap().clone(),
 | 
			
		||||
                inner.external_1_address.unwrap(),
 | 
			
		||||
                inner.protocol_type.unwrap(),
 | 
			
		||||
                inner.address_type.unwrap(),
 | 
			
		||||
            )
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        // Do a validate_dial_info on the external address from a redirected node
 | 
			
		||||
        else if self
 | 
			
		||||
        if self
 | 
			
		||||
            .validate_dial_info(node_1.clone(), external_1_dial_info.clone(), true)
 | 
			
		||||
            .await
 | 
			
		||||
        {
 | 
			
		||||
 
 | 
			
		||||
@@ -54,7 +54,7 @@ impl Network {
 | 
			
		||||
 | 
			
		||||
                        loop {
 | 
			
		||||
                            match ph
 | 
			
		||||
                                .recv_message(&mut data).instrument(Span::current())
 | 
			
		||||
                                .recv_message(&mut data)
 | 
			
		||||
                                .timeout_at(stop_token.clone())
 | 
			
		||||
                                .await
 | 
			
		||||
                            {
 | 
			
		||||
@@ -84,7 +84,7 @@ impl Network {
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }.instrument(Span::current());
 | 
			
		||||
                    };
 | 
			
		||||
 | 
			
		||||
                    protocol_handlers_unordered.push(ph_future);
 | 
			
		||||
                }
 | 
			
		||||
 
 | 
			
		||||
@@ -87,11 +87,11 @@ impl RawTcpNetworkConnection {
 | 
			
		||||
        Ok(NetworkResult::Value(out))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self), fields(network_result))]
 | 
			
		||||
    // #[instrument(level = "trace", err, skip(self), fields(network_result))]
 | 
			
		||||
    pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
 | 
			
		||||
        let mut stream = self.stream.clone();
 | 
			
		||||
        let out = Self::recv_internal(&mut stream).await?;
 | 
			
		||||
        tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        //tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -11,7 +11,7 @@ impl RawUdpProtocolHandler {
 | 
			
		||||
        Self { socket }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.from))]
 | 
			
		||||
    // #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))]
 | 
			
		||||
    pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> {
 | 
			
		||||
        let (size, descriptor) = loop {
 | 
			
		||||
            let (size, remote_addr) = network_result_value_or_log!(debug self.socket.recv_from(data).await.into_network_result()? => continue);
 | 
			
		||||
@@ -33,12 +33,12 @@ impl RawUdpProtocolHandler {
 | 
			
		||||
            break (size, descriptor);
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        tracing::Span::current().record("ret.len", &size);
 | 
			
		||||
        tracing::Span::current().record("ret.from", &format!("{:?}", descriptor).as_str());
 | 
			
		||||
        // tracing::Span::current().record("ret.len", &size);
 | 
			
		||||
        // tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
 | 
			
		||||
        Ok((size, descriptor))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.from))]
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))]
 | 
			
		||||
    pub async fn send_message(
 | 
			
		||||
        &self,
 | 
			
		||||
        data: Vec<u8>,
 | 
			
		||||
@@ -67,6 +67,8 @@ impl RawUdpProtocolHandler {
 | 
			
		||||
            bail_io_error_other!("UDP partial send")
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        tracing::Span::current().record("ret.len", &len);
 | 
			
		||||
        tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str());
 | 
			
		||||
        Ok(NetworkResult::value(descriptor))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -93,7 +93,7 @@ where
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
 | 
			
		||||
    // #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
 | 
			
		||||
    pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
 | 
			
		||||
        let out = match self.stream.clone().next().await {
 | 
			
		||||
            Some(Ok(Message::Binary(v))) => {
 | 
			
		||||
@@ -120,7 +120,7 @@ where
 | 
			
		||||
            )),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        // tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -75,7 +75,7 @@ impl WebsocketNetworkConnection {
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
 | 
			
		||||
    // #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))]
 | 
			
		||||
    pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
 | 
			
		||||
        let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await {
 | 
			
		||||
            Some(WsMessage::Binary(v)) => {
 | 
			
		||||
@@ -95,7 +95,7 @@ impl WebsocketNetworkConnection {
 | 
			
		||||
                bail_io_error_other!("WS stream closed");
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        // tracing::Span::current().record("network_result", &tracing::field::display(&out));
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -90,7 +90,7 @@ impl NodeRef {
 | 
			
		||||
            node_id,
 | 
			
		||||
            entry,
 | 
			
		||||
            filter,
 | 
			
		||||
            reliable: true,
 | 
			
		||||
            reliable: false,
 | 
			
		||||
            #[cfg(feature = "tracking")]
 | 
			
		||||
            track_id: entry.track(),
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -63,7 +63,7 @@ where
 | 
			
		||||
        inner.join_handle = jh;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Check the result
 | 
			
		||||
    /// Check the result and take it if there is one
 | 
			
		||||
    pub async fn check(&self) -> Result<Option<T>, ()> {
 | 
			
		||||
        let mut out: Option<T> = None;
 | 
			
		||||
 | 
			
		||||
@@ -96,7 +96,7 @@ where
 | 
			
		||||
        Ok(out)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Wait for the result
 | 
			
		||||
    /// Wait for the result and take it
 | 
			
		||||
    pub async fn join(&self) -> Result<Option<T>, ()> {
 | 
			
		||||
        let mut out: Option<T> = None;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -15,6 +15,7 @@ pub struct TickTask<E: Send + 'static> {
 | 
			
		||||
    routine: OnceCell<Box<TickTaskRoutine<E>>>,
 | 
			
		||||
    stop_source: AsyncMutex<Option<StopSource>>,
 | 
			
		||||
    single_future: MustJoinSingleFuture<Result<(), E>>,
 | 
			
		||||
    running: Arc<AtomicBool>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
@@ -25,6 +26,7 @@ impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
            routine: OnceCell::new(),
 | 
			
		||||
            stop_source: AsyncMutex::new(None),
 | 
			
		||||
            single_future: MustJoinSingleFuture::new(),
 | 
			
		||||
            running: Arc::new(AtomicBool::new(false)),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    pub fn new_ms(tick_period_ms: u32) -> Self {
 | 
			
		||||
@@ -34,6 +36,7 @@ impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
            routine: OnceCell::new(),
 | 
			
		||||
            stop_source: AsyncMutex::new(None),
 | 
			
		||||
            single_future: MustJoinSingleFuture::new(),
 | 
			
		||||
            running: Arc::new(AtomicBool::new(false)),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    pub fn new(tick_period_sec: u32) -> Self {
 | 
			
		||||
@@ -43,6 +46,7 @@ impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
            routine: OnceCell::new(),
 | 
			
		||||
            stop_source: AsyncMutex::new(None),
 | 
			
		||||
            single_future: MustJoinSingleFuture::new(),
 | 
			
		||||
            running: Arc::new(AtomicBool::new(false)),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -53,6 +57,10 @@ impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
        self.routine.set(Box::new(routine)).map_err(drop).unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn is_running(&self) -> bool {
 | 
			
		||||
        self.running.load(core::sync::atomic::Ordering::Relaxed)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn stop(&self) -> Result<(), E> {
 | 
			
		||||
        // drop the stop source if we have one
 | 
			
		||||
        let opt_stop_source = &mut *self.stop_source.lock().await;
 | 
			
		||||
@@ -107,15 +115,16 @@ impl<E: Send + 'static> TickTask<E> {
 | 
			
		||||
 | 
			
		||||
        // Run the singlefuture
 | 
			
		||||
        let stop_source = StopSource::new();
 | 
			
		||||
        match self
 | 
			
		||||
            .single_future
 | 
			
		||||
            .single_spawn(self.routine.get().unwrap()(
 | 
			
		||||
                stop_source.token(),
 | 
			
		||||
                last_timestamp_us,
 | 
			
		||||
                now,
 | 
			
		||||
            ))
 | 
			
		||||
            .await
 | 
			
		||||
        {
 | 
			
		||||
        let stop_token = stop_source.token();
 | 
			
		||||
        let running = self.running.clone();
 | 
			
		||||
        let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
 | 
			
		||||
        let wrapped_routine = Box::pin(async move {
 | 
			
		||||
            running.store(true, core::sync::atomic::Ordering::Relaxed);
 | 
			
		||||
            let out = routine.await;
 | 
			
		||||
            running.store(false, core::sync::atomic::Ordering::Relaxed);
 | 
			
		||||
            out
 | 
			
		||||
        });
 | 
			
		||||
        match self.single_future.single_spawn(wrapped_routine).await {
 | 
			
		||||
            // We should have already consumed the result of the last run, or there was none
 | 
			
		||||
            // and we should definitely have run, because the prior 'check()' operation
 | 
			
		||||
            // should have ensured the singlefuture was ready to run
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user