diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 80301d6c..4e3d0593 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -610,12 +610,15 @@ impl NetworkManager { let c = self.config.get(); c.network.dht.min_peer_count as usize }; + // If none, then add the bootstrap nodes to it if live_public_internet_entry_count == 0 { self.unlocked_inner.bootstrap_task.tick().await?; } // If we still don't have enough peers, find nodes until we do - else if live_public_internet_entry_count < min_peer_count { + else if !self.unlocked_inner.bootstrap_task.is_running() + && live_public_internet_entry_count < min_peer_count + { self.unlocked_inner.peer_minimum_refresh_task.tick().await?; } diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index ed95e0d1..b53fefad 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -382,17 +382,6 @@ impl DiscoveryContext { // If we know we are behind NAT check what kind #[instrument(level = "trace", skip(self), ret, err)] pub async fn protocol_process_nat(&self) -> EyreResult { - let (node_1, external_1_dial_info, external_1_address, protocol_type, address_type) = { - let inner = self.inner.lock(); - ( - inner.node_1.as_ref().unwrap().clone(), - inner.external_1_dial_info.as_ref().unwrap().clone(), - inner.external_1_address.unwrap(), - inner.protocol_type.unwrap(), - inner.address_type.unwrap(), - ) - }; - // Attempt a port mapping via all available and enabled mechanisms // Try this before the direct mapping in the event that we are restarting // and may not have recorded a mapping created the last time @@ -404,8 +393,30 @@ impl DiscoveryContext { // No more retries return Ok(true); } + + // XXX: is this necessary? + // Redo our external_1 dial info detection because a failed port mapping attempt + // may cause it to become invalid + // Get our external address from some fast node, call it node 1 + // if !self.protocol_get_external_address_1().await { + // // If we couldn't get an external address, then we should just try the whole network class detection again later + // return Ok(false); + // } + + // Get the external dial info for our use here + let (node_1, external_1_dial_info, external_1_address, protocol_type, address_type) = { + let inner = self.inner.lock(); + ( + inner.node_1.as_ref().unwrap().clone(), + inner.external_1_dial_info.as_ref().unwrap().clone(), + inner.external_1_address.unwrap(), + inner.protocol_type.unwrap(), + inner.address_type.unwrap(), + ) + }; + // Do a validate_dial_info on the external address from a redirected node - else if self + if self .validate_dial_info(node_1.clone(), external_1_dial_info.clone(), true) .await { diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 0d906f16..b00bf643 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -54,7 +54,7 @@ impl Network { loop { match ph - .recv_message(&mut data).instrument(Span::current()) + .recv_message(&mut data) .timeout_at(stop_token.clone()) .await { @@ -84,7 +84,7 @@ impl Network { } } } - }.instrument(Span::current()); + }; protocol_handlers_unordered.push(ph_future); } diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 91167b69..847c8608 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -87,11 +87,11 @@ impl RawTcpNetworkConnection { Ok(NetworkResult::Value(out)) } - #[instrument(level = "trace", err, skip(self), fields(network_result))] + // #[instrument(level = "trace", err, skip(self), fields(network_result))] pub async fn recv(&self) -> io::Result>> { let mut stream = self.stream.clone(); let out = Self::recv_internal(&mut stream).await?; - tracing::Span::current().record("network_result", &tracing::field::display(&out)); + //tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index 8dc39b2e..6af66b1d 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -11,7 +11,7 @@ impl RawUdpProtocolHandler { Self { socket } } - #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.from))] + // #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))] pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> { let (size, descriptor) = loop { let (size, remote_addr) = network_result_value_or_log!(debug self.socket.recv_from(data).await.into_network_result()? => continue); @@ -33,12 +33,12 @@ impl RawUdpProtocolHandler { break (size, descriptor); }; - tracing::Span::current().record("ret.len", &size); - tracing::Span::current().record("ret.from", &format!("{:?}", descriptor).as_str()); + // tracing::Span::current().record("ret.len", &size); + // tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str()); Ok((size, descriptor)) } - #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.from))] + #[instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor))] pub async fn send_message( &self, data: Vec, @@ -67,6 +67,8 @@ impl RawUdpProtocolHandler { bail_io_error_other!("UDP partial send") } + tracing::Span::current().record("ret.len", &len); + tracing::Span::current().record("ret.descriptor", &format!("{:?}", descriptor).as_str()); Ok(NetworkResult::value(descriptor)) } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 520cd344..89e3c46d 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -93,7 +93,7 @@ where Ok(out) } - #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))] + // #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))] pub async fn recv(&self) -> io::Result>> { let out = match self.stream.clone().next().await { Some(Ok(Message::Binary(v))) => { @@ -120,7 +120,7 @@ where )), }; - tracing::Span::current().record("network_result", &tracing::field::display(&out)); + // tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } } diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index 1b48c59d..f9adebb8 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -75,7 +75,7 @@ impl WebsocketNetworkConnection { Ok(out) } - #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))] + // #[instrument(level = "trace", err, skip(self), fields(network_result, ret.len))] pub async fn recv(&self) -> io::Result>> { let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await { Some(WsMessage::Binary(v)) => { @@ -95,7 +95,7 @@ impl WebsocketNetworkConnection { bail_io_error_other!("WS stream closed"); } }; - tracing::Span::current().record("network_result", &tracing::field::display(&out)); + // tracing::Span::current().record("network_result", &tracing::field::display(&out)); Ok(out) } } diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 96b90393..d686e84d 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -90,7 +90,7 @@ impl NodeRef { node_id, entry, filter, - reliable: true, + reliable: false, #[cfg(feature = "tracking")] track_id: entry.track(), } diff --git a/veilid-core/src/xx/must_join_single_future.rs b/veilid-core/src/xx/must_join_single_future.rs index 8830c1d3..ffebeaae 100644 --- a/veilid-core/src/xx/must_join_single_future.rs +++ b/veilid-core/src/xx/must_join_single_future.rs @@ -63,7 +63,7 @@ where inner.join_handle = jh; } - // Check the result + /// Check the result and take it if there is one pub async fn check(&self) -> Result, ()> { let mut out: Option = None; @@ -96,7 +96,7 @@ where Ok(out) } - // Wait for the result + /// Wait for the result and take it pub async fn join(&self) -> Result, ()> { let mut out: Option = None; diff --git a/veilid-core/src/xx/tick_task.rs b/veilid-core/src/xx/tick_task.rs index 9ac821d1..56ea442e 100644 --- a/veilid-core/src/xx/tick_task.rs +++ b/veilid-core/src/xx/tick_task.rs @@ -15,6 +15,7 @@ pub struct TickTask { routine: OnceCell>>, stop_source: AsyncMutex>, single_future: MustJoinSingleFuture>, + running: Arc, } impl TickTask { @@ -25,6 +26,7 @@ impl TickTask { routine: OnceCell::new(), stop_source: AsyncMutex::new(None), single_future: MustJoinSingleFuture::new(), + running: Arc::new(AtomicBool::new(false)), } } pub fn new_ms(tick_period_ms: u32) -> Self { @@ -34,6 +36,7 @@ impl TickTask { routine: OnceCell::new(), stop_source: AsyncMutex::new(None), single_future: MustJoinSingleFuture::new(), + running: Arc::new(AtomicBool::new(false)), } } pub fn new(tick_period_sec: u32) -> Self { @@ -43,6 +46,7 @@ impl TickTask { routine: OnceCell::new(), stop_source: AsyncMutex::new(None), single_future: MustJoinSingleFuture::new(), + running: Arc::new(AtomicBool::new(false)), } } @@ -53,6 +57,10 @@ impl TickTask { self.routine.set(Box::new(routine)).map_err(drop).unwrap(); } + pub fn is_running(&self) -> bool { + self.running.load(core::sync::atomic::Ordering::Relaxed) + } + pub async fn stop(&self) -> Result<(), E> { // drop the stop source if we have one let opt_stop_source = &mut *self.stop_source.lock().await; @@ -107,15 +115,16 @@ impl TickTask { // Run the singlefuture let stop_source = StopSource::new(); - match self - .single_future - .single_spawn(self.routine.get().unwrap()( - stop_source.token(), - last_timestamp_us, - now, - )) - .await - { + let stop_token = stop_source.token(); + let running = self.running.clone(); + let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now); + let wrapped_routine = Box::pin(async move { + running.store(true, core::sync::atomic::Ordering::Relaxed); + let out = routine.await; + running.store(false, core::sync::atomic::Ordering::Relaxed); + out + }); + match self.single_future.single_spawn(wrapped_routine).await { // We should have already consumed the result of the last run, or there was none // and we should definitely have run, because the prior 'check()' operation // should have ensured the singlefuture was ready to run