From a475028c75c452b8e7478b8ced21d56d5cffab8d Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 25 Jun 2022 15:28:27 -0400 Subject: [PATCH] add direct bootstrap fallback --- veilid-core/src/network_manager/mod.rs | 20 +++++ veilid-core/src/network_manager/native/mod.rs | 44 +++++++++++ .../network_manager/native/protocol/mod.rs | 31 ++++++++ .../network_manager/native/protocol/tcp.rs | 67 ++++++++++++++--- .../network_manager/native/protocol/udp.rs | 58 +++++++++++++++ .../src/network_manager/native/protocol/ws.rs | 34 +++++++-- veilid-core/src/network_manager/wasm/mod.rs | 40 ++++++++++ .../src/network_manager/wasm/protocol/mod.rs | 27 ++++++- .../src/network_manager/wasm/protocol/ws.rs | 40 ++++++++-- veilid-core/src/routing_table/find_nodes.rs | 4 +- veilid-core/src/routing_table/mod.rs | 73 +++---------------- veilid-core/src/routing_table/tasks.rs | 69 ++++++++++++++++-- 12 files changed, 405 insertions(+), 102 deletions(-) diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 3c7837c8..31ac6a5f 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1091,6 +1091,26 @@ impl NetworkManager { } } + // Direct bootstrap request + pub async fn boot_request(&self, dial_info: DialInfo) -> Result, String> { + let timeout_ms = { + let c = self.config.get(); + c.network.rpc.timeout_ms + }; + // Send boot magic to requested peer address + let data = BOOT_MAGIC.to_vec(); + let out_data: Vec = self + .net() + .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) + .await?; + + let bootstrap_peerinfo: Vec = + deserialize_json(std::str::from_utf8(&out_data).map_err(map_to_string)?) + .map_err(map_to_string)?; + + Ok(bootstrap_peerinfo) + } + // Called when a packet potentially containing an RPC envelope is received by a low-level // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // and passes it to the RPC handler diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 40f7ea0e..803401f1 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -297,6 +297,50 @@ impl Network { res } + // Send data to a dial info, unbound, using a new connection from a random port + // Waits for a specified amount of time to receive a single response + // This creates a short-lived connection in the case of connection-oriented protocols + // for the purpose of sending this one message. + // This bypasses the connection table as it is not a 'node to node' connection. + #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_data_unbound_to_dial_info( + &self, + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + let data_len = data.len(); + let out = match dial_info.protocol_type() { + ProtocolType::UDP => { + let peer_socket_addr = dial_info.to_socket_addr(); + RawUdpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms) + .await? + } + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + RawTcpProtocolHandler::send_recv_unbound_message(peer_socket_addr, data, timeout_ms) + .await? + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::send_recv_unbound_message( + dial_info.clone(), + data, + timeout_ms, + ) + .await? + } + }; + + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + self.network_manager() + .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + + tracing::Span::current().record("ret.len", &out.len()); + Ok(out) + } + #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_data_to_existing_connection( &self, diff --git a/veilid-core/src/network_manager/native/protocol/mod.rs b/veilid-core/src/network_manager/native/protocol/mod.rs index e2e071cf..e9ad74a6 100644 --- a/veilid-core/src/network_manager/native/protocol/mod.rs +++ b/veilid-core/src/network_manager/native/protocol/mod.rs @@ -51,6 +51,37 @@ impl ProtocolNetworkConnection { } } + pub async fn send_recv_unbound_message( + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + match dial_info.protocol_type() { + ProtocolType::UDP => { + let peer_socket_addr = dial_info.to_socket_addr(); + udp::RawUdpProtocolHandler::send_recv_unbound_message( + peer_socket_addr, + data, + timeout_ms, + ) + .await + } + ProtocolType::TCP => { + let peer_socket_addr = dial_info.to_socket_addr(); + tcp::RawTcpProtocolHandler::send_recv_unbound_message( + peer_socket_addr, + data, + timeout_ms, + ) + .await + } + ProtocolType::WS | ProtocolType::WSS => { + ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms) + .await + } + } + } + pub fn descriptor(&self) -> ConnectionDescriptor { match self { Self::Dummy(d) => d.descriptor(), diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index f8e3b000..8318ae9a 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -41,7 +41,7 @@ impl RawTcpNetworkConnection { .map_err(map_to_string) } - async fn send_internal(mut stream: AsyncPeekStream, message: Vec) -> Result<(), String> { + async fn send_internal(stream: &mut AsyncPeekStream, message: Vec) -> Result<(), String> { log_net!("sending TCP message of size {}", message.len()); if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large TCP message".to_owned()); @@ -55,16 +55,13 @@ impl RawTcpNetworkConnection { #[instrument(level="trace", err, skip(self, message), fields(message.len = message.len()))] pub async fn send(&self, message: Vec) -> Result<(), String> { - let stream = self.stream.clone(); - Self::send_internal(stream, message).await + let mut stream = self.stream.clone(); + Self::send_internal(&mut stream, message).await } - #[instrument(level="trace", err, skip(self), fields(message.len))] - pub async fn recv(&self) -> Result, String> { + pub async fn recv_internal(stream: &mut AsyncPeekStream) -> Result, String> { let mut header = [0u8; 4]; - let mut stream = self.stream.clone(); - stream .read_exact(&mut header) .await @@ -80,7 +77,14 @@ impl RawTcpNetworkConnection { let mut out: Vec = vec![0u8; len]; stream.read_exact(&mut out).await.map_err(map_to_string)?; - tracing::Span::current().record("message.len", &out.len()); + Ok(out) + } + + #[instrument(level="trace", err, skip(self), fields(ret.len))] + pub async fn recv(&self) -> Result, String> { + let mut stream = self.stream.clone(); + let out = Self::recv_internal(&mut stream).await?; + tracing::Span::current().record("ret.len", &out.len()); Ok(out) } } @@ -212,11 +216,54 @@ impl RawTcpProtocolHandler { // .local_addr() // .map_err(map_to_string) // .map_err(logthru_net!("could not get local address from TCP stream"))?; - let ps = AsyncPeekStream::new(ts.clone()); + let mut ps = AsyncPeekStream::new(ts.clone()); // Send directly from the raw network connection // this builds the connection and tears it down immediately after the send - RawTcpNetworkConnection::send_internal(ps, data).await + RawTcpNetworkConnection::send_internal(&mut ps, data).await + } + + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_unbound_message( + socket_addr: SocketAddr, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound TCP message".to_owned()); + } + trace!( + "sending unbound message of length {} to {}", + data.len(), + socket_addr + ); + + // Make a shared socket + let socket = new_unbound_shared_tcp_socket(socket2::Domain::for_address(socket_addr))?; + + // Non-blocking connect to remote address + let ts = nonblocking_connect(socket, socket_addr) + .await + .map_err(map_to_string) + .map_err(logthru_net!(error "remote_addr={}", socket_addr))?; + + // See what local address we ended up with and turn this into a stream + // let actual_local_address = ts + // .local_addr() + // .map_err(map_to_string) + // .map_err(logthru_net!("could not get local address from TCP stream"))?; + let mut ps = AsyncPeekStream::new(ts.clone()); + + // Send directly from the raw network connection + // this builds the connection and tears it down immediately after the send + RawTcpNetworkConnection::send_internal(&mut ps, data).await?; + + let out = timeout(timeout_ms, RawTcpNetworkConnection::recv_internal(&mut ps)) + .await + .map_err(map_to_string)??; + + tracing::Span::current().record("ret.len", &out.len()); + Ok(out) } } diff --git a/veilid-core/src/network_manager/native/protocol/udp.rs b/veilid-core/src/network_manager/native/protocol/udp.rs index fb67f45e..961772b6 100644 --- a/veilid-core/src/network_manager/native/protocol/udp.rs +++ b/veilid-core/src/network_manager/native/protocol/udp.rs @@ -68,6 +68,7 @@ impl RawUdpProtocolHandler { } } + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] pub async fn send_unbound_message( socket_addr: SocketAddr, data: Vec, @@ -104,4 +105,61 @@ impl RawUdpProtocolHandler { Ok(()) } } + + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_unbound_message( + socket_addr: SocketAddr, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound UDP message".to_owned()) + .map_err(logthru_net!(error)); + } + log_net!( + "sending unbound message of length {} to {}", + data.len(), + socket_addr + ); + + // get local wildcard address for bind + let local_socket_addr = match socket_addr { + SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + SocketAddr::V6(_) => { + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 0) + } + }; + + // get unspecified bound socket + let socket = UdpSocket::bind(local_socket_addr) + .await + .map_err(map_to_string) + .map_err(logthru_net!(error "failed to bind unbound udp socket"))?; + let len = socket + .send_to(&data, socket_addr) + .await + .map_err(map_to_string) + .map_err(logthru_net!(error "failed unbound udp send: addr={}", socket_addr))?; + if len != data.len() { + return Err("UDP partial unbound send".to_owned()).map_err(logthru_net!(error)); + } + + // receive single response + let mut out = vec![0u8; MAX_MESSAGE_SIZE]; + let (len, from_addr) = timeout(timeout_ms, socket.recv_from(&mut out)) + .await + .map_err(map_to_string)? + .map_err(map_to_string)?; + + // if the from address is not the same as the one we sent to, then drop this + if from_addr != socket_addr { + return Err(format!( + "Unbound response received from wrong address: addr={}", + from_addr, + )); + } + out.resize(len, 0u8); + tracing::Span::current().record("ret.len", &len); + Ok(out) + } } diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index a93776f3..4b0ca448 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -59,7 +59,7 @@ where .map_err(map_to_string) } - #[instrument(level="trace", err, skip(self, message), fields(message.len = message.len()))] + #[instrument(level = "trace", err, skip(self, message), fields(message.len = message.len()))] pub async fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("received too large WS message".to_owned()); @@ -72,7 +72,7 @@ where .map_err(logthru_net!(error "failed to send websocket message")) } - #[instrument(level="trace", err, skip(self), fields(message.len))] + #[instrument(level = "trace", err, skip(self), fields(ret.len))] pub async fn recv(&self) -> Result, String> { let out = match self.stream.clone().next().await { Some(Ok(Message::Binary(v))) => v, @@ -89,7 +89,7 @@ where if out.len() > MAX_MESSAGE_SIZE { Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)) } else { - tracing::Span::current().record("message.len", &out.len()); + tracing::Span::current().record("ret.len", &out.len()); Ok(out) } } @@ -283,11 +283,6 @@ impl WebsocketProtocolHandler { if data.len() > MAX_MESSAGE_SIZE { return Err("sending too large unbound WS message".to_owned()); } - trace!( - "sending unbound websocket message of length {} to {}", - data.len(), - dial_info, - ); let protconn = Self::connect_internal(None, dial_info.clone()) .await @@ -295,6 +290,29 @@ impl WebsocketProtocolHandler { protconn.send(data).await } + + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_unbound_message( + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound WS message".to_owned()); + } + + let protconn = Self::connect_internal(None, dial_info.clone()) + .await + .map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?; + + protconn.send(data).await?; + let out = timeout(timeout_ms, protconn.recv()) + .await + .map_err(map_to_string)??; + + tracing::Span::current().record("ret.len", &out.len()); + Ok(out) + } } impl ProtocolAcceptHandler for WebsocketProtocolHandler { diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index e938d1a3..96f1fcd7 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -79,6 +79,46 @@ impl Network { res } + // Send data to a dial info, unbound, using a new connection from a random port + // Waits for a specified amount of time to receive a single response + // This creates a short-lived connection in the case of connection-oriented protocols + // for the purpose of sending this one message. + // This bypasses the connection table as it is not a 'node to node' connection. + #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_data_unbound_to_dial_info( + &self, + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + let data_len = data.len(); + let out = match dial_info.protocol_type() { + ProtocolType::UDP => { + return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error)) + } + ProtocolType::TCP => { + return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error)) + } + ProtocolType::WS | ProtocolType::WSS => { + WebsocketProtocolHandler::send_recv_unbound_message( + dial_info.clone(), + data, + timeout_ms, + ) + .await? + } + }; + + // Network accounting + self.network_manager() + .stats_packet_sent(dial_info.to_ip_addr(), data_len as u64); + self.network_manager() + .stats_packet_rcvd(dial_info.to_ip_addr(), out.len() as u64); + + tracing::Span::current().record("ret.len", &out.len()); + Ok(out) + } + #[instrument(level="trace", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_data_to_existing_connection( &self, diff --git a/veilid-core/src/network_manager/wasm/protocol/mod.rs b/veilid-core/src/network_manager/wasm/protocol/mod.rs index 3b7d5884..7938e247 100644 --- a/veilid-core/src/network_manager/wasm/protocol/mod.rs +++ b/veilid-core/src/network_manager/wasm/protocol/mod.rs @@ -18,10 +18,10 @@ impl ProtocolNetworkConnection { ) -> Result { match dial_info.protocol_type() { ProtocolType::UDP => { - panic!("UDP dial info is not support on WASM targets"); + panic!("UDP dial info is not supported on WASM targets"); } ProtocolType::TCP => { - panic!("TCP dial info is not support on WASM targets"); + panic!("TCP dial info is not supported on WASM targets"); } ProtocolType::WS | ProtocolType::WSS => { ws::WebsocketProtocolHandler::connect(local_address, dial_info).await @@ -35,10 +35,10 @@ impl ProtocolNetworkConnection { ) -> Result<(), String> { match dial_info.protocol_type() { ProtocolType::UDP => { - panic!("UDP dial info is not support on WASM targets"); + panic!("UDP dial info is not supported on WASM targets"); } ProtocolType::TCP => { - panic!("TCP dial info is not support on WASM targets"); + panic!("TCP dial info is not supported on WASM targets"); } ProtocolType::WS | ProtocolType::WSS => { ws::WebsocketProtocolHandler::send_unbound_message(dial_info, data).await @@ -46,6 +46,25 @@ impl ProtocolNetworkConnection { } } + pub async fn send_recv_unbound_message( + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + match dial_info.protocol_type() { + ProtocolType::UDP => { + panic!("UDP dial info is not supported on WASM targets"); + } + ProtocolType::TCP => { + panic!("TCP dial info is not supported on WASM targets"); + } + ProtocolType::WS | ProtocolType::WSS => { + ws::WebsocketProtocolHandler::send_recv_unbound_message(dial_info, data, timeout_ms) + .await + } + } + } + pub fn descriptor(&self) -> ConnectionDescriptor { match self { Self::Dummy(d) => d.descriptor(), diff --git a/veilid-core/src/network_manager/wasm/protocol/ws.rs b/veilid-core/src/network_manager/wasm/protocol/ws.rs index f2952570..96f8b480 100644 --- a/veilid-core/src/network_manager/wasm/protocol/ws.rs +++ b/veilid-core/src/network_manager/wasm/protocol/ws.rs @@ -36,10 +36,12 @@ impl WebsocketNetworkConnection { self.descriptor.clone() } + #[instrument(level = "trace", err, skip(self))] pub async fn close(&self) -> Result<(), String> { self.inner.ws_meta.close().await.map_err(map_to_string).map(drop) } + #[instrument(level = "trace", err, skip(self, message), fields(message.len = message.len()))] pub async fn send(&self, message: Vec) -> Result<(), String> { if message.len() > MAX_MESSAGE_SIZE { return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)); @@ -49,6 +51,8 @@ impl WebsocketNetworkConnection { .map_err(|_| "failed to send to websocket".to_owned()) .map_err(logthru_net!(error)) } + + #[instrument(level = "trace", err, skip(self), fields(ret.len))] pub async fn recv(&self) -> Result, String> { let out = match self.inner.ws_stream.clone().next().await { Some(WsMessage::Binary(v)) => v, @@ -74,6 +78,7 @@ impl WebsocketNetworkConnection { pub struct WebsocketProtocolHandler {} impl WebsocketProtocolHandler { + #[instrument(level = "trace", err)] pub async fn connect( local_address: Option, dial_info: DialInfo, @@ -99,28 +104,47 @@ impl WebsocketProtocolHandler { .map_err(logthru_net!(error))?; // Make our connection descriptor - Ok(ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(ConnectionDescriptor::new_no_local( dial_info.to_peer_address(), ), wsmeta, wsio))) } + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len()))] pub async fn send_unbound_message(dial_info: DialInfo, data: Vec) -> Result<(), String> { if data.len() > MAX_MESSAGE_SIZE { return Err("sending too large unbound WS message".to_owned()); } - trace!( - "sending unbound websocket message of length {} to {}", - data.len(), - dial_info, - ); // Make the real connection let conn = Self::connect(None, dial_info) .await .map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?; - conn.send(data).await - + conn.send(data).await } + + #[instrument(level = "trace", err, skip(data), fields(data.len = data.len(), ret.len))] + pub async fn send_recv_unbound_message( + dial_info: DialInfo, + data: Vec, + timeout_ms: u32, + ) -> Result, String> { + if data.len() > MAX_MESSAGE_SIZE { + return Err("sending too large unbound WS message".to_owned()); + } + + let conn = Self::connect(None, dial_info.clone()) + .await + .map_err(|e| format!("failed to connect websocket for unbound message: {}", e))?; + + conn.send(data).await?; + let out = timeout(timeout_ms, conn.recv()) + .await + .map_err(map_to_string)??; + + tracing::Span::current().record("ret.len", &out.len()); + Ok(out) + } + + } diff --git a/veilid-core/src/routing_table/find_nodes.rs b/veilid-core/src/routing_table/find_nodes.rs index 57b9abed..5601f723 100644 --- a/veilid-core/src/routing_table/find_nodes.rs +++ b/veilid-core/src/routing_table/find_nodes.rs @@ -187,7 +187,7 @@ impl RoutingTable { } // add all nodes from buckets - Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Apply filter if filter(k, Some(v.clone())) { nodes.push((k, Some(v.clone()))); @@ -352,7 +352,7 @@ impl RoutingTable { let mut best_inbound_relay: Option<(DHTKey, Arc)> = None; // Iterate all known nodes for candidates - Self::with_entries_unlocked(inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + Self::with_entries(inner, cur_ts, BucketEntryState::Unreliable, |k, v| { // Ensure this node is not on our local network if v.with(|e| { e.local_node_info() diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 0a73bf2a..039b3b1b 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -337,7 +337,7 @@ impl RoutingTable { // Public dial info changed, go through all nodes and reset their 'seen our node info' bit if matches!(domain, RoutingDomain::PublicInternet) { let cur_ts = intf::get_timestamp(); - Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { + Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| { v.with_mut(|e| e.set_seen_our_node_info(false)); Option::<()>::None }); @@ -450,18 +450,13 @@ impl RoutingTable { let inner = this.inner.read(); let mut node_refs = Vec::::with_capacity(inner.bucket_entry_count); let cur_ts = intf::get_timestamp(); - Self::with_entries_unlocked( - &*inner, - cur_ts, - BucketEntryState::Unreliable, - |k, v| { - // Only update nodes that haven't seen our node info yet - if !v.with(|e| e.has_seen_our_node_info()) { - node_refs.push(NodeRef::new(this.clone(), k, v, None)); - } - Option::<()>::None - }, - ); + Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + // Only update nodes that haven't seen our node info yet + if !v.with(|e| e.has_seen_our_node_info()) { + node_refs.push(NodeRef::new(this.clone(), k, v, None)); + } + Option::<()>::None + }); node_refs }; @@ -540,14 +535,14 @@ impl RoutingTable { fn get_entry_count(inner: &RoutingTableInner, min_state: BucketEntryState) -> usize { let mut count = 0usize; let cur_ts = intf::get_timestamp(); - Self::with_entries_unlocked(inner, cur_ts, min_state, |_, _| { + Self::with_entries(inner, cur_ts, min_state, |_, _| { count += 1; Option::<()>::None }); count } - fn with_entries_unlocked) -> Option>( + fn with_entries) -> Option>( inner: &RoutingTableInner, cur_ts: u64, min_state: BucketEntryState, @@ -565,54 +560,6 @@ impl RoutingTable { None } - // fn with_entries Option>( - // inner: &RoutingTableInner, - // cur_ts: u64, - // min_state: BucketEntryState, - // mut f: F, - // ) -> Option { - // for bucket in &inner.buckets { - // for entry in bucket.entries() { - // let out = entry.1.with(|e| { - // if e.state(cur_ts) >= min_state { - // if let Some(out) = f(entry.0, e) { - // return Some(out); - // } - // } - // None - // }); - // if out.is_some() { - // return out; - // } - // } - // } - // None - // } - - // fn with_entries_mut Option>( - // inner: &RoutingTableInner, - // cur_ts: u64, - // min_state: BucketEntryState, - // mut f: F, - // ) -> Option { - // for bucket in &inner.buckets { - // for entry in bucket.entries() { - // let out = entry.1.with_mut(|e| { - // if e.state(cur_ts) >= min_state { - // if let Some(out) = f(entry.0, e) { - // return Some(out); - // } - // } - // None - // }); - // if out.is_some() { - // return out; - // } - // } - // } - // None - // } - fn queue_bucket_kick(&self, node_id: DHTKey) { let mut inner = self.inner.write(); let idx = Self::find_bucket_index(&*inner, node_id); diff --git a/veilid-core/src/routing_table/tasks.rs b/veilid-core/src/routing_table/tasks.rs index 6cb5a878..72aee5e2 100644 --- a/veilid-core/src/routing_table/tasks.rs +++ b/veilid-core/src/routing_table/tasks.rs @@ -3,13 +3,14 @@ use super::*; use crate::dht::*; use crate::xx::*; use crate::*; +use stop_token::future::FutureExt; impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is #[instrument(level = "trace", skip(self), err)] pub(super) async fn rolling_transfers_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, last_ts: u64, cur_ts: u64, ) -> Result<(), String> { @@ -196,6 +197,46 @@ impl RoutingTable { Ok(bsmap) } + // 'direct' bootstrap task routine for systems incapable of resolving TXT records, such as browser WASM + async fn direct_bootstrap_task_routine( + self, + stop_token: StopToken, + bootstrap_dialinfos: Vec, + ) -> Result<(), String> { + let network_manager = self.network_manager(); + + let mut unord = FuturesUnordered::new(); + for bootstrap_di in bootstrap_dialinfos { + let peer_info = match network_manager.boot_request(bootstrap_di).await { + Ok(v) => v, + Err(e) => { + error!("BOOT request failed: {}", e); + continue; + } + }; + + // Got peer info, let's add it to the routing table + for pi in peer_info { + let k = pi.node_id.key; + // Register the node + let nr = self + .register_node_with_signed_node_info(k, pi.signed_node_info) + .map_err(logthru_rtab!(error "Couldn't add bootstrap node: {}", k))?; + + // Add this our futures to process in parallel + unord.push( + // lets ask bootstrap to find ourselves now + self.reverse_find_node(nr, true), + ); + } + } + + // Wait for all bootstrap operations to complete before we complete the singlefuture + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + Ok(()) + } + #[instrument(level = "trace", skip(self), err)] pub(super) async fn bootstrap_task_routine(self, stop_token: StopToken) -> Result<(), String> { let (bootstrap, bootstrap_nodes) = { @@ -208,8 +249,22 @@ impl RoutingTable { log_rtab!(debug "--- bootstrap_task"); - // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) + // See if we are specifying a direct dialinfo for bootstrap, if so use the direct mechanism + if !bootstrap_nodes.is_empty() { + let mut bootstrap_dialinfos = Vec::::new(); + for b in &bootstrap { + if let Ok(bootstrap_di) = DialInfo::from_str(&b) { + bootstrap_dialinfos.push(bootstrap_di); + } + } + if bootstrap_dialinfos.len() > 0 { + return self + .direct_bootstrap_task_routine(stop_token, bootstrap_dialinfos) + .await; + } + } + // If we aren't specifying a bootstrap node list explicitly, then pull from the bootstrap server(s) let bsmap: BootstrapRecordMap = if !bootstrap_nodes.is_empty() { let mut bsmap = BootstrapRecordMap::new(); let mut bootstrap_node_dial_infos = Vec::new(); @@ -290,7 +345,7 @@ impl RoutingTable { } // Wait for all bootstrap operations to complete before we complete the singlefuture - while unord.next().await.is_some() {} + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} Ok(()) } @@ -311,7 +366,7 @@ impl RoutingTable { { let inner = self.inner.read(); - Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { if v.with(|e| e.needs_ping(&k, cur_ts, relay_node_id)) { let nr = NodeRef::new(self.clone(), k, v, None); unord.push(MustJoinHandle::new(intf::spawn_local( @@ -323,7 +378,7 @@ impl RoutingTable { } // Wait for futures to complete - while unord.next().await.is_some() {} + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} Ok(()) } @@ -340,7 +395,7 @@ impl RoutingTable { let inner = self.inner.read(); let mut noderefs = Vec::::with_capacity(inner.bucket_entry_count); let cur_ts = intf::get_timestamp(); - Self::with_entries_unlocked(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { + Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { noderefs.push(NodeRef::new(self.clone(), k, v, None)); Option::<()>::None }); @@ -353,7 +408,7 @@ impl RoutingTable { log_rtab!("--- peer minimum search with {:?}", nr); unord.push(self.reverse_find_node(nr, false)); } - while unord.next().await.is_some() {} + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} Ok(()) }